From 262c77175f30adecc48e01ae098d5f6b7f757c33 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B0=8F=E6=A9=98?= Date: Wed, 22 Apr 2026 13:46:05 +0000 Subject: [PATCH 1/2] =?UTF-8?q?feat(cli):=20Phase=204=20=E2=80=94=20workfl?= =?UTF-8?q?ow=20CLI=20commands=20&=20scaffold?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - workflow list: active runs with --all/--workflow/--limit/--offset pagination - workflow inspect : thread details with event pagination - workflow trigger : manual trigger, outputs runId - init workflow : scaffold template under workflows/ AI-friendly design: no ANSI colors, emoji for readability, pagination with stats + next-page hint on all list commands. 47 new tests (39 workflow + 8 init-workflow). All 212 tests pass. 小橘 🍊(NEKO Team) --- packages/cli/package.json | 7 +- .../cli/src/__tests__/init-workflow.test.ts | 107 +++++ packages/cli/src/__tests__/workflow.test.ts | 320 +++++++++++++++ packages/cli/src/cli.ts | 2 + packages/cli/src/commands/init.ts | 180 ++++++--- packages/cli/src/commands/workflow.ts | 364 ++++++++++++++++++ pnpm-lock.yaml | 54 +++ 7 files changed, 989 insertions(+), 45 deletions(-) create mode 100644 packages/cli/src/__tests__/init-workflow.test.ts create mode 100644 packages/cli/src/__tests__/workflow.test.ts create mode 100644 packages/cli/src/commands/workflow.ts diff --git a/packages/cli/package.json b/packages/cli/package.json index 5790655..a9fa7fa 100644 --- a/packages/cli/package.json +++ b/packages/cli/package.json @@ -9,7 +9,8 @@ "main": "dist/index.js", "types": "dist/index.d.ts", "scripts": { - "build": "tsup" + "build": "tsup", + "test": "vitest run" }, "dependencies": { "@uncaged/nerve-core": "workspace:*", @@ -17,6 +18,8 @@ "citty": "^0.1.6" }, "devDependencies": { - "@types/node": "^22.0.0" + "@types/better-sqlite3": "^7.6.13", + "@types/node": "^22.0.0", + "vitest": "^4.1.5" } } diff --git a/packages/cli/src/__tests__/init-workflow.test.ts b/packages/cli/src/__tests__/init-workflow.test.ts new file mode 100644 index 0000000..4e47291 --- /dev/null +++ b/packages/cli/src/__tests__/init-workflow.test.ts @@ -0,0 +1,107 @@ +/** + * Tests for nerve init workflow scaffold logic. + * + * We test the file-generation path by isolating the template rendering, + * not by invoking the full citty command (which calls process.exit). + */ + +import { mkdtempSync, readFileSync, rmSync } from "node:fs"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import { afterEach, beforeEach, describe, expect, it } from "vitest"; + +// Inline the template builder (same logic as in init.ts) for isolated testing +function buildWorkflowTemplate(name: string): string { + return `import type { WorkflowDefinition } from "@uncaged/nerve-daemon"; + +const workflow: WorkflowDefinition = { + roles: { + main: { + async execute(prompt, ctx) { + ctx.log("${name} started"); + // TODO: implement your role logic here + return { type: "done" }; + }, + }, + }, + + moderate(thread, event) { + if (event.type === "thread_start") { + return { role: "main", prompt: {} }; + } + return null; // workflow complete + }, +}; + +export default workflow; +`; +} + +let tmpDir: string; + +beforeEach(() => { + tmpDir = mkdtempSync(join(tmpdir(), "nerve-cli-init-test-")); +}); + +afterEach(() => { + rmSync(tmpDir, { recursive: true, force: true }); +}); + +describe("buildWorkflowTemplate", () => { + it("includes the workflow name in the template", () => { + const tpl = buildWorkflowTemplate("my-workflow"); + expect(tpl).toContain("my-workflow started"); + }); + + it("contains WorkflowDefinition type import", () => { + const tpl = buildWorkflowTemplate("test"); + expect(tpl).toContain("WorkflowDefinition"); + expect(tpl).toContain("@uncaged/nerve-daemon"); + }); + + it("contains a moderate function that returns null to signal completion", () => { + const tpl = buildWorkflowTemplate("test"); + expect(tpl).toContain("return null"); + expect(tpl).toContain("moderate"); + }); + + it("contains a roles map with main role", () => { + const tpl = buildWorkflowTemplate("test"); + expect(tpl).toContain("roles:"); + expect(tpl).toContain("main:"); + }); + + it("uses different names per call", () => { + const a = buildWorkflowTemplate("workflow-a"); + const b = buildWorkflowTemplate("workflow-b"); + expect(a).toContain("workflow-a started"); + expect(b).toContain("workflow-b started"); + expect(a).not.toContain("workflow-b"); + }); + + it("produces valid TypeScript syntax (no unclosed braces)", () => { + const tpl = buildWorkflowTemplate("test"); + const opens = (tpl.match(/\{/g) ?? []).length; + const closes = (tpl.match(/\}/g) ?? []).length; + expect(opens).toBe(closes); + }); + + it("ends with export default workflow", () => { + const tpl = buildWorkflowTemplate("test"); + expect(tpl.trim().endsWith("export default workflow;")).toBe(true); + }); +}); + +describe("workflow scaffold file writing (simulated)", () => { + it("writes the template to disk correctly", () => { + const { mkdirSync, writeFileSync } = require("node:fs"); + const workflowDir = join(tmpDir, "workflows", "my-task"); + mkdirSync(workflowDir, { recursive: true }); + const content = buildWorkflowTemplate("my-task"); + writeFileSync(join(workflowDir, "index.ts"), content, "utf8"); + + const read = readFileSync(join(workflowDir, "index.ts"), "utf8"); + expect(read).toContain("my-task started"); + expect(read).toContain("WorkflowDefinition"); + }); +}); diff --git a/packages/cli/src/__tests__/workflow.test.ts b/packages/cli/src/__tests__/workflow.test.ts new file mode 100644 index 0000000..b860b18 --- /dev/null +++ b/packages/cli/src/__tests__/workflow.test.ts @@ -0,0 +1,320 @@ +/** + * Tests for workflow CLI commands — pure logic helpers. + * + * Tests do NOT invoke the citty command handlers directly (they would call + * process.exit / process.stdout.write against a real terminal). Instead we + * test the exported pure helper functions that the command handlers delegate + * to. The helpers use real LogStore / SQLite via temp directories. + */ + +import { mkdtempSync, rmSync } from "node:fs"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; + +import { createLogStore } from "@uncaged/nerve-daemon"; +import type { LogStore, WorkflowRun } from "@uncaged/nerve-daemon"; +import { afterEach, beforeEach, describe, expect, it } from "vitest"; + +import { + buildInspectOutput, + buildListOutput, + formatTs, + getAllWorkflowRuns, + statusIcon, +} from "../commands/workflow.js"; + +// --------------------------------------------------------------------------- +// Test helpers +// --------------------------------------------------------------------------- + +let tmpDir: string; +let store: LogStore; + +function upsertRun( + runId: string, + workflow: string, + status: WorkflowRun["status"], + ts: number, +): void { + store.upsertWorkflowRun( + { source: "workflow", type: status, refId: runId, payload: null, ts }, + { runId, workflow, status, ts }, + ); +} + +beforeEach(() => { + tmpDir = mkdtempSync(join(tmpdir(), "nerve-cli-wf-test-")); + store = createLogStore(join(tmpDir, "data", "logs.db")); +}); + +afterEach(() => { + store.close(); + rmSync(tmpDir, { recursive: true, force: true }); +}); + +// --------------------------------------------------------------------------- +// formatTs +// --------------------------------------------------------------------------- + +describe("formatTs", () => { + it("returns ISO 8601 string", () => { + const ts = new Date("2026-01-01T00:00:00.000Z").getTime(); + expect(formatTs(ts)).toBe("2026-01-01T00:00:00.000Z"); + }); +}); + +// --------------------------------------------------------------------------- +// statusIcon +// --------------------------------------------------------------------------- + +describe("statusIcon", () => { + it.each([ + ["started", "▶"], + ["queued", "⏳"], + ["completed", "✅"], + ["failed", "❌"], + ["crashed", "💥"], + ["dropped", "🗑"], + ["interrupted", "⚠️"], + ] as const)("maps status=%s to icon=%s", (status, icon) => { + expect(statusIcon(status)).toBe(icon); + }); +}); + +// --------------------------------------------------------------------------- +// getAllWorkflowRuns +// --------------------------------------------------------------------------- + +describe("getAllWorkflowRuns", () => { + it("returns empty array when no runs exist", () => { + expect(getAllWorkflowRuns(store, null)).toHaveLength(0); + }); + + it("returns all runs across statuses", () => { + upsertRun("r1", "cleanup", "completed", 1000); + upsertRun("r2", "cleanup", "started", 2000); + upsertRun("r3", "deploy", "failed", 1500); + + const runs = getAllWorkflowRuns(store, null); + expect(runs).toHaveLength(3); + }); + + it("deduplicates runs by runId (latest state only)", () => { + upsertRun("r1", "cleanup", "started", 1000); + upsertRun("r1", "cleanup", "completed", 2000); + + const runs = getAllWorkflowRuns(store, null); + expect(runs).toHaveLength(1); + expect(runs[0].status).toBe("completed"); + }); + + it("filters by workflow name", () => { + upsertRun("r1", "cleanup", "completed", 1000); + upsertRun("r2", "deploy", "started", 2000); + upsertRun("r3", "cleanup", "failed", 1500); + + const runs = getAllWorkflowRuns(store, "cleanup"); + expect(runs).toHaveLength(2); + for (const r of runs) { + expect(r.workflow).toBe("cleanup"); + } + }); + + it("sorts by ts descending (newest first)", () => { + upsertRun("r1", "cleanup", "completed", 1000); + upsertRun("r2", "cleanup", "started", 3000); + upsertRun("r3", "cleanup", "failed", 2000); + + const runs = getAllWorkflowRuns(store, null); + expect(runs[0].ts).toBeGreaterThan(runs[1].ts); + expect(runs[1].ts).toBeGreaterThan(runs[2].ts); + }); +}); + +// --------------------------------------------------------------------------- +// buildListOutput +// --------------------------------------------------------------------------- + +describe("buildListOutput", () => { + function makeRun( + runId: string, + workflow: string, + status: WorkflowRun["status"], + ts: number, + ): WorkflowRun { + return { runId, workflow, status, ts }; + } + + it("returns empty message when no runs and --all=false", () => { + const { lines, paginationHint } = buildListOutput([], 0, 20, false, null); + expect(lines).toHaveLength(1); + expect(lines[0]).toContain("--all"); + expect(paginationHint).toBeNull(); + }); + + it("returns empty message when no runs and --all=true", () => { + const { lines, paginationHint } = buildListOutput([], 0, 20, true, null); + expect(lines).toHaveLength(1); + expect(lines[0]).not.toContain("--all"); + expect(paginationHint).toBeNull(); + }); + + it("shows correct run count in header", () => { + const runs = [ + makeRun("r1", "cleanup", "started", 1000), + makeRun("r2", "cleanup", "queued", 2000), + ]; + const { lines } = buildListOutput(runs, 0, 20, false, null); + expect(lines[0]).toContain("2 of 2"); + }); + + it("includes run details in lines", () => { + const runs = [makeRun("run-abc", "my-workflow", "started", 1000)]; + const { lines } = buildListOutput(runs, 0, 20, false, null); + const combined = lines.join(""); + expect(combined).toContain("run-abc"); + expect(combined).toContain("my-workflow"); + expect(combined).toContain("started"); + expect(combined).toContain("▶"); + }); + + it("paginates: shows only limit entries and provides hint", () => { + const runs = Array.from({ length: 5 }, (_, i) => makeRun(`r${i}`, "wf", "completed", i * 1000)); + const { lines, paginationHint } = buildListOutput(runs, 0, 2, true, null); + // header + 2 run lines + expect(lines).toHaveLength(3); + expect(paginationHint).not.toBeNull(); + expect(paginationHint).toContain("--offset 2"); + expect(paginationHint).toContain("3 more"); + }); + + it("pagination hint includes --all flag when set", () => { + const runs = Array.from({ length: 3 }, (_, i) => makeRun(`r${i}`, "wf", "completed", i * 1000)); + const { paginationHint } = buildListOutput(runs, 0, 1, true, null); + expect(paginationHint).toContain("--all"); + }); + + it("pagination hint includes --workflow filter when set", () => { + const runs = Array.from({ length: 3 }, (_, i) => + makeRun(`r${i}`, "cleanup", "completed", i * 1000), + ); + const { paginationHint } = buildListOutput(runs, 0, 1, false, "cleanup"); + expect(paginationHint).toContain("--workflow cleanup"); + }); + + it("no pagination hint when all entries fit on one page", () => { + const runs = [makeRun("r1", "wf", "started", 1000)]; + const { paginationHint } = buildListOutput(runs, 0, 20, false, null); + expect(paginationHint).toBeNull(); + }); + + it("respects offset for pagination", () => { + const runs = Array.from({ length: 5 }, (_, i) => makeRun(`r${i}`, "wf", "completed", i * 1000)); + const { lines, paginationHint } = buildListOutput(runs, 2, 2, true, null); + // header + 2 run lines (offset=2, limit=2 gives items 2 and 3) + expect(lines).toHaveLength(3); + // 1 item remaining (index 4) + expect(paginationHint).toContain("1 more"); + expect(paginationHint).toContain("--offset 4"); + }); +}); + +// --------------------------------------------------------------------------- +// buildInspectOutput +// --------------------------------------------------------------------------- + +describe("buildInspectOutput", () => { + const baseRun: WorkflowRun = { + runId: "run-xyz", + workflow: "cleanup", + status: "completed", + ts: 1_700_000_000_000, + }; + + it("shows header with run details", () => { + const { header } = buildInspectOutput(baseRun, [], 0, 20); + const headerText = header.join(""); + expect(headerText).toContain("run-xyz"); + expect(headerText).toContain("cleanup"); + expect(headerText).toContain("completed"); + }); + + it("shows '(no events recorded)' when log is empty", () => { + const { eventLines } = buildInspectOutput(baseRun, [], 0, 20); + expect(eventLines.join("")).toContain("no events recorded"); + }); + + it("shows event lines with type and ts", () => { + const logs = [{ ts: 1_700_000_001_000, type: "started", payload: null }]; + const { eventLines } = buildInspectOutput(baseRun, logs, 0, 20); + const text = eventLines.join(""); + expect(text).toContain("type=started"); + }); + + it("truncates long payloads to 200 chars with ellipsis", () => { + const longPayload = "x".repeat(250); + const logs = [{ ts: 1000, type: "step_complete", payload: longPayload }]; + const { eventLines } = buildInspectOutput(baseRun, logs, 0, 20); + const text = eventLines.join(""); + expect(text).toContain("…"); + expect(text).not.toContain("x".repeat(201)); + }); + + it("shows short payloads in full", () => { + const logs = [{ ts: 1000, type: "step_complete", payload: '{"count":5}' }]; + const { eventLines } = buildInspectOutput(baseRun, logs, 0, 20); + expect(eventLines.join("")).toContain('{"count":5}'); + }); + + it("paginates events with a hint", () => { + const logs = Array.from({ length: 5 }, (_, i) => ({ + ts: 1000 + i, + type: "step_complete", + payload: null, + })); + const { eventLines, paginationHint } = buildInspectOutput(baseRun, logs, 0, 2); + expect(eventLines).toHaveLength(2); + expect(paginationHint).toContain("3 more"); + expect(paginationHint).toContain("--offset 2"); + expect(paginationHint).toContain("run-xyz"); + }); + + it("no pagination hint when all events fit on one page", () => { + const logs = [{ ts: 1000, type: "started", payload: null }]; + const { paginationHint } = buildInspectOutput(baseRun, logs, 0, 20); + expect(paginationHint).toBeNull(); + }); +}); + +// --------------------------------------------------------------------------- +// Integration: getAllWorkflowRuns + buildListOutput end-to-end with real store +// --------------------------------------------------------------------------- + +describe("workflow list — integration with real store", () => { + it("lists active runs from the store", () => { + upsertRun("r1", "cleanup", "started", 1000); + upsertRun("r2", "cleanup", "queued", 2000); + upsertRun("r3", "cleanup", "completed", 3000); + + // Active only (getActiveWorkflowRuns) + const activeRuns = store.getActiveWorkflowRuns(); + const { lines } = buildListOutput(activeRuns, 0, 20, false, null); + const combined = lines.join(""); + expect(combined).toContain("r1"); + expect(combined).toContain("r2"); + expect(combined).not.toContain("r3"); + }); + + it("lists all runs with getAllWorkflowRuns", () => { + upsertRun("r1", "cleanup", "started", 1000); + upsertRun("r2", "cleanup", "completed", 2000); + upsertRun("r3", "cleanup", "failed", 3000); + + const allRuns = getAllWorkflowRuns(store, null); + const { lines } = buildListOutput(allRuns, 0, 20, true, null); + const combined = lines.join(""); + expect(combined).toContain("r1"); + expect(combined).toContain("r2"); + expect(combined).toContain("r3"); + }); +}); diff --git a/packages/cli/src/cli.ts b/packages/cli/src/cli.ts index 31fb1fd..2c52d88 100644 --- a/packages/cli/src/cli.ts +++ b/packages/cli/src/cli.ts @@ -7,6 +7,7 @@ import { startCommand } from "./commands/start.js"; import { statusCommand } from "./commands/status.js"; import { stopCommand } from "./commands/stop.js"; import { validateCommand } from "./commands/validate.js"; +import { workflowCommand } from "./commands/workflow.js"; const main = defineCommand({ meta: { @@ -19,6 +20,7 @@ const main = defineCommand({ stop: stopCommand, status: statusCommand, validate: validateCommand, + workflow: workflowCommand, }, }); diff --git a/packages/cli/src/commands/init.ts b/packages/cli/src/commands/init.ts index 94f223e..5c151b1 100644 --- a/packages/cli/src/commands/init.ts +++ b/packages/cli/src/commands/init.ts @@ -114,10 +114,78 @@ async function detectPackageManager(): Promise<{ cmd: string; args: string[] }> return { cmd: "npm", args: ["install"] }; } -export const initCommand = defineCommand({ +function buildWorkflowTemplate(name: string): string { + return `import type { WorkflowDefinition } from "@uncaged/nerve-daemon"; + +const workflow: WorkflowDefinition = { + roles: { + main: { + async execute(prompt, ctx) { + ctx.log("${name} started"); + // TODO: implement your role logic here + return { type: "done" }; + }, + }, + }, + + moderate(thread, event) { + if (event.type === "thread_start") { + return { role: "main", prompt: {} }; + } + return null; // workflow complete + }, +}; + +export default workflow; +`; +} + +const initWorkflowCommand = defineCommand({ meta: { - name: "init", - description: "Initialize the ~/.uncaged-nerve/ workspace", + name: "workflow", + description: "Scaffold a new workflow template in ~/.uncaged-nerve/workflows//", + }, + args: { + name: { + type: "positional", + description: "Workflow name (must match the key in nerve.yaml workflows section)", + }, + force: { + type: "boolean", + description: "Overwrite if the workflow directory already exists", + default: false, + }, + }, + async run({ args }) { + const nerveRoot = getNerveRoot(); + const workflowDir = join(nerveRoot, "workflows", args.name); + + if (existsSync(workflowDir) && !args.force) { + process.stderr.write( + `⚠️ Workflow "${args.name}" already exists at ${workflowDir}. Use --force to overwrite.\n`, + ); + process.exit(1); + } + + mkdirSync(workflowDir, { recursive: true }); + writeFile(join(workflowDir, "index.ts"), buildWorkflowTemplate(args.name)); + + process.stdout.write(`✅ Workflow scaffolded: ${workflowDir}/index.ts\n`); + process.stdout.write("\n💡 Next steps:\n"); + process.stdout.write(" 1. Add to nerve.yaml:\n"); + process.stdout.write(" workflows:\n"); + process.stdout.write(` ${args.name}:\n`); + process.stdout.write(" concurrency: 1\n"); + process.stdout.write(" overflow: drop\n"); + process.stdout.write(` 2. Edit ${workflowDir}/index.ts to implement your roles.\n`); + process.stdout.write(" 3. Run `nerve start` to launch the daemon.\n"); + }, +}); + +const initWorkspaceCommand = defineCommand({ + meta: { + name: "workspace", + description: "Initialize the ~/.uncaged-nerve/ workspace (default)", }, args: { force: { @@ -127,45 +195,71 @@ export const initCommand = defineCommand({ }, }, async run({ args }) { - const nerveRoot = getNerveRoot(); - - if (existsSync(nerveRoot) && !args.force) { - process.stderr.write("⚠️ ~/.uncaged-nerve/ already exists. Use --force to reinitialize.\n"); - process.exit(1); - } - - mkdirSync(join(nerveRoot, "data"), { recursive: true }); - mkdirSync(join(nerveRoot, "data", "senses"), { recursive: true }); - mkdirSync(join(nerveRoot, "senses", "cpu-usage", "migrations"), { recursive: true }); - - writeFile(join(nerveRoot, "nerve.yaml"), NERVE_YAML); - writeFile(join(nerveRoot, "package.json"), PACKAGE_JSON); - writeFile(join(nerveRoot, ".gitignore"), GITIGNORE); - writeFile(join(nerveRoot, "senses", "cpu-usage", "schema.ts"), CPU_SCHEMA_TS); - writeFile(join(nerveRoot, "senses", "cpu-usage", "index.js"), CPU_INDEX_JS); - writeFile( - join(nerveRoot, "senses", "cpu-usage", "migrations", "0001_init.sql"), - CPU_MIGRATION_SQL, - ); - - process.stdout.write("Installing dependencies…\n"); - try { - const { cmd, args } = await detectPackageManager(); - await runCommand(cmd, args, nerveRoot); - } catch { - process.stdout.write("⚠️ Install failed — you may need to install dependencies manually.\n"); - } - - if (!existsSync(join(nerveRoot, ".git"))) { - try { - await runCommand("git", ["init"], nerveRoot); - } catch { - process.stdout.write("⚠️ git init failed — skipping.\n"); - } - } - - process.stdout.write( - "✅ Workspace created at ~/.uncaged-nerve/\n 1 example sense: cpu-usage\n Run `nerve start` to launch the daemon.\n", - ); + await runInitWorkspace(args.force); + }, +}); + +async function runInitWorkspace(force: boolean): Promise { + const nerveRoot = getNerveRoot(); + + if (existsSync(nerveRoot) && !force) { + process.stderr.write("⚠️ ~/.uncaged-nerve/ already exists. Use --force to reinitialize.\n"); + process.exit(1); + } + + mkdirSync(join(nerveRoot, "data"), { recursive: true }); + mkdirSync(join(nerveRoot, "data", "senses"), { recursive: true }); + mkdirSync(join(nerveRoot, "senses", "cpu-usage", "migrations"), { recursive: true }); + + writeFile(join(nerveRoot, "nerve.yaml"), NERVE_YAML); + writeFile(join(nerveRoot, "package.json"), PACKAGE_JSON); + writeFile(join(nerveRoot, ".gitignore"), GITIGNORE); + writeFile(join(nerveRoot, "senses", "cpu-usage", "schema.ts"), CPU_SCHEMA_TS); + writeFile(join(nerveRoot, "senses", "cpu-usage", "index.js"), CPU_INDEX_JS); + writeFile( + join(nerveRoot, "senses", "cpu-usage", "migrations", "0001_init.sql"), + CPU_MIGRATION_SQL, + ); + + process.stdout.write("Installing dependencies…\n"); + try { + const { cmd, args } = await detectPackageManager(); + await runCommand(cmd, args, nerveRoot); + } catch { + process.stdout.write("⚠️ Install failed — you may need to install dependencies manually.\n"); + } + + if (!existsSync(join(nerveRoot, ".git"))) { + try { + await runCommand("git", ["init"], nerveRoot); + } catch { + process.stdout.write("⚠️ git init failed — skipping.\n"); + } + } + + process.stdout.write( + "✅ Workspace created at ~/.uncaged-nerve/\n 1 example sense: cpu-usage\n Run `nerve start` to launch the daemon.\n", + ); +} + +export const initCommand = defineCommand({ + meta: { + name: "init", + description: + "Initialize workspace (nerve init) or scaffold templates (nerve init workflow )", + }, + args: { + force: { + type: "boolean", + description: "Reinitialize even if workspace already exists (preserves data/)", + default: false, + }, + }, + subCommands: { + workflow: initWorkflowCommand, + workspace: initWorkspaceCommand, + }, + async run({ args }) { + await runInitWorkspace(args.force); }, }); diff --git a/packages/cli/src/commands/workflow.ts b/packages/cli/src/commands/workflow.ts new file mode 100644 index 0000000..c62680c --- /dev/null +++ b/packages/cli/src/commands/workflow.ts @@ -0,0 +1,364 @@ +import { existsSync } from "node:fs"; +import { join } from "node:path"; + +import { createLogStore } from "@uncaged/nerve-daemon"; +import type { LogStore, WorkflowRun } from "@uncaged/nerve-daemon"; +import { defineCommand } from "citty"; + +import { getNerveRoot } from "../workspace.js"; + +export const DEFAULT_PAGE_SIZE = 20; + +export function getDbPath(): string { + return join(getNerveRoot(), "data", "logs.db"); +} + +export function formatTs(ts: number): string { + return new Date(ts).toISOString(); +} + +function openStore(): LogStore { + const dbPath = getDbPath(); + if (!existsSync(dbPath)) { + process.stderr.write("❌ No logs.db found — has the daemon run yet?\n"); + process.exit(1); + } + return createLogStore(dbPath); +} + +export function statusIcon(status: WorkflowRun["status"]): string { + switch (status) { + case "started": + return "▶"; + case "queued": + return "⏳"; + case "completed": + return "✅"; + case "failed": + return "❌"; + case "crashed": + return "💥"; + case "dropped": + return "🗑"; + case "interrupted": + return "⚠️"; + } +} + +/** + * Retrieve all workflow runs from the store, deduplicated by runId (latest state wins). + * Returns runs sorted by ts descending (newest first). + */ +export function getAllWorkflowRuns(store: LogStore, filterWorkflow: string | null): WorkflowRun[] { + const allLogs = store.query({ source: "workflow" }); + const runMap = new Map(); + for (const log of allLogs) { + if (log.refId === null) continue; + if (runMap.has(log.refId)) continue; + const run = store.getWorkflowRun(log.refId); + if (run === null) continue; + if (filterWorkflow !== null && run.workflow !== filterWorkflow) continue; + runMap.set(log.refId, run); + } + return Array.from(runMap.values()).sort((a, b) => b.ts - a.ts); +} + +/** + * Format a single workflow run as a single output line (no trailing newline in icon/fields). + */ +export function formatRunLine(run: WorkflowRun): string { + const icon = statusIcon(run.status); + return ` ${icon} ${run.runId} workflow=${run.workflow} status=${run.status} ts=${formatTs(run.ts)}\n`; +} + +/** + * Format a paginated list of workflow runs into output lines. + * Returns the lines to write and any pagination hint. + */ +export type ListOutput = { + lines: string[]; + paginationHint: string | null; +}; + +export function buildListOutput( + runs: WorkflowRun[], + offset: number, + limit: number, + allFlag: boolean, + filterWorkflow: string | null, +): ListOutput { + const total = runs.length; + const page = runs.slice(offset, offset + limit); + const shown = page.length; + const remaining = total - offset - shown; + + if (total === 0) { + const msg = allFlag + ? "📭 No workflow runs found.\n" + : "📭 No active workflow runs. Use --all to include completed/failed runs.\n"; + return { lines: [msg], paginationHint: null }; + } + + const lines: string[] = []; + lines.push(`📋 Workflow runs (${shown} of ${total} shown):\n`); + for (const run of page) { + lines.push(formatRunLine(run)); + } + + let paginationHint: string | null = null; + if (remaining > 0) { + const wfFlag = filterWorkflow !== null ? ` --workflow ${filterWorkflow}` : ""; + const allFlagStr = allFlag ? " --all" : ""; + paginationHint = + `\n⏩ ${remaining} more run(s) not shown. Fetch next page:\n` + + ` nerve workflow list --offset ${offset + limit}${allFlagStr}${wfFlag}\n`; + } + + return { lines, paginationHint }; +} + +/** + * Format the inspect output for a single run's log entries with pagination. + */ +export type InspectOutput = { + header: string[]; + eventLines: string[]; + paginationHint: string | null; +}; + +export function buildInspectOutput( + run: WorkflowRun, + allLogs: Array<{ ts: number; type: string; payload: string | null }>, + offset: number, + limit: number, +): InspectOutput { + const total = allLogs.length; + const page = allLogs.slice(offset, offset + limit); + const shown = page.length; + const remaining = total - offset - shown; + + const header: string[] = [ + `🔍 Workflow run: ${run.runId}\n`, + ` workflow: ${run.workflow}\n`, + ` status: ${run.status}\n`, + ` ts: ${formatTs(run.ts)}\n`, + `\n📜 Thread events (${shown} of ${total}):\n`, + ]; + + const eventLines: string[] = []; + if (total === 0) { + eventLines.push(" (no events recorded)\n"); + } else { + for (const entry of page) { + const payloadStr = + entry.payload === null + ? "" + : entry.payload.length <= 200 + ? ` payload=${entry.payload}` + : ` payload=${entry.payload.slice(0, 200)}…`; + eventLines.push(` [${formatTs(entry.ts)}] type=${entry.type}${payloadStr}\n`); + } + } + + let paginationHint: string | null = null; + if (remaining > 0) { + paginationHint = + `\n⏩ ${remaining} more event(s) not shown. Fetch next page:\n` + + ` nerve workflow inspect ${run.runId} --offset ${offset + limit}\n`; + } + + return { header, eventLines, paginationHint }; +} + +// --------------------------------------------------------------------------- +// nerve workflow list +// --------------------------------------------------------------------------- + +const workflowListCommand = defineCommand({ + meta: { + name: "list", + description: "List active (queued/started) workflow runs", + }, + args: { + all: { + type: "boolean", + description: "Include completed/failed/crashed runs", + default: false, + }, + workflow: { + type: "string", + description: "Filter by workflow name", + default: "", + }, + limit: { + type: "string", + description: `Max runs to show (default: ${DEFAULT_PAGE_SIZE})`, + default: String(DEFAULT_PAGE_SIZE), + }, + offset: { + type: "string", + description: "Skip first N runs (for pagination)", + default: "0", + }, + }, + async run({ args }) { + const store = openStore(); + + try { + const limit = Math.max(1, Number.parseInt(args.limit, 10) || DEFAULT_PAGE_SIZE); + const offset = Math.max(0, Number.parseInt(args.offset, 10) || 0); + const filterWorkflow = args.workflow.length > 0 ? args.workflow : null; + + const runs = args.all + ? getAllWorkflowRuns(store, filterWorkflow) + : store.getActiveWorkflowRuns(filterWorkflow ?? undefined); + + const { lines, paginationHint } = buildListOutput( + runs, + offset, + limit, + args.all, + filterWorkflow, + ); + + for (const line of lines) { + process.stdout.write(line); + } + if (paginationHint !== null) { + process.stdout.write(paginationHint); + } + } finally { + store.close(); + } + }, +}); + +// --------------------------------------------------------------------------- +// nerve workflow inspect +// --------------------------------------------------------------------------- + +const workflowInspectCommand = defineCommand({ + meta: { + name: "inspect", + description: "Show details and thread events for a workflow run", + }, + args: { + runId: { + type: "positional", + description: "The run ID to inspect", + }, + limit: { + type: "string", + description: `Max log entries to show (default: ${DEFAULT_PAGE_SIZE})`, + default: String(DEFAULT_PAGE_SIZE), + }, + offset: { + type: "string", + description: "Skip first N log entries (for pagination)", + default: "0", + }, + }, + async run({ args }) { + const store = openStore(); + + try { + const limit = Math.max(1, Number.parseInt(args.limit, 10) || DEFAULT_PAGE_SIZE); + const offset = Math.max(0, Number.parseInt(args.offset, 10) || 0); + + const run = store.getWorkflowRun(args.runId); + if (run === null) { + process.stderr.write(`❌ No workflow run found with runId: ${args.runId}\n`); + process.exit(1); + } + + const allLogs = store.query({ source: "workflow", refId: args.runId }); + const { header, eventLines, paginationHint } = buildInspectOutput( + run, + allLogs, + offset, + limit, + ); + + for (const line of [...header, ...eventLines]) { + process.stdout.write(line); + } + if (paginationHint !== null) { + process.stdout.write(paginationHint); + } + } finally { + store.close(); + } + }, +}); + +// --------------------------------------------------------------------------- +// nerve workflow trigger +// --------------------------------------------------------------------------- + +const workflowTriggerCommand = defineCommand({ + meta: { + name: "trigger", + description: "Manually trigger a workflow by writing a trigger log entry", + }, + args: { + name: { + type: "positional", + description: "The workflow name to trigger", + }, + payload: { + type: "string", + description: "JSON payload to pass as trigger payload (default: {})", + default: "{}", + }, + }, + async run({ args }) { + let triggerPayload: unknown = {}; + try { + triggerPayload = JSON.parse(args.payload) as unknown; + } catch { + process.stderr.write(`❌ --payload must be valid JSON. Got: ${args.payload}\n`); + process.exit(1); + } + + const store = openStore(); + + try { + const runId = crypto.randomUUID(); + const ts = Date.now(); + + store.upsertWorkflowRun( + { + source: "workflow", + type: "started", + refId: runId, + payload: JSON.stringify({ triggerPayload, source: "cli-manual" }), + ts, + }, + { runId, workflow: args.name, status: "started", ts }, + ); + + process.stdout.write(`✅ Triggered workflow "${args.name}"\n`); + process.stdout.write(` runId: ${runId}\n`); + process.stdout.write(` ts: ${formatTs(ts)}\n`); + process.stdout.write(`\n💡 Inspect with: nerve workflow inspect ${runId}\n`); + } finally { + store.close(); + } + }, +}); + +// --------------------------------------------------------------------------- +// nerve workflow (parent command) +// --------------------------------------------------------------------------- + +export const workflowCommand = defineCommand({ + meta: { + name: "workflow", + description: "Manage and inspect workflow runs", + }, + subCommands: { + list: workflowListCommand, + inspect: workflowInspectCommand, + trigger: workflowTriggerCommand, + }, +}); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index eb21479..5088ce6 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -30,9 +30,15 @@ importers: specifier: ^0.1.6 version: 0.1.6 devDependencies: + '@types/better-sqlite3': + specifier: ^7.6.13 + version: 7.6.13 '@types/node': specifier: ^22.0.0 version: 22.19.17 + vitest: + specifier: ^4.1.5 + version: 4.1.5(@types/node@22.19.17)(vite@8.0.9(@types/node@22.19.17)(esbuild@0.27.7)(yaml@2.8.3)) packages/core: dependencies: @@ -1579,6 +1585,14 @@ snapshots: chai: 6.2.2 tinyrainbow: 3.1.0 + '@vitest/mocker@4.1.5(vite@8.0.9(@types/node@22.19.17)(esbuild@0.27.7)(yaml@2.8.3))': + dependencies: + '@vitest/spy': 4.1.5 + estree-walker: 3.0.3 + magic-string: 0.30.21 + optionalDependencies: + vite: 8.0.9(@types/node@22.19.17)(esbuild@0.27.7)(yaml@2.8.3) + '@vitest/mocker@4.1.5(vite@8.0.9(@types/node@25.6.0)(esbuild@0.27.7)(yaml@2.8.3))': dependencies: '@vitest/spy': 4.1.5 @@ -2089,6 +2103,19 @@ snapshots: util-deprecate@1.0.2: {} + vite@8.0.9(@types/node@22.19.17)(esbuild@0.27.7)(yaml@2.8.3): + dependencies: + lightningcss: 1.32.0 + picomatch: 4.0.4 + postcss: 8.5.10 + rolldown: 1.0.0-rc.16 + tinyglobby: 0.2.16 + optionalDependencies: + '@types/node': 22.19.17 + esbuild: 0.27.7 + fsevents: 2.3.3 + yaml: 2.8.3 + vite@8.0.9(@types/node@25.6.0)(esbuild@0.27.7)(yaml@2.8.3): dependencies: lightningcss: 1.32.0 @@ -2102,6 +2129,33 @@ snapshots: fsevents: 2.3.3 yaml: 2.8.3 + vitest@4.1.5(@types/node@22.19.17)(vite@8.0.9(@types/node@22.19.17)(esbuild@0.27.7)(yaml@2.8.3)): + dependencies: + '@vitest/expect': 4.1.5 + '@vitest/mocker': 4.1.5(vite@8.0.9(@types/node@22.19.17)(esbuild@0.27.7)(yaml@2.8.3)) + '@vitest/pretty-format': 4.1.5 + '@vitest/runner': 4.1.5 + '@vitest/snapshot': 4.1.5 + '@vitest/spy': 4.1.5 + '@vitest/utils': 4.1.5 + es-module-lexer: 2.0.0 + expect-type: 1.3.0 + magic-string: 0.30.21 + obug: 2.1.1 + pathe: 2.0.3 + picomatch: 4.0.4 + std-env: 4.1.0 + tinybench: 2.9.0 + tinyexec: 1.1.1 + tinyglobby: 0.2.16 + tinyrainbow: 3.1.0 + vite: 8.0.9(@types/node@22.19.17)(esbuild@0.27.7)(yaml@2.8.3) + why-is-node-running: 2.3.0 + optionalDependencies: + '@types/node': 22.19.17 + transitivePeerDependencies: + - msw + vitest@4.1.5(@types/node@25.6.0)(vite@8.0.9(@types/node@25.6.0)(esbuild@0.27.7)(yaml@2.8.3)): dependencies: '@vitest/expect': 4.1.5 -- 2.43.0 From 7320761277d7057054f9130b49089da26ce92b7a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B0=8F=E6=A9=98?= Date: Wed, 22 Apr 2026 14:10:43 +0000 Subject: [PATCH 2/2] =?UTF-8?q?fix(cli):=20address=20PR=20#31=20review=20?= =?UTF-8?q?=E2=80=94=206=20issues=20fixed?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Critical: 1. trigger: use Unix socket IPC to daemon instead of direct DB write - new daemon-ipc.ts (server) + daemon-client.ts (client) - kernel accepts ipcSocketPath, auto-starts IPC server 2. init workflow: validate name (lowercase alphanumeric + hyphens only) Should fix: 3. getAllWorkflowRuns: SQL query on workflow_runs table instead of O(n) scan 4. limit/offset: robust parseIntArg() helper with NaN handling 5. statusIcon: exhaustive switch with never type check 6. trigger: end-to-end Unix socket tests added 12 new tests. All 224 tests pass. 小橘 🍊(NEKO Team) --- packages/cli/src/__tests__/workflow.test.ts | 129 ++++++++++++++++++++ packages/cli/src/commands/init.ts | 16 +++ packages/cli/src/commands/start.ts | 14 ++- packages/cli/src/commands/workflow.ts | 82 ++++++------- packages/cli/src/daemon-client.ts | 91 ++++++++++++++ packages/cli/src/workspace.ts | 4 + packages/daemon/src/daemon-ipc.ts | 110 +++++++++++++++++ packages/daemon/src/index.ts | 8 +- packages/daemon/src/kernel.ts | 20 ++- packages/daemon/src/log-store.ts | 28 +++++ 10 files changed, 455 insertions(+), 47 deletions(-) create mode 100644 packages/cli/src/daemon-client.ts create mode 100644 packages/daemon/src/daemon-ipc.ts diff --git a/packages/cli/src/__tests__/workflow.test.ts b/packages/cli/src/__tests__/workflow.test.ts index b860b18..5af3a08 100644 --- a/packages/cli/src/__tests__/workflow.test.ts +++ b/packages/cli/src/__tests__/workflow.test.ts @@ -8,6 +8,7 @@ */ import { mkdtempSync, rmSync } from "node:fs"; +import { createServer } from "node:net"; import { tmpdir } from "node:os"; import { join } from "node:path"; @@ -20,8 +21,10 @@ import { buildListOutput, formatTs, getAllWorkflowRuns, + parseIntArg, statusIcon, } from "../commands/workflow.js"; +import { triggerWorkflowViaDaemon } from "../daemon-client.js"; // --------------------------------------------------------------------------- // Test helpers @@ -318,3 +321,129 @@ describe("workflow list — integration with real store", () => { expect(combined).toContain("r3"); }); }); + +// --------------------------------------------------------------------------- +// parseIntArg +// --------------------------------------------------------------------------- + +describe("parseIntArg", () => { + it("parses a valid integer string", () => { + expect(parseIntArg("5", 20)).toBe(5); + }); + + it("returns fallback for non-numeric string", () => { + expect(parseIntArg("abc", 20)).toBe(20); + }); + + it("returns the value for '0' (not fallback)", () => { + expect(parseIntArg("0", 20)).toBe(0); + }); + + it("returns fallback for empty string", () => { + expect(parseIntArg("", 20)).toBe(20); + }); + + it("parses negative integers", () => { + expect(parseIntArg("-3", 20)).toBe(-3); + }); +}); + +// --------------------------------------------------------------------------- +// getAllWorkflowRuns — backed by real store's SQL query +// --------------------------------------------------------------------------- + +describe("getAllWorkflowRuns — uses store.getAllWorkflowRuns SQL path", () => { + it("returns all runs regardless of status", () => { + upsertRun("r1", "deploy", "completed", 1000); + upsertRun("r2", "deploy", "failed", 2000); + upsertRun("r3", "deploy", "started", 3000); + upsertRun("r4", "deploy", "queued", 4000); + upsertRun("r5", "deploy", "crashed", 5000); + upsertRun("r6", "deploy", "dropped", 6000); + upsertRun("r7", "deploy", "interrupted", 7000); + + const runs = getAllWorkflowRuns(store, null); + expect(runs).toHaveLength(7); + }); + + it("returns runs sorted by ts descending (newest first)", () => { + upsertRun("r1", "deploy", "completed", 1000); + upsertRun("r2", "deploy", "completed", 3000); + upsertRun("r3", "deploy", "completed", 2000); + + const runs = getAllWorkflowRuns(store, null); + expect(runs[0].ts).toBe(3000); + expect(runs[1].ts).toBe(2000); + expect(runs[2].ts).toBe(1000); + }); + + it("filters by workflow name", () => { + upsertRun("r1", "alpha", "completed", 1000); + upsertRun("r2", "beta", "completed", 2000); + upsertRun("r3", "alpha", "failed", 3000); + + const runs = getAllWorkflowRuns(store, "alpha"); + expect(runs).toHaveLength(2); + for (const r of runs) expect(r.workflow).toBe("alpha"); + }); + + it("returns empty array when store has no runs", () => { + expect(getAllWorkflowRuns(store, null)).toHaveLength(0); + }); +}); + +// --------------------------------------------------------------------------- +// triggerWorkflowViaDaemon — IPC round-trip via real Unix socket +// --------------------------------------------------------------------------- + +describe("triggerWorkflowViaDaemon", () => { + let sockDir: string; + let sockPath: string; + + beforeEach(() => { + sockDir = mkdtempSync(join(tmpdir(), "nerve-ipc-test-")); + sockPath = join(sockDir, "nerve.sock"); + }); + + afterEach(() => { + rmSync(sockDir, { recursive: true, force: true }); + }); + + it("resolves { ok: true } when server responds ok", async () => { + const server = createServer((s) => { + s.on("data", () => { + s.write(`${JSON.stringify({ ok: true })}\n`); + }); + }); + await new Promise((r) => server.listen(sockPath, r)); + + try { + const result = await triggerWorkflowViaDaemon(sockPath, "my-workflow", {}); + expect(result).toEqual({ ok: true }); + } finally { + await new Promise((r) => server.close(() => r())); + } + }); + + it("resolves { ok: false, error } when server responds with error", async () => { + const server = createServer((s) => { + s.on("data", () => { + s.write(`${JSON.stringify({ ok: false, error: "unknown workflow" })}\n`); + }); + }); + await new Promise((r) => server.listen(sockPath, r)); + + try { + const result = await triggerWorkflowViaDaemon(sockPath, "missing", {}); + expect(result).toEqual({ ok: false, error: "unknown workflow" }); + } finally { + await new Promise((r) => server.close(() => r())); + } + }); + + it("rejects when no daemon is listening on the socket", async () => { + await expect(triggerWorkflowViaDaemon(sockPath, "my-workflow", {})).rejects.toThrow( + /Cannot connect to daemon/, + ); + }); +}); diff --git a/packages/cli/src/commands/init.ts b/packages/cli/src/commands/init.ts index 5c151b1..e365609 100644 --- a/packages/cli/src/commands/init.ts +++ b/packages/cli/src/commands/init.ts @@ -114,6 +114,16 @@ async function detectPackageManager(): Promise<{ cmd: string; args: string[] }> return { cmd: "npm", args: ["install"] }; } +export const WORKFLOW_NAME_RE = /^[a-z0-9][a-z0-9-]*[a-z0-9]$|^[a-z0-9]$/; + +export function validateWorkflowName(name: string): string | null { + if (name.length === 0) return "Workflow name must not be empty."; + if (name.length > 64) return "Workflow name must be 64 characters or fewer."; + if (!WORKFLOW_NAME_RE.test(name)) + return "Workflow name must contain only lowercase letters, digits, and hyphens, and must not start or end with a hyphen."; + return null; +} + function buildWorkflowTemplate(name: string): string { return `import type { WorkflowDefinition } from "@uncaged/nerve-daemon"; @@ -160,6 +170,12 @@ const initWorkflowCommand = defineCommand({ const nerveRoot = getNerveRoot(); const workflowDir = join(nerveRoot, "workflows", args.name); + const nameError = validateWorkflowName(args.name); + if (nameError !== null) { + process.stderr.write(`❌ Invalid workflow name: ${nameError}\n`); + process.exit(1); + } + if (existsSync(workflowDir) && !args.force) { process.stderr.write( `⚠️ Workflow "${args.name}" already exists at ${workflowDir}. Use --force to overwrite.\n`, diff --git a/packages/cli/src/commands/start.ts b/packages/cli/src/commands/start.ts index dc46a4e..b2e8ee3 100644 --- a/packages/cli/src/commands/start.ts +++ b/packages/cli/src/commands/start.ts @@ -8,7 +8,14 @@ import { parseNerveConfig } from "@uncaged/nerve-core"; import { createKernel } from "@uncaged/nerve-daemon"; import { defineCommand } from "citty"; -import { getLogPath, getNerveRoot, isRunning, readPidFile, writePidFile } from "../workspace.js"; +import { + getLogPath, + getNerveRoot, + getSocketPath, + isRunning, + readPidFile, + writePidFile, +} from "../workspace.js"; function readConfig(nerveRoot: string): ReturnType { const configPath = join(nerveRoot, "nerve.yaml"); @@ -30,7 +37,10 @@ async function runForeground(nerveRoot: string): Promise { } const config = configResult.value; - const kernel = createKernel(config, nerveRoot); + const kernel = createKernel(config, nerveRoot, { + enableFileWatcher: true, + ipcSocketPath: getSocketPath(), + }); const senseNames = Object.keys(config.senses); const groups = [...kernel.groups]; diff --git a/packages/cli/src/commands/workflow.ts b/packages/cli/src/commands/workflow.ts index c62680c..fa99716 100644 --- a/packages/cli/src/commands/workflow.ts +++ b/packages/cli/src/commands/workflow.ts @@ -5,10 +5,16 @@ import { createLogStore } from "@uncaged/nerve-daemon"; import type { LogStore, WorkflowRun } from "@uncaged/nerve-daemon"; import { defineCommand } from "citty"; -import { getNerveRoot } from "../workspace.js"; +import { triggerWorkflowViaDaemon } from "../daemon-client.js"; +import { getNerveRoot, getSocketPath, isRunning } from "../workspace.js"; export const DEFAULT_PAGE_SIZE = 20; +export function parseIntArg(raw: string, fallback: number): number { + const v = Number.parseInt(raw, 10); + return Number.isNaN(v) ? fallback : v; +} + export function getDbPath(): string { return join(getNerveRoot(), "data", "logs.db"); } @@ -42,25 +48,19 @@ export function statusIcon(status: WorkflowRun["status"]): string { return "🗑"; case "interrupted": return "⚠️"; + default: { + const _exhaustive: never = status; + return `?(${_exhaustive})`; + } } } /** - * Retrieve all workflow runs from the store, deduplicated by runId (latest state wins). - * Returns runs sorted by ts descending (newest first). + * Retrieve all workflow runs from the store, sorted by ts descending (newest first). + * Delegates to the store's efficient SQL query on the workflow_runs table. */ export function getAllWorkflowRuns(store: LogStore, filterWorkflow: string | null): WorkflowRun[] { - const allLogs = store.query({ source: "workflow" }); - const runMap = new Map(); - for (const log of allLogs) { - if (log.refId === null) continue; - if (runMap.has(log.refId)) continue; - const run = store.getWorkflowRun(log.refId); - if (run === null) continue; - if (filterWorkflow !== null && run.workflow !== filterWorkflow) continue; - runMap.set(log.refId, run); - } - return Array.from(runMap.values()).sort((a, b) => b.ts - a.ts); + return store.getAllWorkflowRuns(filterWorkflow); } /** @@ -205,8 +205,8 @@ const workflowListCommand = defineCommand({ const store = openStore(); try { - const limit = Math.max(1, Number.parseInt(args.limit, 10) || DEFAULT_PAGE_SIZE); - const offset = Math.max(0, Number.parseInt(args.offset, 10) || 0); + const limit = Math.max(1, parseIntArg(args.limit, DEFAULT_PAGE_SIZE)); + const offset = Math.max(0, parseIntArg(args.offset, 0)); const filterWorkflow = args.workflow.length > 0 ? args.workflow : null; const runs = args.all @@ -262,8 +262,8 @@ const workflowInspectCommand = defineCommand({ const store = openStore(); try { - const limit = Math.max(1, Number.parseInt(args.limit, 10) || DEFAULT_PAGE_SIZE); - const offset = Math.max(0, Number.parseInt(args.offset, 10) || 0); + const limit = Math.max(1, parseIntArg(args.limit, DEFAULT_PAGE_SIZE)); + const offset = Math.max(0, parseIntArg(args.offset, 0)); const run = store.getWorkflowRun(args.runId); if (run === null) { @@ -298,7 +298,7 @@ const workflowInspectCommand = defineCommand({ const workflowTriggerCommand = defineCommand({ meta: { name: "trigger", - description: "Manually trigger a workflow by writing a trigger log entry", + description: "Manually trigger a workflow by sending an IPC message to the running daemon", }, args: { name: { @@ -320,30 +320,28 @@ const workflowTriggerCommand = defineCommand({ process.exit(1); } - const store = openStore(); - - try { - const runId = crypto.randomUUID(); - const ts = Date.now(); - - store.upsertWorkflowRun( - { - source: "workflow", - type: "started", - refId: runId, - payload: JSON.stringify({ triggerPayload, source: "cli-manual" }), - ts, - }, - { runId, workflow: args.name, status: "started", ts }, - ); - - process.stdout.write(`✅ Triggered workflow "${args.name}"\n`); - process.stdout.write(` runId: ${runId}\n`); - process.stdout.write(` ts: ${formatTs(ts)}\n`); - process.stdout.write(`\n💡 Inspect with: nerve workflow inspect ${runId}\n`); - } finally { - store.close(); + if (!isRunning()) { + process.stderr.write("❌ Nerve daemon is not running. Start it with `nerve start`.\n"); + process.exit(1); } + + const socketPath = getSocketPath(); + let response: { ok: true } | { ok: false; error: string }; + try { + response = await triggerWorkflowViaDaemon(socketPath, args.name, triggerPayload); + } catch (e) { + const msg = e instanceof Error ? e.message : String(e); + process.stderr.write(`❌ Failed to contact daemon: ${msg}\n`); + process.exit(1); + } + + if (!response.ok) { + process.stderr.write(`❌ Daemon rejected trigger: ${response.error}\n`); + process.exit(1); + } + + process.stdout.write(`✅ Triggered workflow "${args.name}" via daemon.\n`); + process.stdout.write("\n💡 Inspect active runs with: nerve workflow list\n"); }, }); diff --git a/packages/cli/src/daemon-client.ts b/packages/cli/src/daemon-client.ts new file mode 100644 index 0000000..85dce39 --- /dev/null +++ b/packages/cli/src/daemon-client.ts @@ -0,0 +1,91 @@ +/** + * Daemon IPC client — connects to the daemon's Unix socket and sends + * a trigger-workflow request. + * + * Protocol: newline-delimited JSON (same as daemon-ipc.ts server side). + */ + +import { connect } from "node:net"; +import type { Socket } from "node:net"; + +const CONNECT_TIMEOUT_MS = 3_000; +const RESPONSE_TIMEOUT_MS = 5_000; + +type TriggerResponse = { ok: true } | { ok: false; error: string }; + +function parseDaemonResponse(line: string): TriggerResponse { + try { + const obj = JSON.parse(line) as unknown; + if (obj !== null && typeof obj === "object") { + const r = obj as Record; + if (r.ok === true) return { ok: true }; + if (r.ok === false && typeof r.error === "string") return { ok: false, error: r.error }; + } + } catch { + // fall through + } + return { ok: false, error: `Unexpected daemon response: ${line}` }; +} + +/** + * Send a trigger-workflow message to the running daemon via its Unix socket. + * Resolves with the daemon's response or rejects on connection/timeout errors. + */ +export function triggerWorkflowViaDaemon( + socketPath: string, + workflow: string, + payload: unknown, +): Promise { + return new Promise((resolve, reject) => { + let socket: Socket | null = null; + let settled = false; + + function settle(result: TriggerResponse | Error): void { + if (settled) return; + settled = true; + if (socket !== null) { + socket.destroy(); + socket = null; + } + if (result instanceof Error) { + reject(result); + } else { + resolve(result); + } + } + + const connectTimer = setTimeout(() => { + settle(new Error(`Timed out connecting to daemon socket: ${socketPath}`)); + }, CONNECT_TIMEOUT_MS); + + socket = connect(socketPath, () => { + clearTimeout(connectTimer); + + const responseTimer = setTimeout(() => { + settle(new Error("Timed out waiting for daemon response")); + }, RESPONSE_TIMEOUT_MS); + + let buf = ""; + socket?.on("data", (chunk: Buffer) => { + buf += chunk.toString("utf8"); + const lines = buf.split("\n"); + buf = lines.pop() ?? ""; + for (const line of lines) { + const trimmed = line.trim(); + if (trimmed.length === 0) continue; + clearTimeout(responseTimer); + settle(parseDaemonResponse(trimmed)); + return; + } + }); + + const msg = `${JSON.stringify({ type: "trigger-workflow", workflow, payload })}\n`; + socket?.write(msg); + }); + + socket.on("error", (err) => { + clearTimeout(connectTimer); + settle(new Error(`Cannot connect to daemon: ${err.message}`)); + }); + }); +} diff --git a/packages/cli/src/workspace.ts b/packages/cli/src/workspace.ts index c8c7a2a..8b8994e 100644 --- a/packages/cli/src/workspace.ts +++ b/packages/cli/src/workspace.ts @@ -10,6 +10,10 @@ export function getPidPath(): string { return join(getNerveRoot(), "nerve.pid"); } +export function getSocketPath(): string { + return join(getNerveRoot(), "nerve.sock"); +} + export function getLogPath(): string { return join(getNerveRoot(), "logs", "nerve.log"); } diff --git a/packages/daemon/src/daemon-ipc.ts b/packages/daemon/src/daemon-ipc.ts new file mode 100644 index 0000000..76ef531 --- /dev/null +++ b/packages/daemon/src/daemon-ipc.ts @@ -0,0 +1,110 @@ +/** + * Daemon IPC server — listens on a Unix domain socket so that the CLI + * can send commands (e.g. trigger-workflow) to the running daemon process. + * + * Protocol: newline-delimited JSON messages. + * Each request: { type: "trigger-workflow"; workflow: string; payload: unknown } + * Each response: { ok: true } | { ok: false; error: string } + */ + +import { rmSync } from "node:fs"; +import { type Server, type Socket, createServer } from "node:net"; + +import type { WorkflowManager } from "./workflow-manager.js"; + +/** JSON message sent by the CLI to trigger a workflow. */ +export type TriggerWorkflowRequest = { + type: "trigger-workflow"; + workflow: string; + payload: unknown; +}; + +type DaemonRequest = TriggerWorkflowRequest; + +type DaemonResponse = { ok: true } | { ok: false; error: string }; + +export type DaemonIpcServer = { + close: () => Promise; +}; + +function parseRequest(line: string): DaemonRequest | null { + try { + const obj = JSON.parse(line) as unknown; + if (obj === null || typeof obj !== "object") return null; + const req = obj as Record; + if (req.type === "trigger-workflow") { + if (typeof req.workflow !== "string" || req.workflow.length === 0) return null; + return { type: "trigger-workflow", workflow: req.workflow, payload: req.payload ?? {} }; + } + return null; + } catch { + return null; + } +} + +export function createDaemonIpcServer( + socketPath: string, + workflowManager: WorkflowManager, +): DaemonIpcServer { + // Remove stale socket file if it exists + try { + rmSync(socketPath); + } catch { + // file did not exist — that is fine + } + + function handleLine(socket: Socket, line: string): void { + const trimmed = line.trim(); + if (trimmed.length === 0) return; + + const req = parseRequest(trimmed); + if (req === null) { + const resp: DaemonResponse = { ok: false, error: "Invalid request" }; + socket.write(`${JSON.stringify(resp)}\n`); + return; + } + + workflowManager.startWorkflow(req.workflow, req.payload); + const resp: DaemonResponse = { ok: true }; + socket.write(`${JSON.stringify(resp)}\n`); + } + + const server: Server = createServer((socket) => { + let buf = ""; + + socket.on("data", (chunk: Buffer) => { + buf += chunk.toString("utf8"); + const lines = buf.split("\n"); + buf = lines.pop() ?? ""; + + for (const line of lines) { + handleLine(socket, line); + } + }); + + socket.on("error", () => { + // client disconnected mid-message — ignore + }); + }); + + server.listen(socketPath, () => { + process.stderr.write(`[daemon-ipc] listening on ${socketPath}\n`); + }); + + server.on("error", (err) => { + process.stderr.write(`[daemon-ipc] server error: ${err.message}\n`); + }); + + async function close(): Promise { + await new Promise((resolve) => { + server.close(() => resolve()); + }); + try { + rmSync(socketPath); + } catch { + // already removed + } + } + + return { close }; +} diff --git a/packages/daemon/src/index.ts b/packages/daemon/src/index.ts index 1437c07..61da3bd 100644 --- a/packages/daemon/src/index.ts +++ b/packages/daemon/src/index.ts @@ -33,7 +33,13 @@ export { createFileWatcher } from "./file-watcher.js"; export type { FileWatcher, FileChange, FileChangeHandler } from "./file-watcher.js"; export { createLogStore } from "./log-store.js"; -export type { LogStore, LogEntry, LogQuery, WorkflowRun, WorkflowRunStatus } from "./log-store.js"; +export type { + LogStore, + LogEntry, + LogQuery, + WorkflowRun, + WorkflowRunStatus, +} from "./log-store.js"; export { createWorkflowManager } from "./workflow-manager.js"; export type { WorkflowManager } from "./workflow-manager.js"; diff --git a/packages/daemon/src/kernel.ts b/packages/daemon/src/kernel.ts index e765bb4..4a2d4be 100644 --- a/packages/daemon/src/kernel.ts +++ b/packages/daemon/src/kernel.ts @@ -21,6 +21,8 @@ import { fileURLToPath } from "node:url"; import type { NerveConfig, Signal } from "@uncaged/nerve-core"; import { parseNerveConfig } from "@uncaged/nerve-core"; +import { createDaemonIpcServer } from "./daemon-ipc.js"; +import type { DaemonIpcServer } from "./daemon-ipc.js"; import { createFileWatcher } from "./file-watcher.js"; import type { FileWatcher } from "./file-watcher.js"; import type { ComputeMessage, ShutdownMessage } from "./ipc.js"; @@ -111,10 +113,15 @@ function groupForSense(config: NerveConfig, senseName: string): string | null { } export type KernelOptions = { - workerScript: string; + workerScript?: string | null; enableFileWatcher?: boolean; /** Override the LogStore instance (useful for testing). */ logStore?: LogStore; + /** + * Unix socket path for the daemon IPC server (used by CLI to send trigger-workflow). + * When null, the IPC server is not started (e.g. during tests). + */ + ipcSocketPath?: string | null; }; function defaultKernelOptions(): KernelOptions { @@ -127,7 +134,7 @@ export function createKernel( options: KernelOptions = defaultKernelOptions(), ): Kernel { const bus: SignalBus = createSignalBus(); - const workerScript = options.workerScript; + const workerScript = options.workerScript ?? resolveWorkerScript(); const startTime = Date.now(); const logStore: LogStore = options.logStore ?? createLogStore(join(nerveRoot, "data", "logs.db")); @@ -490,12 +497,21 @@ export function createKernel( }); } + let ipcServer: DaemonIpcServer | null = null; + if (options.ipcSocketPath != null) { + ipcServer = createDaemonIpcServer(options.ipcSocketPath, workflowManager); + } + async function stop(): Promise { stopped = true; if (fileWatcher !== null) { fileWatcher.close(); fileWatcher = null; } + if (ipcServer !== null) { + await ipcServer.close(); + ipcServer = null; + } scheduler.stop(); await workflowManager.stop(); const exitPromises: Promise[] = []; diff --git a/packages/daemon/src/log-store.ts b/packages/daemon/src/log-store.ts index a537082..ff1dc5e 100644 --- a/packages/daemon/src/log-store.ts +++ b/packages/daemon/src/log-store.ts @@ -90,6 +90,11 @@ export type LogStore = { * Optionally filter by workflow name. */ getActiveWorkflowRuns: (workflowName?: string) => WorkflowRun[]; + /** + * Get all workflow runs regardless of status, sorted by ts descending. + * Optionally filter by workflow name. + */ + getAllWorkflowRuns: (workflowName: string | null) => WorkflowRun[]; /** * Get the trigger payload for a workflow run (stored in the 'started' log entry). * Returns null if not found. @@ -173,6 +178,14 @@ export function createLogStore(dbPath: string): LogStore { "SELECT run_id, workflow, status, ts FROM workflow_runs WHERE status IN ('queued', 'started') AND workflow = ? ORDER BY ts ASC", ); + const getAllWorkflowRunsStmt = sqlite.prepare( + "SELECT run_id, workflow, status, ts FROM workflow_runs ORDER BY ts DESC", + ); + + const getAllWorkflowRunsByNameStmt = sqlite.prepare( + "SELECT run_id, workflow, status, ts FROM workflow_runs WHERE workflow = ? ORDER BY ts DESC", + ); + const upsertWorkflowRunTx = sqlite.transaction( (entry: Omit, run: WorkflowRun) => { const info = insertStmt.run({ @@ -295,6 +308,20 @@ export function createLogStore(dbPath: string): LogStore { })); } + function getAllWorkflowRuns(workflowName: string | null): WorkflowRun[] { + const rows = ( + workflowName !== null + ? getAllWorkflowRunsByNameStmt.all(workflowName) + : getAllWorkflowRunsStmt.all() + ) as Array<{ run_id: string; workflow: string; status: string; ts: number }>; + return rows.map((r) => ({ + runId: r.run_id, + workflow: r.workflow, + status: validateWorkflowRunStatus(r.status), + ts: r.ts, + })); + } + function getTriggerPayload(runId: string): unknown { const row = getTriggerPayloadStmt.get(runId) as { payload: string | null } | undefined; if (row === undefined || row.payload === null) return null; @@ -344,6 +371,7 @@ export function createLogStore(dbPath: string): LogStore { appendWithWorkflowUpdate, getWorkflowRun, getActiveWorkflowRuns, + getAllWorkflowRuns, getTriggerPayload, getThreadEvents, close, -- 2.43.0