/*
 * Review summary for this patch (a unified git diff over four TypeScript files;
 * this leading comment sits before the first "diff --git" header).
 *
 * What the hunks below show:
 *  - daemon-ipc.test.ts: the mock workflow manager gains `drainWhenIdle: vi.fn()`.
 *  - hot-reload.test.ts: the header comment lists drainWhenIdle(), and a new
 *    describe block adds six cases: no shutdown while a thread is active,
 *    shutdown after the last thread completes, waiting for two concurrent
 *    threads, duplicate calls producing a single shutdown, drain after a
 *    "workflow-error" message, and immediate drain when nothing is in flight.
 *  - kernel-file-watch.ts: the handler that previously called
 *    drainAndRespawn(workflowName).catch(...) now calls drainWhenIdle(workflowName);
 *    the stderr error logging moves into workflow-manager.ts.
 *  - workflow-manager.ts:
 *      * `drainWhenIdle(workflowName): void` is added to the WorkflowManager type
 *        and to the returned object.
 *      * A `pendingDrains` set records workflows whose reload is deferred.
 *      * drainWhenIdle() calls drainAndRespawn() right away when the workflow has
 *        no state or `state.active.size` is 0; otherwise it adds the name to
 *        `pendingDrains` (logging and returning early if it is already there).
 *      * maybeDeferredHotReloadDrain() returns unless the name is pending and
 *        `state.active.size` is 0; then it deletes the pending entry and calls
 *        drainAndRespawn(), logging any rejection to stderr.
 *      * It is called after logWorkflowEvent(...) in handleThreadEvent and after
 *        the "failed" event is logged (presumably the workflow-error handler;
 *        the enclosing function is outside the hunk).
 *      * The pending entry is deleted next to `workers.delete(workflowName)`
 *        (presumably the worker exit handler) and the set is cleared in stop().
 *
 * NOTE(review): both call sites run maybeDeferredHotReloadDrain() after
 * dequeueNext(workflowName). If dequeueNext starts a queued run, active.size
 * would be non-zero and the reload would stay deferred while the queue keeps
 * feeding work; confirm that this is the intended behavior.
 * NOTE(review): the first test case ("does not send shutdown while a thread is
 * still active") never calls mgr.stop(), unlike the other five; confirm nothing
 * is left running between cases.
 * NOTE(review): this text appears to have lost its line breaks and its
 * angle-bracket generic arguments (for example "Promise;", "new Set()",
 * "ReturnType)", "Record)"), presumably an extraction artifact. Regenerate the
 * patch from the repository before trying to apply it.
 */
