From c45a2f36d239b32fba9d2509916c076301b4bf10 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B0=8F=E6=A9=98?= Date: Mon, 27 Apr 2026 06:45:01 +0000 Subject: [PATCH] test(cli): add e2e test for workflow runs / inspect / thread Start real daemon with echo workflow, trigger runs, verify: - thread list shows active runs - thread list --all includes completed runs - thread inspect shows log entries - thread show displays conversation output Extends e2e-harness with workflow support (echo workflow config). Fixes #160 --- packages/cli/src/__tests__/e2e-harness.ts | 60 +++++++++- .../cli/src/__tests__/e2e-workflow.test.ts | 109 ++++++++++++++++++ 2 files changed, 165 insertions(+), 4 deletions(-) create mode 100644 packages/cli/src/__tests__/e2e-workflow.test.ts diff --git a/packages/cli/src/__tests__/e2e-harness.ts b/packages/cli/src/__tests__/e2e-harness.ts index f863f47..da60c66 100644 --- a/packages/cli/src/__tests__/e2e-harness.ts +++ b/packages/cli/src/__tests__/e2e-harness.ts @@ -37,7 +37,7 @@ * ``` */ -import { existsSync, mkdirSync, mkdtempSync, rmSync, writeFileSync } from "node:fs"; +import { existsSync, mkdirSync, mkdtempSync, rmSync, symlinkSync, writeFileSync } from "node:fs"; import { createRequire } from "node:module"; import { tmpdir } from "node:os"; import { dirname, join } from "node:path"; @@ -52,10 +52,13 @@ import { logsCommand } from "../commands/logs.js"; import { senseCommand } from "../commands/sense.js"; import { statusCommand } from "../commands/status.js"; import { stopCommand } from "../commands/stop.js"; +import { threadCommand } from "../commands/thread.js"; +import { workflowCommand } from "../commands/workflow.js"; const require = createRequire(import.meta.url); const nerveDaemonRoot = dirname(require.resolve("@uncaged/nerve-daemon/package.json")); const senseWorkerScript = join(nerveDaemonRoot, "dist", "sense-worker.js"); +const workflowWorkerScript = join(nerveDaemonRoot, "dist", "workflow-worker.js"); const nerveYamlTemplate = `senses: counter: @@ -63,7 +66,11 @@ const nerveYamlTemplate = `senses: reflexes: [] -workflows: {} +workflows: + echo: + concurrency: 1 + overflow: queue + max_queue: 10 max_rounds: 10 @@ -73,6 +80,31 @@ api: host: 127.0.0.1 `; +/** + * Minimal echo workflow (one role round then END). + * Short delay in the role so two sequential CLI triggers can observe a queued run while the first is active. + */ +const echoWorkflowIndexJs = `const END = "__end__"; + +export default { + name: "echo", + roles: { + echo: async (start, _messages) => { + await new Promise((r) => setTimeout(r, 350)); + const p = typeof start.content === "string" ? start.content : ""; + return { + content: p.length > 0 ? "echo:" + p : "echo:empty", + meta: {}, + }; + }, + }, + moderator({ steps }) { + if (steps.length === 0) return "echo"; + return END; + }, +}; +`; + /** Empty migration — counter sense uses only `_signals` (auto-created by daemon). */ const counterMigration = `-- no-op migration for e2e counter sense SELECT 1; @@ -98,6 +130,8 @@ const e2eRootCommand = defineCommand({ daemon: daemonCommand, status: statusCommand, stop: stopCommand, + workflow: workflowCommand, + thread: threadCommand, }, }); @@ -107,16 +141,27 @@ function defaultTestConfig(): NerveConfig { counter: { group: "e2e", throttle: null, timeout: null, gracePeriod: null }, }, reflexes: [], - workflows: {}, + workflows: { + echo: { concurrency: 1, overflow: "queue", maxQueue: 10 }, + }, maxRounds: 10, api: { port: null, token: null, host: "127.0.0.1" }, }; } +/** Symlink monorepo `@uncaged/nerve-daemon` so `loadDaemonModule` (thread/workflow store) resolves under the temp workspace. */ +function linkWorkspaceDaemonPackage(nerveRoot: string): void { + const scopeDir = join(nerveRoot, "node_modules", "@uncaged"); + mkdirSync(scopeDir, { recursive: true }); + const linkPath = join(scopeDir, "nerve-daemon"); + symlinkSync(nerveDaemonRoot, linkPath); +} + function writeWorkspaceLayout(nerveRoot: string): void { mkdirSync(join(nerveRoot, "data", "senses"), { recursive: true }); mkdirSync(join(nerveRoot, "data", "blobs"), { recursive: true }); mkdirSync(join(nerveRoot, "senses", "counter", "migrations"), { recursive: true }); + mkdirSync(join(nerveRoot, "workflows", "echo"), { recursive: true }); writeFileSync(join(nerveRoot, "nerve.yaml"), nerveYamlTemplate, "utf8"); writeFileSync( join(nerveRoot, "senses", "counter", "migrations", "001.sql"), @@ -124,6 +169,8 @@ function writeWorkspaceLayout(nerveRoot: string): void { "utf8", ); writeFileSync(join(nerveRoot, "senses", "counter", "index.js"), counterIndexJs, "utf8"); + writeFileSync(join(nerveRoot, "workflows", "echo", "index.js"), echoWorkflowIndexJs, "utf8"); + linkWorkspaceDaemonPackage(nerveRoot); } export type TestDaemonHandle = { @@ -175,6 +222,11 @@ export async function startTestDaemon( `Missing "${senseWorkerScript}". Run \`pnpm --filter @uncaged/nerve-daemon build\` (cli package "pretest" runs this automatically).`, ); } + if (!existsSync(workflowWorkerScript)) { + throw new Error( + `Missing "${workflowWorkerScript}". Run \`pnpm --filter @uncaged/nerve-daemon build\`.`, + ); + } const fakeHome = mkdtempSync(join(tmpdir(), "nerve-cli-e2e-")); const nerveRoot = join(fakeHome, ".uncaged-nerve"); @@ -254,7 +306,7 @@ function patchWriteStream(stream: NodeJS.WriteStream, sink: string[]): () => voi /** * Runs `nerve ` for the subset wired in `e2eRootCommand` (`sense`, `logs`, `daemon`, - * top-level `status` / `stop`), with `process.env.HOME` pointing at `handle.fakeHome` so + * `status`, `stop`, `workflow`, `thread`), with `process.env.HOME` pointing at `handle.fakeHome` so * `getNerveRoot()` resolves to the test workspace. Captures stdout/stderr; sets `exitCode` * when `process.exit` is invoked or on thrown errors. */ diff --git a/packages/cli/src/__tests__/e2e-workflow.test.ts b/packages/cli/src/__tests__/e2e-workflow.test.ts new file mode 100644 index 0000000..9d44d4a --- /dev/null +++ b/packages/cli/src/__tests__/e2e-workflow.test.ts @@ -0,0 +1,109 @@ +/** + * E2E (issue #160): real kernel + workflow worker + IPC, then CLI `workflow` / `thread` + * against logs.db. Run listings live on `nerve thread list` (there is no `nerve workflow runs` subcommand). + */ + +import { join } from "node:path"; + +import { createLogStore } from "@uncaged/nerve-store"; +import { afterEach, describe, expect, it } from "vitest"; + +import { type TestDaemonHandle, runCli, startTestDaemon, stopTestDaemon } from "./e2e-harness.js"; + +const PAYLOAD_A = JSON.stringify({ prompt: "alpha-e2e", maxRounds: 10 }); +const PAYLOAD_B = JSON.stringify({ prompt: "beta-e2e", maxRounds: 10 }); + +async function waitForCompletedEchoRuns( + logsDbPath: string, + minCount: number, + timeoutMs: number, +): Promise { + const deadline = Date.now() + timeoutMs; + while (Date.now() < deadline) { + const store = createLogStore(logsDbPath); + try { + const done = store.getAllWorkflowRuns("echo").filter((r) => r.status === "completed"); + if (done.length >= minCount) return; + } finally { + store.close(); + } + await new Promise((r) => setTimeout(r, 40)); + } + throw new Error( + `Timed out after ${String(timeoutMs)}ms waiting for ${String(minCount)} completed echo run(s)`, + ); +} + +describe("e2e workflow CLI (real daemon)", () => { + let daemon: TestDaemonHandle | null = null; + + afterEach(async () => { + const h = daemon; + daemon = null; + if (h === null) return; + await Promise.race([ + stopTestDaemon(h), + new Promise((_, reject) => + setTimeout(() => reject(new Error("stopTestDaemon timed out after 10s")), 10_000), + ), + ]); + }); + + it( + "trigger, thread list / --all, inspect, show (echo workflow)", + { timeout: 30_000 }, + async () => { + daemon = await startTestDaemon(); + const logsDb = join(daemon.nerveRoot, "data", "logs.db"); + + const t1 = await runCli(daemon, ["workflow", "trigger", "echo", "--payload", PAYLOAD_A]); + expect(t1.exitCode).toBe(0); + expect(t1.stdout).toContain("Triggered"); + + const t2 = await runCli(daemon, ["workflow", "trigger", "echo", "--payload", PAYLOAD_B]); + expect(t2.exitCode).toBe(0); + + const activeRightAfter = await runCli(daemon, ["thread", "list"]); + expect(activeRightAfter.exitCode).toBe(0); + expect(activeRightAfter.stdout).toContain("echo"); + expect(activeRightAfter.stdout).toMatch(/queued|started/); + + await waitForCompletedEchoRuns(logsDb, 2, 25_000); + + const store = createLogStore(logsDb); + let runId: string; + try { + const completed = store + .getAllWorkflowRuns("echo") + .filter((r) => r.status === "completed") + .sort((a, b) => a.timestamp - b.timestamp); + expect(completed.length).toBeGreaterThanOrEqual(2); + runId = completed[0]?.runId ?? ""; + expect(runId.length).toBeGreaterThan(0); + } finally { + store.close(); + } + + const listAll = await runCli(daemon, ["thread", "list", "--all"]); + expect(listAll.exitCode).toBe(0); + expect(listAll.stdout).toContain(runId); + expect(listAll.stdout).toContain("✅"); + expect(listAll.stdout).toContain("workflow=echo"); + + const listDefault = await runCli(daemon, ["thread", "list"]); + expect(listDefault.exitCode).toBe(0); + expect(listDefault.stdout).toMatch(/No active workflow runs|📭 No active/); + + const inspect = await runCli(daemon, ["thread", "inspect", runId, "--limit", "50"]); + expect(inspect.exitCode).toBe(0); + expect(inspect.stdout).toContain(`Workflow run: ${runId}`); + expect(inspect.stdout).toContain("type=started"); + expect(inspect.stdout).toContain("type=completed"); + + const show = await runCli(daemon, ["thread", "show", runId, "--budget", "50000"]); + expect(show.exitCode).toBe(0); + expect(show.stdout).toContain("echo:alpha-e2e"); + expect(show.stdout).toContain("[#1 echo]"); + }, + ); +}); -- 2.43.0