diff --git a/packages/cli/src/commands/workflow.ts b/packages/cli/src/commands/workflow.ts index d10a420..105a8a0 100644 --- a/packages/cli/src/commands/workflow.ts +++ b/packages/cli/src/commands/workflow.ts @@ -515,7 +515,7 @@ const workflowTriggerCommand = defineCommand({ payload: { type: "string", description: - 'JSON with optional "prompt" (string) and "maxRounds" (number) for the workflow run (default: {})', + 'JSON with optional "prompt" (string), "maxRounds" (number), and "dryRun" (boolean) for the workflow run (default: {})', default: "{}", }, }, @@ -530,10 +530,12 @@ const workflowTriggerCommand = defineCommand({ let prompt = ""; let maxRounds = DEFAULT_ENGINE_MAX_ROUNDS; + let dryRun = false; if (isPlainRecord(triggerPayload)) { const p = triggerPayload; if (typeof p.prompt === "string") prompt = p.prompt; if (typeof p.maxRounds === "number") maxRounds = p.maxRounds; + if (typeof p.dryRun === "boolean") dryRun = p.dryRun; } if (!isRunning()) { @@ -544,7 +546,7 @@ const workflowTriggerCommand = defineCommand({ const socketPath = getSocketPath(); let response: DaemonIpcTriggerResponse; try { - response = await triggerWorkflowViaDaemon(socketPath, args.name, prompt, maxRounds); + response = await triggerWorkflowViaDaemon(socketPath, args.name, prompt, maxRounds, dryRun); } catch (e) { const msg = e instanceof Error ? e.message : String(e); process.stderr.write(`❌ Failed to contact daemon: ${msg}\n`); diff --git a/packages/cli/src/daemon-client.ts b/packages/cli/src/daemon-client.ts index 6f333ce..c085257 100644 --- a/packages/cli/src/daemon-client.ts +++ b/packages/cli/src/daemon-client.ts @@ -135,12 +135,14 @@ export function triggerWorkflowViaDaemon( workflow: string, prompt: string, maxRounds: number, + dryRun = false, ): Promise { const message: DaemonIpcRequest = { type: "trigger-workflow", workflow, prompt, maxRounds, + dryRun, }; return sendAndReceive(socketPath, message, parseDaemonResponse); } diff --git a/packages/core/src/__tests__/daemon-ipc-protocol.test.ts b/packages/core/src/__tests__/daemon-ipc-protocol.test.ts index 390703e..5102465 100644 --- a/packages/core/src/__tests__/daemon-ipc-protocol.test.ts +++ b/packages/core/src/__tests__/daemon-ipc-protocol.test.ts @@ -18,6 +18,27 @@ describe("parseDaemonIpcRequest", () => { workflow: "wf", prompt: "go", maxRounds: 3, + dryRun: false, + }); + }); + + it("parses trigger-workflow with dryRun true", () => { + expect( + parseDaemonIpcRequest( + JSON.stringify({ + type: "trigger-workflow", + workflow: "wf", + prompt: "go", + maxRounds: 3, + dryRun: true, + }), + ), + ).toEqual({ + type: "trigger-workflow", + workflow: "wf", + prompt: "go", + maxRounds: 3, + dryRun: true, }); }); diff --git a/packages/core/src/daemon-ipc-protocol.ts b/packages/core/src/daemon-ipc-protocol.ts index 8bb464b..a8e90a1 100644 --- a/packages/core/src/daemon-ipc-protocol.ts +++ b/packages/core/src/daemon-ipc-protocol.ts @@ -13,6 +13,7 @@ export type DaemonIpcTriggerWorkflowRequest = { workflow: string; prompt: string; maxRounds: number; + dryRun: boolean; }; /** Client → daemon: run a sense compute on demand. */ @@ -51,6 +52,22 @@ export type DaemonIpcResponse = | DaemonIpcErrorResponse | { ok: true; senses: SenseInfo[] }; +function parseTriggerWorkflowFields( + req: Record, +): DaemonIpcTriggerWorkflowRequest | null { + if (typeof req.workflow !== "string" || req.workflow.length === 0) return null; + if (typeof req.prompt !== "string") return null; + if (typeof req.maxRounds !== "number") return null; + const dryRun = typeof req.dryRun === "boolean" ? req.dryRun : false; + return { + type: "trigger-workflow", + workflow: req.workflow, + prompt: req.prompt, + maxRounds: req.maxRounds, + dryRun, + }; +} + /** * Parse a single line of JSON into a {@link DaemonIpcRequest}, or null if invalid. * Kept in core with the request types so CLI and daemon stay aligned at compile time. @@ -61,15 +78,7 @@ export function parseDaemonIpcRequest(line: string): DaemonIpcRequest | null { if (!isPlainRecord(obj)) return null; const req = obj; if (req.type === "trigger-workflow") { - if (typeof req.workflow !== "string" || req.workflow.length === 0) return null; - if (typeof req.prompt !== "string") return null; - if (typeof req.maxRounds !== "number") return null; - return { - type: "trigger-workflow", - workflow: req.workflow, - prompt: req.prompt, - maxRounds: req.maxRounds, - }; + return parseTriggerWorkflowFields(req); } if (req.type === "trigger-sense") { if (typeof req.sense !== "string" || req.sense.length === 0) return null; diff --git a/packages/core/src/types.ts b/packages/core/src/types.ts index d565548..a5539f6 100644 --- a/packages/core/src/types.ts +++ b/packages/core/src/types.ts @@ -89,11 +89,11 @@ export type Role = ( /** Maps role names to their meta types — the single generic that drives all inference. */ export type RoleMeta = Record>; -/** Engine start frame: prompt, max rounds cap, and timestamps for the thread. */ +/** Engine start frame: prompt, max rounds cap, dry-run flag, and timestamps for the thread. */ export type StartSignal = { role: START; content: string; - meta: { maxRounds: number }; + meta: { maxRounds: number; dryRun: boolean }; timestamp: number; }; diff --git a/packages/daemon/src/__tests__/crash-recovery.test.ts b/packages/daemon/src/__tests__/crash-recovery.test.ts index 0d8d228..2ce3566 100644 --- a/packages/daemon/src/__tests__/crash-recovery.test.ts +++ b/packages/daemon/src/__tests__/crash-recovery.test.ts @@ -127,8 +127,8 @@ describe("WorkflowManager — crash recovery (Phase 3)", () => { }); const mgr = createWorkflowManager("/nerve-root", config, logStore); - mgr.startWorkflow("my-wf", { prompt: "test 1", maxRounds: 10 }); - mgr.startWorkflow("my-wf", { prompt: "test 2", maxRounds: 10 }); + mgr.startWorkflow("my-wf", { prompt: "test 1", maxRounds: 10, dryRun: false }); + mgr.startWorkflow("my-wf", { prompt: "test 2", maxRounds: 10, dryRun: false }); expect(mgr.activeCount("my-wf")).toBe(2); // Simulate unexpected exit (not shutdown) @@ -154,8 +154,8 @@ describe("WorkflowManager — crash recovery (Phase 3)", () => { }); const mgr = createWorkflowManager("/nerve-root", config, logStore); - mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10 }); - mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10 }); + mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false }); + mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false }); expect(mgr.activeCount("my-wf")).toBe(2); const child = mockChildren[0]; @@ -179,7 +179,7 @@ describe("WorkflowManager — crash recovery (Phase 3)", () => { }); const mgr = createWorkflowManager("/nerve-root", config, logStore); - mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10 }); + mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false }); expect(mockChildren).toHaveLength(1); const child = mockChildren[0]; @@ -212,7 +212,7 @@ describe("WorkflowManager — crash recovery (Phase 3)", () => { }); const mgr = createWorkflowManager("/nerve-root", config, logStore); - mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10 }); + mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false }); const firstChild = mockChildren[0]; firstChild.exitCode = 1; firstChild.connected = false; @@ -256,7 +256,7 @@ describe("WorkflowManager — crash recovery (Phase 3)", () => { const mgr = createWorkflowManager("/nerve-root", config, logStore); // Start one thread to fill the concurrency slot (so queued run stays queued on respawn) - mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10 }); + mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false }); const firstChild = mockChildren[0]; firstChild.exitCode = 1; firstChild.connected = false; @@ -281,7 +281,7 @@ describe("WorkflowManager — crash recovery (Phase 3)", () => { }); const mgr = createWorkflowManager("/nerve-root", config, logStore); - mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10 }); + mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false }); const child = mockChildren[0]; const startCall = (child.send as ReturnType).mock.calls[0]; @@ -317,7 +317,7 @@ describe("WorkflowManager — crash recovery (Phase 3)", () => { }); const mgr = createWorkflowManager("/nerve-root", config, logStore); - const launch = { prompt: "build-docker for myrepo", maxRounds: 10 }; + const launch = { prompt: "build-docker for myrepo", maxRounds: 10, dryRun: false }; mgr.startWorkflow("my-wf", launch); const startedCall = logStore.upsertWorkflowRun.mock.calls.find( @@ -327,7 +327,11 @@ describe("WorkflowManager — crash recovery (Phase 3)", () => { const logEntry = startedCall?.[0] as { payload: string | null }; expect(logEntry.payload).not.toBeNull(); const parsed = JSON.parse(logEntry.payload as string) as Record; - expect(parsed).toMatchObject({ prompt: "build-docker for myrepo", maxRounds: 10 }); + expect(parsed).toMatchObject({ + prompt: "build-docker for myrepo", + maxRounds: 10, + dryRun: false, + }); const stopPromise = mgr.stop(); await vi.runAllTimersAsync(); @@ -349,7 +353,7 @@ describe("WorkflowManager — crash recovery (Phase 3)", () => { const mgr = createWorkflowManager("/nerve-root", config, logStore); // Start one thread to fill the concurrency slot - mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10 }); + mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false }); const firstChild = mockChildren[0]; // Crash once → respawn → crash again → second respawn @@ -385,7 +389,7 @@ describe("WorkflowManager — crash recovery (Phase 3)", () => { }); const mgr = createWorkflowManager("/nerve-root", config, logStore); - mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10 }); + mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false }); const firstChild = mockChildren[0]; firstChild.exitCode = 1; firstChild.connected = false; @@ -415,7 +419,7 @@ describe("WorkflowManager — crash recovery (Phase 3)", () => { }); const mgr = createWorkflowManager("/nerve-root", config, logStore); - mgr.startWorkflow("crash-wf", { prompt: "test", maxRounds: 10 }); + mgr.startWorkflow("crash-wf", { prompt: "test", maxRounds: 10, dryRun: false }); // Crash the worker 6 times in rapid succession (within CRASH_WINDOW_MS = 60s) for (let i = 0; i < 6; i++) { diff --git a/packages/daemon/src/__tests__/daemon-ipc.test.ts b/packages/daemon/src/__tests__/daemon-ipc.test.ts index f4f644e..7b47daf 100644 --- a/packages/daemon/src/__tests__/daemon-ipc.test.ts +++ b/packages/daemon/src/__tests__/daemon-ipc.test.ts @@ -154,6 +154,7 @@ describe("daemon-ipc — trigger-sense", () => { workflow: "my-workflow", prompt: "test prompt", maxRounds: 10, + dryRun: false, }); expect(resp).toEqual({ ok: true }); @@ -161,6 +162,7 @@ describe("daemon-ipc — trigger-sense", () => { expect(wfManager.startWorkflow).toHaveBeenCalledWith("my-workflow", { prompt: "test prompt", maxRounds: 10, + dryRun: false, }); }); diff --git a/packages/daemon/src/__tests__/hot-reload.test.ts b/packages/daemon/src/__tests__/hot-reload.test.ts index 98a0ed1..d92072d 100644 --- a/packages/daemon/src/__tests__/hot-reload.test.ts +++ b/packages/daemon/src/__tests__/hot-reload.test.ts @@ -102,7 +102,7 @@ describe("WorkflowManager — drainAndRespawn (Phase 3 hot reload)", () => { const config = makeWfConfig({ "my-wf": { concurrency: 1, overflow: "drop" } }); const mgr = createWorkflowManager("/nerve-root", config, logStore); - mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10 }); + mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false }); expect(mockChildren).toHaveLength(1); // Remove workflow from config before drain completes @@ -121,8 +121,8 @@ describe("WorkflowManager — drainAndRespawn (Phase 3 hot reload)", () => { const config = makeWfConfig({ "my-wf": { concurrency: 2, overflow: "drop" } }); const mgr = createWorkflowManager("/nerve-root", config, logStore); - mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10 }); - mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10 }); + mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false }); + mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false }); expect(mgr.activeCount("my-wf")).toBe(2); const drainPromise = mgr.drainAndRespawn("my-wf", 5000); @@ -153,7 +153,7 @@ describe("WorkflowManager — drainAndRespawn (Phase 3 hot reload)", () => { const config = makeWfConfig({ "my-wf": { concurrency: 1, overflow: "drop" } }); const mgr = createWorkflowManager("/nerve-root", config, logStore); - mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10 }); + mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false }); expect(mockChildren).toHaveLength(1); const drainPromise = mgr.drainAndRespawn("my-wf", 5000); @@ -169,7 +169,7 @@ describe("WorkflowManager — drainAndRespawn (Phase 3 hot reload)", () => { const config = makeWfConfig({ "my-wf": { concurrency: 1, overflow: "drop" } }); const mgr = createWorkflowManager("/nerve-root", config, logStore); - mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10 }); + mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false }); expect(mockChildren).toHaveLength(1); const drainPromise = mgr.drainAndRespawn("my-wf", 5000); @@ -186,7 +186,7 @@ describe("WorkflowManager — drainAndRespawn (Phase 3 hot reload)", () => { const config = makeWfConfig({ "my-wf": { concurrency: 1, overflow: "drop" } }); const mgr = createWorkflowManager("/nerve-root", config, logStore); - mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10 }); + mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false }); const drainPromise = mgr.drainAndRespawn("my-wf", 5000); await vi.runAllTimersAsync(); @@ -211,14 +211,14 @@ describe("WorkflowManager — drainAndRespawn (Phase 3 hot reload)", () => { const config = makeWfConfig({ "my-wf": { concurrency: 1, overflow: "drop" } }); const mgr = createWorkflowManager("/nerve-root", config, logStore); - mgr.startWorkflow("my-wf", { prompt: "first", maxRounds: 10 }); + mgr.startWorkflow("my-wf", { prompt: "first", maxRounds: 10, dryRun: false }); const drainPromise = mgr.drainAndRespawn("my-wf", 5000); await vi.runAllTimersAsync(); await drainPromise; // Start a new thread on the fresh worker - mgr.startWorkflow("my-wf", { prompt: "second", maxRounds: 10 }); + mgr.startWorkflow("my-wf", { prompt: "second", maxRounds: 10, dryRun: false }); const newChild = mockChildren[1]; const startCalls = (newChild.send as ReturnType).mock.calls.filter( @@ -261,7 +261,7 @@ describe("Kernel — workflow hot reload via file-watcher (Phase 3)", () => { }); // Trigger a workflow thread so a worker is spawned - kernel.workflowManager.startWorkflow("my-wf", { prompt: "test", maxRounds: 10 }); + kernel.workflowManager.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false }); // Manually call drainAndRespawn (simulating what kernel does on workflow file change) const drainPromise = kernel.workflowManager.drainAndRespawn("my-wf", 1000); @@ -296,7 +296,11 @@ describe("Kernel — workflow hot reload via file-watcher (Phase 3)", () => { }); // Spawn a worker for old-wf - kernel.workflowManager.startWorkflow("old-wf", { prompt: "test", maxRounds: 10 }); + kernel.workflowManager.startWorkflow("old-wf", { + prompt: "test", + maxRounds: 10, + dryRun: false, + }); expect(mockChildren).toHaveLength(1); // Reload config without old-wf @@ -334,7 +338,7 @@ describe("Kernel — workflow hot reload via file-watcher (Phase 3)", () => { logStore, }); - kernel.workflowManager.startWorkflow("my-wf", { prompt: "test", maxRounds: 10 }); + kernel.workflowManager.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false }); const workersBefore = mockChildren.length; // Reload with updated concurrency — should NOT spawn a new workflow worker @@ -354,8 +358,8 @@ describe("Kernel — workflow hot reload via file-watcher (Phase 3)", () => { expect(kernel.workflowManager.activeCount("my-wf")).toBe(1); // Can now start up to 5 concurrent threads (previously only 1) - kernel.workflowManager.startWorkflow("my-wf", { prompt: "test", maxRounds: 10 }); - kernel.workflowManager.startWorkflow("my-wf", { prompt: "test", maxRounds: 10 }); + kernel.workflowManager.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false }); + kernel.workflowManager.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false }); expect(kernel.workflowManager.activeCount("my-wf")).toBe(3); const stopPromise = kernel.stop(); diff --git a/packages/daemon/src/__tests__/kernel-workflow-integration.test.ts b/packages/daemon/src/__tests__/kernel-workflow-integration.test.ts index c28d725..87d7fbf 100644 --- a/packages/daemon/src/__tests__/kernel-workflow-integration.test.ts +++ b/packages/daemon/src/__tests__/kernel-workflow-integration.test.ts @@ -202,6 +202,7 @@ describe("kernel + workflowManager integration", () => { workflow: "alert-workflow", prompt: "handle critical alert", maxRounds: 5, + dryRun: false, }); const stopPromise = kernel.stop(); diff --git a/packages/daemon/src/__tests__/workflow-manager.test.ts b/packages/daemon/src/__tests__/workflow-manager.test.ts index b736fb0..3830c1e 100644 --- a/packages/daemon/src/__tests__/workflow-manager.test.ts +++ b/packages/daemon/src/__tests__/workflow-manager.test.ts @@ -115,7 +115,7 @@ describe("WorkflowManager", () => { }); const mgr = createWorkflowManager("/nerve-root", config, logStore); - mgr.startWorkflow("my-workflow", { prompt: "test", maxRounds: 10 }); + mgr.startWorkflow("my-workflow", { prompt: "test", maxRounds: 10, dryRun: false }); expect(mockChildren).toHaveLength(1); expect(mockChildren[0].send).toHaveBeenCalledWith( @@ -131,8 +131,8 @@ describe("WorkflowManager", () => { }); const mgr = createWorkflowManager("/nerve-root", config, logStore); - mgr.startWorkflow("my-workflow", { prompt: "test 1", maxRounds: 10 }); - mgr.startWorkflow("my-workflow", { prompt: "test 2", maxRounds: 10 }); + mgr.startWorkflow("my-workflow", { prompt: "test 1", maxRounds: 10, dryRun: false }); + mgr.startWorkflow("my-workflow", { prompt: "test 2", maxRounds: 10, dryRun: false }); // Only one forked child — worker is reused expect(mockChildren).toHaveLength(1); @@ -147,7 +147,7 @@ describe("WorkflowManager", () => { }); const mgr = createWorkflowManager("/nerve-root", config, logStore); - mgr.startWorkflow("my-workflow", { prompt: "test", maxRounds: 10 }); + mgr.startWorkflow("my-workflow", { prompt: "test", maxRounds: 10, dryRun: false }); expect(logStore.upsertWorkflowRun).toHaveBeenCalledWith( expect.objectContaining({ source: "workflow", type: "started" }), @@ -164,9 +164,9 @@ describe("WorkflowManager", () => { }); const mgr = createWorkflowManager("/nerve-root", config, logStore); - mgr.startWorkflow("drop-wf", { prompt: "first", maxRounds: 10 }); + mgr.startWorkflow("drop-wf", { prompt: "first", maxRounds: 10, dryRun: false }); // now at limit — second call should be dropped - mgr.startWorkflow("drop-wf", { prompt: "second", maxRounds: 10 }); + mgr.startWorkflow("drop-wf", { prompt: "second", maxRounds: 10, dryRun: false }); expect(mgr.activeCount("drop-wf")).toBe(1); expect(mgr.queueLength("drop-wf")).toBe(0); @@ -181,8 +181,8 @@ describe("WorkflowManager", () => { }); const mgr = createWorkflowManager("/nerve-root", config, logStore); - mgr.startWorkflow("drop-wf", { prompt: "test", maxRounds: 10 }); - mgr.startWorkflow("drop-wf", { prompt: "test", maxRounds: 10 }); + mgr.startWorkflow("drop-wf", { prompt: "test", maxRounds: 10, dryRun: false }); + mgr.startWorkflow("drop-wf", { prompt: "test", maxRounds: 10, dryRun: false }); const droppedCall = logStore.upsertWorkflowRun.mock.calls.find( ([entry]) => entry.type === "dropped", @@ -199,8 +199,8 @@ describe("WorkflowManager", () => { }); const mgr = createWorkflowManager("/nerve-root", config, logStore); - mgr.startWorkflow("queue-wf", { prompt: "first", maxRounds: 10 }); - mgr.startWorkflow("queue-wf", { prompt: "second", maxRounds: 10 }); + mgr.startWorkflow("queue-wf", { prompt: "first", maxRounds: 10, dryRun: false }); + mgr.startWorkflow("queue-wf", { prompt: "second", maxRounds: 10, dryRun: false }); expect(mgr.activeCount("queue-wf")).toBe(1); expect(mgr.queueLength("queue-wf")).toBe(1); @@ -213,8 +213,8 @@ describe("WorkflowManager", () => { }); const mgr = createWorkflowManager("/nerve-root", config, logStore); - mgr.startWorkflow("queue-wf", { prompt: "test", maxRounds: 10 }); - mgr.startWorkflow("queue-wf", { prompt: "test", maxRounds: 10 }); + mgr.startWorkflow("queue-wf", { prompt: "test", maxRounds: 10, dryRun: false }); + mgr.startWorkflow("queue-wf", { prompt: "test", maxRounds: 10, dryRun: false }); const queuedCall = logStore.upsertWorkflowRun.mock.calls.find( ([entry]) => entry.type === "queued", @@ -233,12 +233,12 @@ describe("WorkflowManager", () => { const mgr = createWorkflowManager("/nerve-root", config, logStore); // Fill the concurrency slot - mgr.startWorkflow("queue-wf", { prompt: "test 0", maxRounds: 10 }); + mgr.startWorkflow("queue-wf", { prompt: "test 0", maxRounds: 10, dryRun: false }); // Fill the queue to maxQueue - mgr.startWorkflow("queue-wf", { prompt: "test 1", maxRounds: 10 }); - mgr.startWorkflow("queue-wf", { prompt: "test 2", maxRounds: 10 }); + mgr.startWorkflow("queue-wf", { prompt: "test 1", maxRounds: 10, dryRun: false }); + mgr.startWorkflow("queue-wf", { prompt: "test 2", maxRounds: 10, dryRun: false }); // This one should push out the oldest - mgr.startWorkflow("queue-wf", { prompt: "test 3", maxRounds: 10 }); + mgr.startWorkflow("queue-wf", { prompt: "test 3", maxRounds: 10, dryRun: false }); // Queue should still be at maxQueue (2) expect(mgr.queueLength("queue-wf")).toBe(2); @@ -259,8 +259,8 @@ describe("WorkflowManager", () => { }); const mgr = createWorkflowManager("/nerve-root", config, logStore); - mgr.startWorkflow("queue-wf", { prompt: "first", maxRounds: 10 }); - mgr.startWorkflow("queue-wf", { prompt: "second", maxRounds: 10 }); + mgr.startWorkflow("queue-wf", { prompt: "first", maxRounds: 10, dryRun: false }); + mgr.startWorkflow("queue-wf", { prompt: "second", maxRounds: 10, dryRun: false }); expect(mgr.activeCount("queue-wf")).toBe(1); expect(mgr.queueLength("queue-wf")).toBe(1); @@ -294,8 +294,8 @@ describe("WorkflowManager", () => { }); const mgr = createWorkflowManager("/nerve-root", config, logStore); - mgr.startWorkflow("queue-wf", { prompt: "first", maxRounds: 10 }); - mgr.startWorkflow("queue-wf", { prompt: "second", maxRounds: 10 }); + mgr.startWorkflow("queue-wf", { prompt: "first", maxRounds: 10, dryRun: false }); + mgr.startWorkflow("queue-wf", { prompt: "second", maxRounds: 10, dryRun: false }); const child = mockChildren[0]; const firstRunId = (child.send as ReturnType).mock.calls[0][0].runId as string; @@ -321,8 +321,8 @@ describe("WorkflowManager", () => { }); const mgr = createWorkflowManager("/nerve-root", config, logStore); - mgr.startWorkflow("wf-a", { prompt: "test", maxRounds: 10 }); - mgr.startWorkflow("wf-b", { prompt: "test", maxRounds: 10 }); + mgr.startWorkflow("wf-a", { prompt: "test", maxRounds: 10, dryRun: false }); + mgr.startWorkflow("wf-b", { prompt: "test", maxRounds: 10, dryRun: false }); // Two distinct workers should have been forked expect(mockChildren).toHaveLength(2); @@ -348,7 +348,7 @@ describe("WorkflowManager", () => { await vi.runAllTimersAsync(); await stopPromise; - mgr.startWorkflow("wf-a", { prompt: "test", maxRounds: 10 }); + mgr.startWorkflow("wf-a", { prompt: "test", maxRounds: 10, dryRun: false }); // No worker should have been spawned expect(mockChildren).toHaveLength(0); @@ -361,7 +361,7 @@ describe("WorkflowManager", () => { const config = makeConfig({}); const mgr = createWorkflowManager("/nerve-root", config, logStore); - mgr.startWorkflow("no-such-workflow", { prompt: "test", maxRounds: 10 }); + mgr.startWorkflow("no-such-workflow", { prompt: "test", maxRounds: 10, dryRun: false }); expect(mockChildren).toHaveLength(0); expect(logStore.upsertWorkflowRun).not.toHaveBeenCalled(); diff --git a/packages/daemon/src/daemon-ipc.ts b/packages/daemon/src/daemon-ipc.ts index 166f067..bb78e83 100644 --- a/packages/daemon/src/daemon-ipc.ts +++ b/packages/daemon/src/daemon-ipc.ts @@ -55,6 +55,7 @@ export function createDaemonIpcServer( workflowManager.startWorkflow(req.workflow, { prompt: req.prompt, maxRounds: req.maxRounds, + dryRun: req.dryRun, }); const resp: DaemonIpcResponse = { ok: true }; socket.write(`${JSON.stringify(resp)}\n`); diff --git a/packages/daemon/src/ipc.ts b/packages/daemon/src/ipc.ts index ff31aa4..06df5ac 100644 --- a/packages/daemon/src/ipc.ts +++ b/packages/daemon/src/ipc.ts @@ -34,6 +34,8 @@ export type StartThreadMessage = { prompt: string; /** Safety-valve: max moderator rounds for this thread (engine launch parameter). */ maxRounds: number; + /** When true, roles may skip side effects (thread-level hint on the start frame). */ + dryRun: boolean; }; /** Parent → Workflow Worker: resume an existing thread after crash recovery */ @@ -44,6 +46,8 @@ export type ResumeThreadMessage = { messages: Array<{ role: string; content: string; meta: unknown; timestamp: number }>; /** Safety-valve: max moderator rounds for this thread. */ maxRounds: number; + /** Thread-level dry-run hint (aligns with persisted `__start__` meta when replaying). */ + dryRun: boolean; }; /** Union of all messages the parent sends to a worker */ @@ -135,6 +139,7 @@ function validateStartThreadMsg(obj: Record): string | null { if (typeof obj.workflow !== "string") return "'start-thread' message missing string 'workflow'"; if (typeof obj.prompt !== "string") return "'start-thread' message missing string 'prompt'"; if (typeof obj.maxRounds !== "number") return "'start-thread' message missing number 'maxRounds'"; + if (typeof obj.dryRun !== "boolean") return "'start-thread' message missing boolean 'dryRun'"; return null; } @@ -143,6 +148,7 @@ function validateResumeThreadMsg(obj: Record): string | null { if (!Array.isArray(obj.messages)) return "'resume-thread' message missing 'messages' array"; if (typeof obj.maxRounds !== "number") return "'resume-thread' message missing number 'maxRounds'"; + if (typeof obj.dryRun !== "boolean") return "'resume-thread' message missing boolean 'dryRun'"; return null; } @@ -180,6 +186,7 @@ export function parseParentMessage(raw: unknown): Result workflow: obj.workflow, prompt: obj.prompt, maxRounds: obj.maxRounds, + dryRun: obj.dryRun, } as StartThreadMessage); } if (obj.type === "resume-thread") { @@ -191,6 +198,7 @@ export function parseParentMessage(raw: unknown): Result runId: obj.runId, messages: obj.messages as ResumeThreadMessage["messages"], maxRounds: obj.maxRounds, + dryRun: obj.dryRun, } as ResumeThreadMessage); } return err(new Error(`Unhandled IPC message type: "${obj.type}"`)); diff --git a/packages/daemon/src/kernel.ts b/packages/daemon/src/kernel.ts index 207fc9c..17d8db7 100644 --- a/packages/daemon/src/kernel.ts +++ b/packages/daemon/src/kernel.ts @@ -148,7 +148,7 @@ export function createKernel( const route = routeSenseComputeOutput(msg.payload); if (route.kind === "launch") { const { workflowName, maxRounds, prompt } = route.launch; - workflowManager.startWorkflow(workflowName, { prompt, maxRounds }); + workflowManager.startWorkflow(workflowName, { prompt, maxRounds, dryRun: false }); logStore.append({ source: "sense", type: "workflow-launch", diff --git a/packages/daemon/src/workflow-manager.ts b/packages/daemon/src/workflow-manager.ts index dddb440..e65ba84 100644 --- a/packages/daemon/src/workflow-manager.ts +++ b/packages/daemon/src/workflow-manager.ts @@ -31,6 +31,7 @@ import { export type WorkflowLaunchParams = { prompt: string; maxRounds: number; + dryRun: boolean; }; export type WorkflowManager = { @@ -58,6 +59,7 @@ type PendingThread = { runId: string; prompt: string; maxRounds: number; + dryRun: boolean; }; type WorkflowState = { @@ -90,20 +92,22 @@ const DEFAULT_MAX_QUEUE = 100; function readLaunchFromTriggerPayload( raw: unknown, engineDefaultMaxRounds: number, -): { prompt: string; maxRounds: number } { +): { prompt: string; maxRounds: number; dryRun: boolean } { if (isPlainRecord(raw)) { const o = raw; if (typeof o.prompt === "string" && typeof o.maxRounds === "number") { - return { prompt: o.prompt, maxRounds: o.maxRounds }; + const dryRun = typeof o.dryRun === "boolean" ? o.dryRun : false; + return { prompt: o.prompt, maxRounds: o.maxRounds, dryRun }; } } - return { prompt: "", maxRounds: engineDefaultMaxRounds }; + return { prompt: "", maxRounds: engineDefaultMaxRounds, dryRun: false }; } function ensureThreadMessagesWithStart( messages: Array<{ role: string; content: string; meta: unknown; timestamp: number }>, fallbackPrompt: string, fallbackMaxRounds: number, + fallbackDryRun: boolean, ): WorkflowMessage[] { const mapped: WorkflowMessage[] = messages.map((m) => ({ role: m.role, @@ -117,7 +121,7 @@ function ensureThreadMessagesWithStart( const start: WorkflowMessage = { role: START, content: fallbackPrompt, - meta: { maxRounds: fallbackMaxRounds }, + meta: { maxRounds: fallbackMaxRounds, dryRun: fallbackDryRun }, timestamp: Date.now(), }; return [start, ...mapped]; @@ -260,6 +264,7 @@ export function createWorkflowManager( runId: string, prompt: string, maxRounds: number, + dryRun: boolean, ): void { const state = getOrCreateState(workflowName); state.active.add(runId); @@ -271,9 +276,10 @@ export function createWorkflowManager( workflow: workflowName, prompt, maxRounds, + dryRun, }; sendStartThread(worker.process, msg); - logWorkflowEvent(workflowName, runId, "started", { prompt, maxRounds }); + logWorkflowEvent(workflowName, runId, "started", { prompt, maxRounds, dryRun }); } function dequeueNext(workflowName: string): void { @@ -286,7 +292,7 @@ export function createWorkflowManager( if (state.active.size < concurrency) { const next = state.queue.shift(); if (next !== undefined) { - dispatchThread(workflowName, next.runId, next.prompt, next.maxRounds); + dispatchThread(workflowName, next.runId, next.prompt, next.maxRounds, next.dryRun); } } } @@ -311,7 +317,12 @@ export function createWorkflowManager( logStore.getTriggerPayload(runId), config.maxRounds, ); - state.queue.push({ runId, prompt: launch.prompt, maxRounds: launch.maxRounds }); + state.queue.push({ + runId, + prompt: launch.prompt, + maxRounds: launch.maxRounds, + dryRun: launch.dryRun, + }); process.stderr.write( `[workflow-manager] crash-recovery: re-queued thread "${runId}" for "${workflowName}"\n`, ); @@ -329,13 +340,19 @@ export function createWorkflowManager( logStore.getTriggerPayload(runId), config.maxRounds, ); - const messages = ensureThreadMessagesWithStart(rawMessages, launch.prompt, launch.maxRounds); + const messages = ensureThreadMessagesWithStart( + rawMessages, + launch.prompt, + launch.maxRounds, + launch.dryRun, + ); state.active.add(runId); const msg: ResumeThreadMessage = { type: "resume-thread", runId, messages, maxRounds: launch.maxRounds, + dryRun: launch.dryRun, }; sendResumeThread(worker.process, msg); process.stderr.write( @@ -531,10 +548,10 @@ export function createWorkflowManager( const state = getOrCreateState(workflowName); const runId = crypto.randomUUID(); - const { prompt, maxRounds } = launch; + const { prompt, maxRounds, dryRun } = launch; if (state.active.size < wfConfig.concurrency) { - dispatchThread(workflowName, runId, prompt, maxRounds); + dispatchThread(workflowName, runId, prompt, maxRounds, dryRun); return; } @@ -559,7 +576,7 @@ export function createWorkflowManager( } } - state.queue.push({ runId, prompt, maxRounds }); + state.queue.push({ runId, prompt, maxRounds, dryRun }); logWorkflowEvent(workflowName, runId, "queued"); process.stderr.write( `[workflow-manager] queued thread for "${workflowName}" runId "${runId}" (queue length: ${state.queue.length})\n`, diff --git a/packages/daemon/src/workflow-worker.ts b/packages/daemon/src/workflow-worker.ts index b8dbcb6..54a3fc9 100644 --- a/packages/daemon/src/workflow-worker.ts +++ b/packages/daemon/src/workflow-worker.ts @@ -86,7 +86,18 @@ function validateRoleResult( } function isStartMeta(meta: unknown): meta is StartSignal["meta"] { - return isPlainRecord(meta) && typeof meta.maxRounds === "number"; + return ( + isPlainRecord(meta) && typeof meta.maxRounds === "number" && typeof meta.dryRun === "boolean" + ); +} + +function normalizeStartMeta(meta: unknown, maxRoundsFallback: number): StartSignal["meta"] { + if (!isPlainRecord(meta)) { + return { maxRounds: maxRoundsFallback, dryRun: false }; + } + const maxRounds = typeof meta.maxRounds === "number" ? meta.maxRounds : maxRoundsFallback; + const dryRun = typeof meta.dryRun === "boolean" ? meta.dryRun : false; + return { maxRounds, dryRun }; } function startSignalFromWorkflowMessage( @@ -97,11 +108,11 @@ function startSignalFromWorkflowMessage( return { role: START, content: "", - meta: { maxRounds: maxRoundsFallback }, + meta: { maxRounds: maxRoundsFallback, dryRun: false }, timestamp: Date.now(), }; } - const meta = isStartMeta(msg.meta) ? msg.meta : { maxRounds: maxRoundsFallback }; + const meta = isStartMeta(msg.meta) ? msg.meta : normalizeStartMeta(msg.meta, maxRoundsFallback); return { role: START, content: msg.content, @@ -121,6 +132,7 @@ function initThreadMessages( resumeMessages: WorkflowMessage[], freshPrompt: string | null, maxRounds: number, + dryRun: boolean, ): ThreadMessagesState { if (resumeMessages.length > 0) { const [first, ...rest] = resumeMessages; @@ -135,7 +147,7 @@ function initThreadMessages( start: { role: START, content: prompt, - meta: { maxRounds }, + meta: { maxRounds, dryRun }, timestamp: Date.now(), }, messages: [...resumeMessages], @@ -145,7 +157,7 @@ function initThreadMessages( const start: StartSignal = { role: START, content: prompt, - meta: { maxRounds }, + meta: { maxRounds, dryRun }, timestamp: Date.now(), }; sendWorkflowMessage(runId, { @@ -189,12 +201,14 @@ async function runThread( maxRounds: number, resumeMessages: WorkflowMessage[] = [], freshPrompt: string | null = null, + dryRun = false, ): Promise { const { start, messages: roleMessages } = initThreadMessages( runId, resumeMessages, freshPrompt, maxRounds, + dryRun, ); let roleRound = roleMessages.length; @@ -308,11 +322,11 @@ function handleMessage( if (msg.type === "start-thread") { if (shuttingDown.value) return; - const { runId, prompt, maxRounds } = msg; + const { runId, prompt, maxRounds, dryRun } = msg; const previous = inFlight.get(runId) ?? Promise.resolve(); const next = previous - .then(() => runThread(def, runId, maxRounds, [], prompt)) + .then(() => runThread(def, runId, maxRounds, [], prompt, dryRun)) .catch((e: unknown) => { const errMsg = e instanceof Error ? e.message : String(e); sendWorkflowError(runId, errMsg); @@ -327,11 +341,11 @@ function handleMessage( if (msg.type === "resume-thread") { if (shuttingDown.value) return; - const { runId, messages, maxRounds } = msg; + const { runId, messages, maxRounds, dryRun } = msg; const previous = inFlight.get(runId) ?? Promise.resolve(); const next = previous - .then(() => runThread(def, runId, maxRounds, messages, null)) + .then(() => runThread(def, runId, maxRounds, messages, null, dryRun)) .catch((e: unknown) => { const errMsg = e instanceof Error ? e.message : String(e); sendWorkflowError(runId, errMsg); diff --git a/packages/store/src/log-store.ts b/packages/store/src/log-store.ts index 34b94b9..c025db9 100644 --- a/packages/store/src/log-store.ts +++ b/packages/store/src/log-store.ts @@ -242,6 +242,41 @@ function runInTransaction(db: DatabaseSync, fn: () => T): T { } } +function launchShapeFromRecord(rec: Record): { + prompt: string; + maxRounds: number; + dryRun: boolean; +} | null { + if (typeof rec.prompt !== "string" || typeof rec.maxRounds !== "number") return null; + return { + prompt: rec.prompt, + maxRounds: rec.maxRounds, + dryRun: typeof rec.dryRun === "boolean" ? rec.dryRun : false, + }; +} + +/** Parse JSON from a workflow `started` log row into a trigger / launch payload for crash recovery. */ +function triggerPayloadFromStartedLogJson(payload: string): unknown | null { + let parsed: unknown; + try { + parsed = JSON.parse(payload); + } catch { + return null; + } + if (!isPlainRecord(parsed)) return null; + + const direct = launchShapeFromRecord(parsed); + if (direct !== null) return direct; + + const inner = parsed.triggerPayload; + if (inner !== null && isPlainRecord(inner)) { + const fromInner = launchShapeFromRecord(inner); + if (fromInner !== null) return fromInner; + return inner; + } + return null; +} + function runOptionalVacuum(sqlite: DatabaseSync, vacuum?: boolean): boolean { if (vacuum !== true) return false; sqlite.exec("VACUUM"); @@ -513,15 +548,7 @@ export function createLogStore(dbPath: string): LogStore { function getTriggerPayload(runId: string): unknown { const row = getTriggerPayloadStmt.get(runId) as { payload: string | null } | undefined; if (row === undefined || row.payload === null) return null; - try { - const parsed: unknown = JSON.parse(row.payload); - if (isPlainRecord(parsed)) { - return parsed.triggerPayload ?? null; - } - } catch { - // malformed - } - return null; + return triggerPayloadFromStartedLogJson(row.payload); } function getThreadEvents(runId: string): Array<{ type: string; [key: string]: unknown }> { diff --git a/packages/workflow-utils/src/index.ts b/packages/workflow-utils/src/index.ts index 42aa438..c874a09 100644 --- a/packages/workflow-utils/src/index.ts +++ b/packages/workflow-utils/src/index.ts @@ -19,3 +19,4 @@ export { type SpawnResult, type SpawnSafeOptions, } from "./spawn-safe.js"; +export { isDryRun } from "./start-signal.js"; diff --git a/packages/workflow-utils/src/start-signal.ts b/packages/workflow-utils/src/start-signal.ts new file mode 100644 index 0000000..3c37efa --- /dev/null +++ b/packages/workflow-utils/src/start-signal.ts @@ -0,0 +1,6 @@ +import type { StartSignal } from "@uncaged/nerve-core"; + +/** Returns the thread-level dry-run flag from the workflow start frame. */ +export function isDryRun(start: StartSignal): boolean { + return start.meta.dryRun; +}