diff --git a/packages/daemon/src/__tests__/daemon-ipc.test.ts b/packages/daemon/src/__tests__/daemon-ipc.test.ts index 688250d..8deb9cc 100644 --- a/packages/daemon/src/__tests__/daemon-ipc.test.ts +++ b/packages/daemon/src/__tests__/daemon-ipc.test.ts @@ -31,6 +31,7 @@ function makeMockWorkflowManager() { stop: vi.fn(async () => {}), totalActiveCount: vi.fn(() => 0), drainAndRespawn: vi.fn(async () => {}), + drainWhenIdle: vi.fn(), updateConfig: vi.fn(), getActiveWorkflowRuns: vi.fn(() => []), }; diff --git a/packages/daemon/src/__tests__/hot-reload.test.ts b/packages/daemon/src/__tests__/hot-reload.test.ts index 5a7d356..f5dd4a7 100644 --- a/packages/daemon/src/__tests__/hot-reload.test.ts +++ b/packages/daemon/src/__tests__/hot-reload.test.ts @@ -3,6 +3,7 @@ * * Verifies that: * - drainAndRespawn() sends shutdown, waits for exit, then respawns the worker + * - drainWhenIdle() defers drain+respawn until in-flight threads finish * - Kernel dispatches handleWorkflowFileChange when file-watcher emits a workflow change * - Kernel logs a workflow_reload system event on hot reload * - drainAndRespawn on a non-existent worker is a no-op @@ -238,6 +239,199 @@ describe("WorkflowManager — drainAndRespawn (Phase 3 hot reload)", () => { }); }); +describe("WorkflowManager — drainWhenIdle (hot reload without interrupting in-flight)", () => { + beforeEach(() => { + mockChildren.length = 0; + vi.useFakeTimers({ shouldAdvanceTime: true }); + }); + + afterEach(() => { + vi.useRealTimers(); + vi.clearAllMocks(); + }); + + it("does not send shutdown while a thread is still active", () => { + const logStore = makeLogStore(); + const config = makeWfConfig({ "my-wf": { concurrency: 1, overflow: "drop" } }); + const mgr = createWorkflowManager("/nerve-root", config, logStore); + + mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false }); + const child = mockChildren[0]; + + mgr.drainWhenIdle("my-wf"); + + const shutdownCalls = (child.send as 
ReturnType).mock.calls.filter( + (args: unknown[]) => + args[0] !== null && + typeof args[0] === "object" && + (args[0] as Record).type === "shutdown", + ); + expect(shutdownCalls).toHaveLength(0); + }); + + it("sends shutdown after the last active thread completes (deferred drain)", async () => { + const logStore = makeLogStore(); + const config = makeWfConfig({ "my-wf": { concurrency: 1, overflow: "drop" } }); + const mgr = createWorkflowManager("/nerve-root", config, logStore); + + mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false }); + const child = mockChildren[0]; + const runId = (child.send as ReturnType).mock.calls[0][0] as { runId: string }; + + mgr.drainWhenIdle("my-wf"); + + child.emit("message", { + type: "thread-event", + runId: runId.runId, + eventType: "completed", + payload: null, + }); + + await vi.runAllTimersAsync(); + + expect(child.send).toHaveBeenCalledWith(expect.objectContaining({ type: "shutdown" })); + expect(mockChildren).toHaveLength(2); + + const stopPromise = mgr.stop(); + await vi.runAllTimersAsync(); + await stopPromise; + }); + + it("waits for all concurrent threads before draining", async () => { + const logStore = makeLogStore(); + const config = makeWfConfig({ "my-wf": { concurrency: 2, overflow: "drop" } }); + const mgr = createWorkflowManager("/nerve-root", config, logStore); + + mgr.startWorkflow("my-wf", { prompt: "a", maxRounds: 10, dryRun: false }); + mgr.startWorkflow("my-wf", { prompt: "b", maxRounds: 10, dryRun: false }); + const child = mockChildren[0]; + const sendMock = child.send as ReturnType; + const runIdA = (sendMock.mock.calls[0][0] as { runId: string }).runId; + const runIdB = (sendMock.mock.calls[1][0] as { runId: string }).runId; + + mgr.drainWhenIdle("my-wf"); + + child.emit("message", { + type: "thread-event", + runId: runIdA, + eventType: "completed", + payload: null, + }); + await vi.runAllTimersAsync(); + + const shutdownBefore = sendMock.mock.calls.filter( + (args: unknown[]) => 
+ args[0] !== null && + typeof args[0] === "object" && + (args[0] as Record).type === "shutdown", + ); + expect(shutdownBefore).toHaveLength(0); + + child.emit("message", { + type: "thread-event", + runId: runIdB, + eventType: "completed", + payload: null, + }); + await vi.runAllTimersAsync(); + + expect(child.send).toHaveBeenCalledWith(expect.objectContaining({ type: "shutdown" })); + + const stopPromise = mgr.stop(); + await vi.runAllTimersAsync(); + await stopPromise; + }); + + it("duplicate drainWhenIdle while busy only schedules one deferred drain", async () => { + const logStore = makeLogStore(); + const config = makeWfConfig({ "my-wf": { concurrency: 1, overflow: "drop" } }); + const mgr = createWorkflowManager("/nerve-root", config, logStore); + + mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false }); + const child = mockChildren[0]; + const runId = (child.send as ReturnType).mock.calls[0][0] as { runId: string }; + + mgr.drainWhenIdle("my-wf"); + mgr.drainWhenIdle("my-wf"); + + child.emit("message", { + type: "thread-event", + runId: runId.runId, + eventType: "completed", + payload: null, + }); + await vi.runAllTimersAsync(); + + const shutdownCalls = (child.send as ReturnType).mock.calls.filter( + (args: unknown[]) => + args[0] !== null && + typeof args[0] === "object" && + (args[0] as Record).type === "shutdown", + ); + expect(shutdownCalls).toHaveLength(1); + + const stopPromise = mgr.stop(); + await vi.runAllTimersAsync(); + await stopPromise; + }); + + it("deferred drain runs after workflow-error clears the active thread", async () => { + const logStore = makeLogStore(); + const config = makeWfConfig({ "my-wf": { concurrency: 1, overflow: "drop" } }); + const mgr = createWorkflowManager("/nerve-root", config, logStore); + + mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false }); + const child = mockChildren[0]; + const runId = (child.send as ReturnType).mock.calls[0][0] as { runId: string }; + + 
mgr.drainWhenIdle("my-wf"); + + child.emit("message", { + type: "workflow-error", + runId: runId.runId, + error: "role failed", + exitCode: 1, + }); + await vi.runAllTimersAsync(); + + expect(child.send).toHaveBeenCalledWith(expect.objectContaining({ type: "shutdown" })); + + const stopPromise = mgr.stop(); + await vi.runAllTimersAsync(); + await stopPromise; + }); + + it("drains immediately when there is no in-flight thread", async () => { + const logStore = makeLogStore(); + const config = makeWfConfig({ "my-wf": { concurrency: 1, overflow: "drop" } }); + const mgr = createWorkflowManager("/nerve-root", config, logStore); + + mgr.startWorkflow("my-wf", { prompt: "once", maxRounds: 10, dryRun: false }); + const firstChild = mockChildren[0]; + const runId = (firstChild.send as ReturnType).mock.calls[0][0] as { + runId: string; + }; + + firstChild.emit("message", { + type: "thread-event", + runId: runId.runId, + eventType: "completed", + payload: null, + }); + await vi.runAllTimersAsync(); + + mgr.drainWhenIdle("my-wf"); + await vi.runAllTimersAsync(); + + expect(firstChild.send).toHaveBeenCalledWith(expect.objectContaining({ type: "shutdown" })); + expect(mockChildren).toHaveLength(2); + + const stopPromise = mgr.stop(); + await vi.runAllTimersAsync(); + await stopPromise; + }); +}); + describe("Kernel — workflow hot reload via file-watcher (Phase 3)", () => { let nerveRoot: string; diff --git a/packages/daemon/src/kernel-file-watch.ts b/packages/daemon/src/kernel-file-watch.ts index f94b124..f0b353e 100644 --- a/packages/daemon/src/kernel-file-watch.ts +++ b/packages/daemon/src/kernel-file-watch.ts @@ -57,10 +57,7 @@ export function createKernelFileWatchHandlers(deps: KernelFileWatchDeps): Kernel payload: null, timestamp: Date.now(), }); - deps.workflowManager.drainAndRespawn(workflowName).catch((e) => { - const msg = e instanceof Error ? 
e.message : String(e); - process.stderr.write(`[kernel] drainAndRespawn error for "${workflowName}": ${msg}\n`); - }); + deps.workflowManager.drainWhenIdle(workflowName); } function onConfigFileChange(): void { diff --git a/packages/daemon/src/workflow-manager.ts b/packages/daemon/src/workflow-manager.ts index 799d1bf..dce111e 100644 --- a/packages/daemon/src/workflow-manager.ts +++ b/packages/daemon/src/workflow-manager.ts @@ -57,6 +57,12 @@ export type WorkflowManager = { * Waits up to `drainTimeoutMs` for threads to complete before force-killing. */ drainAndRespawn: (workflowName: string, drainTimeoutMs?: number) => Promise; + /** + * Schedule a drain+respawn that waits for in-flight runs to finish first. + * If no runs are active, drains immediately. Otherwise marks a pending reload + * and drains automatically when the last active run completes. + */ + drainWhenIdle: (workflowName: string) => void; /** Gracefully shut down all workflow workers. */ stop: () => Promise; }; @@ -222,6 +228,7 @@ export function createWorkflowManager( const crashTimestamps = new Map(); let stopped = false; let config = initialConfig; + const pendingDrains = new Set(); function getOrCreateState(workflowName: string): WorkflowState { let state = states.get(workflowName); @@ -328,6 +335,24 @@ export function createWorkflowManager( } } + /** If a hot-reload was deferred, run drain+respawn once there are no active threads. */ + function maybeDeferredHotReloadDrain(workflowName: string): void { + if (!pendingDrains.has(workflowName)) return; + const state = states.get(workflowName); + if (state === undefined || state.active.size !== 0) return; + + pendingDrains.delete(workflowName); + process.stderr.write( + `[workflow-manager] all runs complete for "${workflowName}", executing deferred hot-reload drain\n`, + ); + drainAndRespawn(workflowName).catch((e) => { + const errMsg = e instanceof Error ? 
e.message : String(e); + process.stderr.write( + `[workflow-manager] deferred drainAndRespawn error for "${workflowName}": ${errMsg}\n`, + ); + }); + } + function handleThreadEvent(workflowName: string, msg: ThreadEventMessage): void { const state = states.get(workflowName); if (state === undefined) return; @@ -337,6 +362,7 @@ export function createWorkflowManager( dequeueNext(workflowName); const exitCode = extractExitCode(msg.payload); logWorkflowEvent(workflowName, msg.runId, msg.eventType, msg.payload, exitCode); + maybeDeferredHotReloadDrain(workflowName); } } @@ -428,6 +454,7 @@ export function createWorkflowManager( state.active.clear(); workers.delete(workflowName); + pendingDrains.delete(workflowName); if (stopped || workflowConfig(workflowName) === null) return; @@ -484,6 +511,7 @@ export function createWorkflowManager( dequeueNext(workflowName); } logWorkflowEvent(workflowName, msg.runId, "failed", { error: msg.error }, msg.exitCode); + maybeDeferredHotReloadDrain(workflowName); return; } @@ -681,9 +709,38 @@ export function createWorkflowManager( await waitForExit(entry.process, drainTimeoutMs); // The exit handler (draining branch) will respawn the worker automatically } + function drainWhenIdle(workflowName: string): void { + const state = states.get(workflowName); + const hasActiveRuns = state !== undefined && state.active.size > 0; + + if (!hasActiveRuns) { + pendingDrains.delete(workflowName); + drainAndRespawn(workflowName).catch((e) => { + const msg = e instanceof Error ? 
e.message : String(e); + process.stderr.write( + `[workflow-manager] drainAndRespawn error for "${workflowName}": ${msg}\n`, + ); + }); + return; + } + + // Defer until all active runs finish + if (pendingDrains.has(workflowName)) { + process.stderr.write( + `[workflow-manager] hot-reload already pending for "${workflowName}", skipping duplicate\n`, + ); + return; + } + + pendingDrains.add(workflowName); + process.stderr.write( + `[workflow-manager] deferring hot-reload for "${workflowName}" until ${state.active.size} active run(s) complete\n`, + ); + } async function stop(): Promise { stopped = true; + pendingDrains.clear(); const exitPromises: Promise[] = []; for (const entry of workers.values()) { sendShutdown(entry.process, entry); @@ -701,6 +758,7 @@ export function createWorkflowManager( totalActiveCount, updateConfig, drainAndRespawn, + drainWhenIdle, stop, }; }