test(cli): add e2e test for workflow runs / inspect / thread #173
@@ -37,7 +37,7 @@
|
||||
* ```
|
||||
*/
|
||||
|
||||
import { existsSync, mkdirSync, mkdtempSync, rmSync, writeFileSync } from "node:fs";
|
||||
import { existsSync, mkdirSync, mkdtempSync, rmSync, symlinkSync, writeFileSync } from "node:fs";
|
||||
import { createRequire } from "node:module";
|
||||
import { tmpdir } from "node:os";
|
||||
import { dirname, join } from "node:path";
|
||||
@@ -52,10 +52,13 @@ import { logsCommand } from "../commands/logs.js";
|
||||
import { senseCommand } from "../commands/sense.js";
|
||||
import { statusCommand } from "../commands/status.js";
|
||||
import { stopCommand } from "../commands/stop.js";
|
||||
import { threadCommand } from "../commands/thread.js";
|
||||
import { workflowCommand } from "../commands/workflow.js";
|
||||
|
||||
const require = createRequire(import.meta.url);
|
||||
const nerveDaemonRoot = dirname(require.resolve("@uncaged/nerve-daemon/package.json"));
|
||||
const senseWorkerScript = join(nerveDaemonRoot, "dist", "sense-worker.js");
|
||||
const workflowWorkerScript = join(nerveDaemonRoot, "dist", "workflow-worker.js");
|
||||
|
||||
const nerveYamlTemplate = `senses:
|
||||
counter:
|
||||
@@ -63,7 +66,11 @@ const nerveYamlTemplate = `senses:
|
||||
|
||||
reflexes: []
|
||||
|
||||
workflows: {}
|
||||
workflows:
|
||||
echo:
|
||||
concurrency: 1
|
||||
overflow: queue
|
||||
max_queue: 10
|
||||
|
||||
max_rounds: 10
|
||||
|
||||
@@ -73,6 +80,31 @@ api:
|
||||
host: 127.0.0.1
|
||||
`;
|
||||
|
||||
/**
|
||||
* Minimal echo workflow (one role round then END).
|
||||
* Short delay in the role so two sequential CLI triggers can observe a queued run while the first is active.
|
||||
*/
|
||||
const echoWorkflowIndexJs = `const END = "__end__";
|
||||
|
||||
export default {
|
||||
name: "echo",
|
||||
roles: {
|
||||
echo: async (start, _messages) => {
|
||||
await new Promise((r) => setTimeout(r, 350));
|
||||
const p = typeof start.content === "string" ? start.content : "";
|
||||
return {
|
||||
content: p.length > 0 ? "echo:" + p : "echo:empty",
|
||||
meta: {},
|
||||
};
|
||||
},
|
||||
},
|
||||
moderator({ steps }) {
|
||||
if (steps.length === 0) return "echo";
|
||||
return END;
|
||||
},
|
||||
};
|
||||
`;
|
||||
|
||||
/** Empty migration — counter sense uses only `_signals` (auto-created by daemon). */
|
||||
const counterMigration = `-- no-op migration for e2e counter sense
|
||||
SELECT 1;
|
||||
@@ -98,6 +130,8 @@ const e2eRootCommand = defineCommand({
|
||||
daemon: daemonCommand,
|
||||
status: statusCommand,
|
||||
stop: stopCommand,
|
||||
workflow: workflowCommand,
|
||||
thread: threadCommand,
|
||||
},
|
||||
});
|
||||
|
||||
@@ -107,16 +141,27 @@ function defaultTestConfig(): NerveConfig {
|
||||
counter: { group: "e2e", throttle: null, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [],
|
||||
workflows: {},
|
||||
workflows: {
|
||||
echo: { concurrency: 1, overflow: "queue", maxQueue: 10 },
|
||||
},
|
||||
maxRounds: 10,
|
||||
api: { port: null, token: null, host: "127.0.0.1" },
|
||||
};
|
||||
}
|
||||
|
||||
/** Symlink monorepo `@uncaged/nerve-daemon` so `loadDaemonModule` (thread/workflow store) resolves under the temp workspace. */
|
||||
function linkWorkspaceDaemonPackage(nerveRoot: string): void {
|
||||
const scopeDir = join(nerveRoot, "node_modules", "@uncaged");
|
||||
mkdirSync(scopeDir, { recursive: true });
|
||||
const linkPath = join(scopeDir, "nerve-daemon");
|
||||
symlinkSync(nerveDaemonRoot, linkPath);
|
||||
}
|
||||
|
||||
function writeWorkspaceLayout(nerveRoot: string): void {
|
||||
mkdirSync(join(nerveRoot, "data", "senses"), { recursive: true });
|
||||
mkdirSync(join(nerveRoot, "data", "blobs"), { recursive: true });
|
||||
mkdirSync(join(nerveRoot, "senses", "counter", "migrations"), { recursive: true });
|
||||
mkdirSync(join(nerveRoot, "workflows", "echo"), { recursive: true });
|
||||
writeFileSync(join(nerveRoot, "nerve.yaml"), nerveYamlTemplate, "utf8");
|
||||
writeFileSync(
|
||||
join(nerveRoot, "senses", "counter", "migrations", "001.sql"),
|
||||
@@ -124,6 +169,8 @@ function writeWorkspaceLayout(nerveRoot: string): void {
|
||||
"utf8",
|
||||
);
|
||||
writeFileSync(join(nerveRoot, "senses", "counter", "index.js"), counterIndexJs, "utf8");
|
||||
writeFileSync(join(nerveRoot, "workflows", "echo", "index.js"), echoWorkflowIndexJs, "utf8");
|
||||
linkWorkspaceDaemonPackage(nerveRoot);
|
||||
}
|
||||
|
||||
export type TestDaemonHandle = {
|
||||
@@ -175,6 +222,11 @@ export async function startTestDaemon(
|
||||
`Missing "${senseWorkerScript}". Run \`pnpm --filter @uncaged/nerve-daemon build\` (cli package "pretest" runs this automatically).`,
|
||||
);
|
||||
}
|
||||
if (!existsSync(workflowWorkerScript)) {
|
||||
throw new Error(
|
||||
`Missing "${workflowWorkerScript}". Run \`pnpm --filter @uncaged/nerve-daemon build\`.`,
|
||||
);
|
||||
}
|
||||
|
||||
const fakeHome = mkdtempSync(join(tmpdir(), "nerve-cli-e2e-"));
|
||||
const nerveRoot = join(fakeHome, ".uncaged-nerve");
|
||||
@@ -254,7 +306,7 @@ function patchWriteStream(stream: NodeJS.WriteStream, sink: string[]): () => voi
|
||||
|
||||
/**
|
||||
* Runs `nerve <args>` for the subset wired in `e2eRootCommand` (`sense`, `logs`, `daemon`,
|
||||
* top-level `status` / `stop`), with `process.env.HOME` pointing at `handle.fakeHome` so
|
||||
* `status`, `stop`, `workflow`, `thread`), with `process.env.HOME` pointing at `handle.fakeHome` so
|
||||
* `getNerveRoot()` resolves to the test workspace. Captures stdout/stderr; sets `exitCode`
|
||||
* when `process.exit` is invoked or on thrown errors.
|
||||
*/
|
||||
|
||||
@@ -0,0 +1,109 @@
|
||||
/**
|
||||
* E2E (issue #160): real kernel + workflow worker + IPC, then CLI `workflow` / `thread`
|
||||
* against logs.db. Run listings live on `nerve thread list` (there is no `nerve workflow runs` subcommand).
|
||||
*/
|
||||
|
||||
import { join } from "node:path";
|
||||
|
||||
import { createLogStore } from "@uncaged/nerve-store";
|
||||
import { afterEach, describe, expect, it } from "vitest";
|
||||
|
||||
import { type TestDaemonHandle, runCli, startTestDaemon, stopTestDaemon } from "./e2e-harness.js";
|
||||
|
||||
const PAYLOAD_A = JSON.stringify({ prompt: "alpha-e2e", maxRounds: 10 });
|
||||
const PAYLOAD_B = JSON.stringify({ prompt: "beta-e2e", maxRounds: 10 });
|
||||
|
||||
async function waitForCompletedEchoRuns(
|
||||
logsDbPath: string,
|
||||
minCount: number,
|
||||
timeoutMs: number,
|
||||
): Promise<void> {
|
||||
const deadline = Date.now() + timeoutMs;
|
||||
while (Date.now() < deadline) {
|
||||
const store = createLogStore(logsDbPath);
|
||||
try {
|
||||
const done = store.getAllWorkflowRuns("echo").filter((r) => r.status === "completed");
|
||||
if (done.length >= minCount) return;
|
||||
} finally {
|
||||
store.close();
|
||||
}
|
||||
await new Promise((r) => setTimeout(r, 40));
|
||||
}
|
||||
throw new Error(
|
||||
`Timed out after ${String(timeoutMs)}ms waiting for ${String(minCount)} completed echo run(s)`,
|
||||
);
|
||||
}
|
||||
|
||||
describe("e2e workflow CLI (real daemon)", () => {
|
||||
let daemon: TestDaemonHandle | null = null;
|
||||
|
||||
afterEach(async () => {
|
||||
const h = daemon;
|
||||
daemon = null;
|
||||
if (h === null) return;
|
||||
await Promise.race([
|
||||
stopTestDaemon(h),
|
||||
new Promise<never>((_, reject) =>
|
||||
setTimeout(() => reject(new Error("stopTestDaemon timed out after 10s")), 10_000),
|
||||
),
|
||||
]);
|
||||
});
|
||||
|
||||
it(
|
||||
"trigger, thread list / --all, inspect, show (echo workflow)",
|
||||
{ timeout: 30_000 },
|
||||
async () => {
|
||||
daemon = await startTestDaemon();
|
||||
const logsDb = join(daemon.nerveRoot, "data", "logs.db");
|
||||
|
||||
const t1 = await runCli(daemon, ["workflow", "trigger", "echo", "--payload", PAYLOAD_A]);
|
||||
expect(t1.exitCode).toBe(0);
|
||||
expect(t1.stdout).toContain("Triggered");
|
||||
|
||||
const t2 = await runCli(daemon, ["workflow", "trigger", "echo", "--payload", PAYLOAD_B]);
|
||||
expect(t2.exitCode).toBe(0);
|
||||
|
||||
const activeRightAfter = await runCli(daemon, ["thread", "list"]);
|
||||
expect(activeRightAfter.exitCode).toBe(0);
|
||||
expect(activeRightAfter.stdout).toContain("echo");
|
||||
expect(activeRightAfter.stdout).toMatch(/queued|started/);
|
||||
|
||||
await waitForCompletedEchoRuns(logsDb, 2, 25_000);
|
||||
|
||||
const store = createLogStore(logsDb);
|
||||
let runId: string;
|
||||
try {
|
||||
const completed = store
|
||||
.getAllWorkflowRuns("echo")
|
||||
.filter((r) => r.status === "completed")
|
||||
.sort((a, b) => a.timestamp - b.timestamp);
|
||||
expect(completed.length).toBeGreaterThanOrEqual(2);
|
||||
runId = completed[0]?.runId ?? "";
|
||||
expect(runId.length).toBeGreaterThan(0);
|
||||
} finally {
|
||||
store.close();
|
||||
}
|
||||
|
||||
const listAll = await runCli(daemon, ["thread", "list", "--all"]);
|
||||
expect(listAll.exitCode).toBe(0);
|
||||
expect(listAll.stdout).toContain(runId);
|
||||
expect(listAll.stdout).toContain("✅");
|
||||
expect(listAll.stdout).toContain("workflow=echo");
|
||||
|
||||
const listDefault = await runCli(daemon, ["thread", "list"]);
|
||||
expect(listDefault.exitCode).toBe(0);
|
||||
expect(listDefault.stdout).toMatch(/No active workflow runs|📭 No active/);
|
||||
|
||||
const inspect = await runCli(daemon, ["thread", "inspect", runId, "--limit", "50"]);
|
||||
expect(inspect.exitCode).toBe(0);
|
||||
expect(inspect.stdout).toContain(`Workflow run: ${runId}`);
|
||||
expect(inspect.stdout).toContain("type=started");
|
||||
expect(inspect.stdout).toContain("type=completed");
|
||||
|
||||
const show = await runCli(daemon, ["thread", "show", runId, "--budget", "50000"]);
|
||||
expect(show.exitCode).toBe(0);
|
||||
expect(show.stdout).toContain("echo:alpha-e2e");
|
||||
expect(show.stdout).toContain("[#1 echo]");
|
||||
},
|
||||
);
|
||||
});
|
||||
Reference in New Issue
Block a user