diff --git a/packages/daemon/src/__tests__/crash-recovery.test.ts b/packages/daemon/src/__tests__/crash-recovery.test.ts index c29f76c..4891997 100644 --- a/packages/daemon/src/__tests__/crash-recovery.test.ts +++ b/packages/daemon/src/__tests__/crash-recovery.test.ts @@ -28,6 +28,11 @@ function makeMockChild(pid = 1): MockChild { child.connected = true; child.exitCode = null; child.pid = pid; + setImmediate(() => { + if (child.connected) { + child.emit("message", { type: "ready" }); + } + }); child.send = vi.fn((msg: unknown) => { if ( msg !== null && @@ -132,6 +137,7 @@ describe("WorkflowManager — crash recovery (Phase 3)", () => { mgr.startWorkflow("my-wf", { prompt: "test 1", maxRounds: 10, dryRun: false }); mgr.startWorkflow("my-wf", { prompt: "test 2", maxRounds: 10, dryRun: false }); + await vi.runAllTimersAsync(); expect(mgr.activeCount("my-wf")).toBe(2); // Simulate unexpected exit (not shutdown) @@ -159,6 +165,7 @@ describe("WorkflowManager — crash recovery (Phase 3)", () => { mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false }); mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false }); + await vi.runAllTimersAsync(); expect(mgr.activeCount("my-wf")).toBe(2); const child = mockChildren[0]; @@ -183,6 +190,7 @@ describe("WorkflowManager — crash recovery (Phase 3)", () => { const mgr = createWorkflowManager("/nerve-root", config, logStore); mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false }); + await vi.runAllTimersAsync(); expect(mockChildren).toHaveLength(1); const child = mockChildren[0]; @@ -216,6 +224,7 @@ describe("WorkflowManager — crash recovery (Phase 3)", () => { const mgr = createWorkflowManager("/nerve-root", config, logStore); mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false }); + await vi.runAllTimersAsync(); const firstChild = mockChildren[0]; firstChild.exitCode = 1; firstChild.connected = false; @@ -260,6 +269,7 @@ describe("WorkflowManager — crash recovery (Phase 3)", () => { // Start one thread to fill the concurrency slot (so queued run stays queued on respawn) mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false }); + await vi.runAllTimersAsync(); const firstChild = mockChildren[0]; firstChild.exitCode = 1; firstChild.connected = false; @@ -285,6 +295,7 @@ describe("WorkflowManager — crash recovery (Phase 3)", () => { const mgr = createWorkflowManager("/nerve-root", config, logStore); mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false }); + await vi.runAllTimersAsync(); const child = mockChildren[0]; const startCall = (child.send as ReturnType).mock.calls[0]; @@ -322,6 +333,7 @@ describe("WorkflowManager — crash recovery (Phase 3)", () => { const launch = { prompt: "build-docker for myrepo", maxRounds: 10, dryRun: false }; mgr.startWorkflow("my-wf", launch); + await vi.runAllTimersAsync(); const startedCall = logStore.upsertWorkflowRun.mock.calls.find( (args: any[]) => (args[0] as { type: string }).type === "started", @@ -357,6 +369,7 @@ describe("WorkflowManager — crash recovery (Phase 3)", () => { // Start one thread to fill the concurrency slot mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false }); + await vi.runAllTimersAsync(); const firstChild = mockChildren[0]; // Crash once → respawn → crash again → second respawn @@ -398,6 +411,7 @@ describe("WorkflowManager — crash recovery (Phase 3)", () => { const mgr = createWorkflowManager("/nerve-root", config, logStore); mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false }); + await vi.runAllTimersAsync(); const firstChild = mockChildren[0]; firstChild.exitCode = 1; firstChild.connected = false; @@ -428,6 +442,7 @@ describe("WorkflowManager — crash recovery (Phase 3)", () => { const mgr = createWorkflowManager("/nerve-root", config, logStore); mgr.startWorkflow("crash-wf", { prompt: "test", maxRounds: 10, dryRun: false }); + await vi.runAllTimersAsync(); // Crash the worker 6 times in rapid succession (within CRASH_WINDOW_MS = 60s) for (let i = 0; i < 6; i++) { diff --git a/packages/daemon/src/__tests__/hot-reload.test.ts b/packages/daemon/src/__tests__/hot-reload.test.ts index 2bc092c..678affe 100644 --- a/packages/daemon/src/__tests__/hot-reload.test.ts +++ b/packages/daemon/src/__tests__/hot-reload.test.ts @@ -33,6 +33,11 @@ function makeMockChild(pid = 1): MockChild { child.connected = true; child.exitCode = null; child.pid = pid; + setImmediate(() => { + if (child.connected) { + child.emit("message", { type: "ready" }); + } + }); child.send = vi.fn((msg: unknown) => { if ( msg !== null && @@ -114,6 +119,7 @@ describe("WorkflowManager — drainAndRespawn (Phase 3 hot reload)", () => { const mgr = createWorkflowManager("/nerve-root", config, logStore); mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false }); + await vi.runAllTimersAsync(); expect(mockChildren).toHaveLength(1); // Remove workflow from config before drain completes @@ -134,6 +140,7 @@ describe("WorkflowManager — drainAndRespawn (Phase 3 hot reload)", () => { mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false }); mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false }); + await vi.runAllTimersAsync(); expect(mgr.activeCount("my-wf")).toBe(2); const drainPromise = mgr.drainAndRespawn("my-wf", 5000); @@ -165,6 +172,7 @@ describe("WorkflowManager — drainAndRespawn (Phase 3 hot reload)", () => { const mgr = createWorkflowManager("/nerve-root", config, logStore); mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false }); + await vi.runAllTimersAsync(); expect(mockChildren).toHaveLength(1); const drainPromise = mgr.drainAndRespawn("my-wf", 5000); @@ -181,6 +189,7 @@ describe("WorkflowManager — drainAndRespawn (Phase 3 hot reload)", () => { const mgr = createWorkflowManager("/nerve-root", config, logStore); mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false }); + await vi.runAllTimersAsync(); expect(mockChildren).toHaveLength(1); const drainPromise = mgr.drainAndRespawn("my-wf", 5000); @@ -198,6 +207,7 @@ describe("WorkflowManager — drainAndRespawn (Phase 3 hot reload)", () => { const mgr = createWorkflowManager("/nerve-root", config, logStore); mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false }); + await vi.runAllTimersAsync(); const drainPromise = mgr.drainAndRespawn("my-wf", 5000); await vi.runAllTimersAsync(); @@ -223,6 +233,7 @@ describe("WorkflowManager — drainAndRespawn (Phase 3 hot reload)", () => { const mgr = createWorkflowManager("/nerve-root", config, logStore); mgr.startWorkflow("my-wf", { prompt: "first", maxRounds: 10, dryRun: false }); + await vi.runAllTimersAsync(); const drainPromise = mgr.drainAndRespawn("my-wf", 5000); await vi.runAllTimersAsync(); @@ -230,6 +241,7 @@ describe("WorkflowManager — drainAndRespawn (Phase 3 hot reload)", () => { // Start a new thread on the fresh worker mgr.startWorkflow("my-wf", { prompt: "second", maxRounds: 10, dryRun: false }); + await vi.runAllTimersAsync(); const newChild = mockChildren[1]; const startCalls = (newChild.send as ReturnType).mock.calls.filter( @@ -257,12 +269,13 @@ describe("WorkflowManager — drainWhenIdle (hot reload without interrupting in- vi.clearAllMocks(); }); - it("does not send shutdown while a thread is still active", () => { + it("does not send shutdown while a thread is still active", async () => { const logStore = makeLogStore(); const config = makeWfConfig({ "my-wf": { concurrency: 1, overflow: "drop" } }); const mgr = createWorkflowManager("/nerve-root", config, logStore); mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false }); + await vi.runAllTimersAsync(); const child = mockChildren[0]; mgr.drainWhenIdle("my-wf"); @@ -282,6 +295,7 @@ describe("WorkflowManager — drainWhenIdle (hot reload without interrupting in- const mgr = createWorkflowManager("/nerve-root", config, logStore); mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false }); + await vi.runAllTimersAsync(); const child = mockChildren[0]; const runId = (child.send as ReturnType).mock.calls[0][0] as { runId: string }; @@ -311,6 +325,7 @@ describe("WorkflowManager — drainWhenIdle (hot reload without interrupting in- mgr.startWorkflow("my-wf", { prompt: "a", maxRounds: 10, dryRun: false }); mgr.startWorkflow("my-wf", { prompt: "b", maxRounds: 10, dryRun: false }); + await vi.runAllTimersAsync(); const child = mockChildren[0]; const sendMock = child.send as ReturnType; const runIdA = (sendMock.mock.calls[0][0] as { runId: string }).runId; @@ -355,6 +370,7 @@ describe("WorkflowManager — drainWhenIdle (hot reload without interrupting in- const mgr = createWorkflowManager("/nerve-root", config, logStore); mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false }); + await vi.runAllTimersAsync(); const child = mockChildren[0]; const runId = (child.send as ReturnType).mock.calls[0][0] as { runId: string }; @@ -388,6 +404,7 @@ describe("WorkflowManager — drainWhenIdle (hot reload without interrupting in- const mgr = createWorkflowManager("/nerve-root", config, logStore); mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false }); + await vi.runAllTimersAsync(); const child = mockChildren[0]; const runId = (child.send as ReturnType).mock.calls[0][0] as { runId: string }; @@ -414,6 +431,7 @@ describe("WorkflowManager — drainWhenIdle (hot reload without interrupting in- const mgr = createWorkflowManager("/nerve-root", config, logStore); mgr.startWorkflow("my-wf", { prompt: "once", maxRounds: 10, dryRun: false }); + await vi.runAllTimersAsync(); const firstChild = mockChildren[0]; const runId = (firstChild.send as ReturnType).mock.calls[0][0] as { runId: string; @@ -471,6 +489,7 @@ describe("Kernel — workflow hot reload via file-watcher (Phase 3)", () => { // Trigger a workflow thread so a worker is spawned kernel.workflowManager.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false }); + await vi.runAllTimersAsync(); // Manually call drainAndRespawn (simulating what kernel does on workflow file change) const drainPromise = kernel.workflowManager.drainAndRespawn("my-wf", 1000); @@ -511,6 +530,7 @@ describe("Kernel — workflow hot reload via file-watcher (Phase 3)", () => { maxRounds: 10, dryRun: false, }); + await vi.runAllTimersAsync(); expect(mockChildren).toHaveLength(1); // Reload config without old-wf @@ -551,6 +571,7 @@ describe("Kernel — workflow hot reload via file-watcher (Phase 3)", () => { }); kernel.workflowManager.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false }); + await vi.runAllTimersAsync(); const workersBefore = mockChildren.length; // Reload with updated concurrency — should NOT spawn a new workflow worker @@ -573,6 +594,7 @@ describe("Kernel — workflow hot reload via file-watcher (Phase 3)", () => { // Can now start up to 5 concurrent threads (previously only 1) kernel.workflowManager.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false }); kernel.workflowManager.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false }); + await vi.runAllTimersAsync(); expect(kernel.workflowManager.activeCount("my-wf")).toBe(3); const stopPromise = kernel.stop(); diff --git a/packages/daemon/src/__tests__/kernel-workflow-integration.test.ts b/packages/daemon/src/__tests__/kernel-workflow-integration.test.ts index a5579b6..d9cc1bb 100644 --- a/packages/daemon/src/__tests__/kernel-workflow-integration.test.ts +++ b/packages/daemon/src/__tests__/kernel-workflow-integration.test.ts @@ -194,6 +194,8 @@ describe("kernel + workflowManager integration", () => { }); } + await vi.runAllTimersAsync(); + // A workflow worker should be spawned and a start-thread message sent const workflowWorker = mockChildren.find((c) => (c.send as ReturnType).mock.calls.some( @@ -252,6 +254,8 @@ describe("kernel + workflowManager integration", () => { }); } + await vi.runAllTimersAsync(); + // Find the start-thread call and verify triggerPayload const startThreadCall = mockChildren .flatMap((c) => (c.send as ReturnType).mock.calls as [unknown][]) @@ -306,6 +310,8 @@ describe("kernel + workflowManager integration", () => { }); } + await vi.runAllTimersAsync(); + const senseEntries = logStore.append.mock.calls .map((c) => c[0] as { source: string; type: string; refId: string | null }) .filter((e) => e.source === "sense" && e.refId === "cpu-usage"); @@ -423,6 +429,8 @@ describe("kernel + workflowManager integration", () => { }); } + await vi.runAllTimersAsync(); + expect(logStore.upsertWorkflowRun).toHaveBeenCalledWith( expect.objectContaining({ source: "workflow", type: "started" }), expect.objectContaining({ workflow: "log-test-workflow", status: "started" }), @@ -498,6 +506,8 @@ describe("kernel + workflowManager integration", () => { }); } + await vi.runAllTimersAsync(); + const startThreadCall = mockChildren .flatMap((c) => (c.send as ReturnType).mock.calls as [unknown][]) .find( @@ -582,6 +592,8 @@ describe("kernel + workflowManager integration", () => { }); } + await vi.runAllTimersAsync(); + const startThreadCall = mockChildren .flatMap((c) => (c.send as ReturnType).mock.calls as [unknown][]) .find( @@ -642,6 +654,8 @@ describe("kernel + workflowManager integration", () => { }); } + await vi.runAllTimersAsync(); + const stopPromise = kernel.stop(); await vi.runAllTimersAsync(); await expect(stopPromise).resolves.toBeUndefined(); diff --git a/packages/daemon/src/__tests__/worker-runtime.test.ts b/packages/daemon/src/__tests__/worker-runtime.test.ts index a428bff..642a33e 100644 --- a/packages/daemon/src/__tests__/worker-runtime.test.ts +++ b/packages/daemon/src/__tests__/worker-runtime.test.ts @@ -16,6 +16,7 @@ function baseConfig(script: string) { onMessage: vi.fn(), onReady: vi.fn(), onExit: vi.fn(), + onCrashLimitReached: null, respawn: { enabled: true, maxCrashes: 6, @@ -84,7 +85,7 @@ describe("createWorkerRuntime", () => { const rt = track(createWorkerRuntime(baseConfig(echoWorkerPath))); await rt.start("k"); expect(rt.has("k")).toBe(true); - await rt.evict("k"); + await rt.evict("k", null); expect(rt.has("k")).toBe(false); await rt.shutdown(); }); @@ -94,7 +95,7 @@ describe("createWorkerRuntime", () => { await rt.start("k"); const before = rt.pid("k"); expect(before).not.toBeNull(); - await rt.drain("k"); + await rt.drain("k", null); const after = rt.pid("k"); expect(after).not.toBeNull(); expect(after).not.toBe(before); diff --git a/packages/daemon/src/__tests__/workflow-manager.test.ts b/packages/daemon/src/__tests__/workflow-manager.test.ts index db86982..280ad41 100644 --- a/packages/daemon/src/__tests__/workflow-manager.test.ts +++ b/packages/daemon/src/__tests__/workflow-manager.test.ts @@ -26,6 +26,11 @@ function makeMockChild(pid = 1): MockChild { child.connected = true; child.exitCode = null; child.pid = pid; + setImmediate(() => { + if (child.connected) { + child.emit("message", { type: "ready" }); + } + }); child.send = vi.fn((msg: unknown) => { if ( msg !== null && @@ -110,7 +115,7 @@ describe("WorkflowManager", () => { }); describe("startWorkflow under concurrency limit dispatches thread", () => { - it("forks a worker and sends start-thread when active < concurrency", () => { + it("forks a worker and sends start-thread when active < concurrency", async () => { const logStore = makeLogStore(); const config = makeConfig({ "my-workflow": { concurrency: 2, overflow: "drop" }, @@ -118,6 +123,7 @@ describe("WorkflowManager", () => { const mgr = createWorkflowManager("/nerve-root", config, logStore); mgr.startWorkflow("my-workflow", { prompt: "test", maxRounds: 10, dryRun: false }); + await vi.runAllTimersAsync(); expect(mockChildren).toHaveLength(1); expect(mockChildren[0].send).toHaveBeenCalledWith( @@ -126,7 +132,7 @@ describe("WorkflowManager", () => { expect(mgr.activeCount("my-workflow")).toBe(1); }); - it("reuses the same worker for a second thread under the limit", () => { + it("reuses the same worker for a second thread under the limit", async () => { const logStore = makeLogStore(); const config = makeConfig({ "my-workflow": { concurrency: 3, overflow: "drop" }, @@ -135,6 +141,7 @@ describe("WorkflowManager", () => { mgr.startWorkflow("my-workflow", { prompt: "test 1", maxRounds: 10, dryRun: false }); mgr.startWorkflow("my-workflow", { prompt: "test 2", maxRounds: 10, dryRun: false }); + await vi.runAllTimersAsync(); // Only one forked child — worker is reused expect(mockChildren).toHaveLength(1); @@ -142,7 +149,7 @@ describe("WorkflowManager", () => { expect(mgr.activeCount("my-workflow")).toBe(2); }); - it("logs a 'started' event for each dispatched thread", () => { + it("logs a 'started' event for each dispatched thread", async () => { const logStore = makeLogStore(); const config = makeConfig({ "my-workflow": { concurrency: 2, overflow: "drop" }, @@ -150,6 +157,7 @@ describe("WorkflowManager", () => { const mgr = createWorkflowManager("/nerve-root", config, logStore); mgr.startWorkflow("my-workflow", { prompt: "test", maxRounds: 10, dryRun: false }); + await vi.runAllTimersAsync(); expect(logStore.upsertWorkflowRun).toHaveBeenCalledWith( expect.objectContaining({ source: "workflow", type: "started" }), @@ -159,7 +167,7 @@ describe("WorkflowManager", () => { }); describe("startWorkflow at limit with drop overflow drops the request", () => { - it("does NOT send start-thread when at concurrency limit with overflow=drop", () => { + it("does NOT send start-thread when at concurrency limit with overflow=drop", async () => { const logStore = makeLogStore(); const config = makeConfig({ "drop-wf": { concurrency: 1, overflow: "drop" }, @@ -169,6 +177,7 @@ describe("WorkflowManager", () => { mgr.startWorkflow("drop-wf", { prompt: "first", maxRounds: 10, dryRun: false }); // now at limit — second call should be dropped mgr.startWorkflow("drop-wf", { prompt: "second", maxRounds: 10, dryRun: false }); + await vi.runAllTimersAsync(); expect(mgr.activeCount("drop-wf")).toBe(1); expect(mgr.queueLength("drop-wf")).toBe(0); @@ -254,7 +263,7 @@ describe("WorkflowManager", () => { }); describe("completing a thread dequeues the next one", () => { - it("dispatches the next queued thread when the active thread sends completed", () => { + it("dispatches the next queued thread when the active thread sends completed", async () => { const logStore = makeLogStore(); const config = makeConfig({ "queue-wf": { concurrency: 1, overflow: "queue", maxQueue: 5 }, @@ -263,6 +272,7 @@ describe("WorkflowManager", () => { mgr.startWorkflow("queue-wf", { prompt: "first", maxRounds: 10, dryRun: false }); mgr.startWorkflow("queue-wf", { prompt: "second", maxRounds: 10, dryRun: false }); + await vi.runAllTimersAsync(); expect(mgr.activeCount("queue-wf")).toBe(1); expect(mgr.queueLength("queue-wf")).toBe(1); @@ -289,7 +299,7 @@ describe("WorkflowManager", () => { ); }); - it("dispatches next queued thread when active thread sends failed", () => { + it("dispatches next queued thread when active thread sends failed", async () => { const logStore = makeLogStore(); const config = makeConfig({ "queue-wf": { concurrency: 1, overflow: "queue", maxQueue: 5 }, @@ -298,6 +308,7 @@ describe("WorkflowManager", () => { mgr.startWorkflow("queue-wf", { prompt: "first", maxRounds: 10, dryRun: false }); mgr.startWorkflow("queue-wf", { prompt: "second", maxRounds: 10, dryRun: false }); + await vi.runAllTimersAsync(); const child = mockChildren[0]; const firstRunId = (child.send as ReturnType).mock.calls[0][0].runId as string; @@ -325,6 +336,7 @@ describe("WorkflowManager", () => { mgr.startWorkflow("wf-a", { prompt: "test", maxRounds: 10, dryRun: false }); mgr.startWorkflow("wf-b", { prompt: "test", maxRounds: 10, dryRun: false }); + await vi.runAllTimersAsync(); // Two distinct workers should have been forked expect(mockChildren).toHaveLength(2); diff --git a/packages/daemon/src/worker-pool.ts b/packages/daemon/src/worker-pool.ts index 9b4218b..67d68c8 100644 --- a/packages/daemon/src/worker-pool.ts +++ b/packages/daemon/src/worker-pool.ts @@ -66,6 +66,7 @@ export function createSenseWorkerPool(options: SenseWorkerPoolOptions): SenseWor onReady: (_key, msg) => { options.onWorkerMessage(msg); }, + onCrashLimitReached: null, onExit: (group, code, signal) => { const sig = signal === null || signal === undefined || signal === "" @@ -102,13 +103,13 @@ export function createSenseWorkerPool(options: SenseWorkerPoolOptions): SenseWor return; } options.onBeforeGroupRestart(group); - await runtime.drain(group); + await runtime.drain(group, null); } function evictGroup(group: string): void { trackedGroups.delete(group); evicting.add(group); - void runtime.evict(group).finally(() => { + void runtime.evict(group, null).finally(() => { evicting.delete(group); }); } diff --git a/packages/daemon/src/worker-runtime.ts b/packages/daemon/src/worker-runtime.ts index 2be1068..aaaff47 100644 --- a/packages/daemon/src/worker-runtime.ts +++ b/packages/daemon/src/worker-runtime.ts @@ -8,6 +8,10 @@ import { isPlainRecord } from "@uncaged/nerve-core"; const STDERR_TAIL_MAX_CHARS = 2048; +export type WorkerDrainOpts = { + shutdownTimeoutMs: number | null; +}; + export type WorkerRuntimeConfig = { script: string; argsForKey: (key: K) => string[]; @@ -16,6 +20,8 @@ export type WorkerRuntimeConfig = { onMessage: (key: K, msg: unknown) => void; onReady: (key: K, msg: unknown) => void; onExit: (key: K, code: number | null, signal: string | null) => void; + /** Invoked when automatic respawn is skipped because `maxCrashes` was exceeded in `windowMs`. */ + onCrashLimitReached: ((key: K) => void) | null; respawn: { enabled: boolean; maxCrashes: number; @@ -32,8 +38,8 @@ export type WorkerRuntime = { /** When the worker is already ready and IPC-connected, sends synchronously (returns true). Otherwise false — caller may fall back to `send`. */ trySendSync: (key: K, msg: unknown) => boolean; start: (key: K) => Promise; - evict: (key: K) => Promise; - drain: (key: K) => Promise; + evict: (key: K, opts: WorkerDrainOpts | null) => Promise; + drain: (key: K, opts: WorkerDrainOpts | null) => Promise; shutdown: () => Promise; has: (key: K) => boolean; /** True when a child exists but IPC is disconnected (legacy pool skipped sends in this case). */ @@ -264,7 +270,14 @@ export function createWorkerRuntime( await waitForReady(slot, config.shutdownTimeoutMs); } - async function gracefulStop(slot: WorkerSlot): Promise { + function resolveShutdownTimeoutMs(opts: WorkerDrainOpts | null): number { + if (opts !== null && opts.shutdownTimeoutMs !== null) { + return opts.shutdownTimeoutMs; + } + return config.shutdownTimeoutMs; + } + + async function gracefulStop(slot: WorkerSlot, shutdownTimeoutMs: number): Promise { if (slot.child === null) { return; } @@ -276,7 +289,7 @@ export function createWorkerRuntime( } catch { // IPC channel may have closed between null-check and send } - await waitForChildExit(child, config.shutdownTimeoutMs); + await waitForChildExit(child, shutdownTimeoutMs); } async function handleUnexpectedCrashRecovery(slot: WorkerSlot): Promise { @@ -295,6 +308,9 @@ export function createWorkerRuntime( console.error( `[WorkerRuntime] worker "${String(slot.key)}" exceeded crash limit (${String(config.respawn.maxCrashes)} in ${String(config.respawn.windowMs)}ms); not respawning`, ); + if (config.onCrashLimitReached !== null) { + config.onCrashLimitReached(slot.key); + } return; } @@ -303,7 +319,7 @@ export function createWorkerRuntime( } async function shutdownWorker(slot: WorkerSlot): Promise { - await gracefulStop(slot); + await gracefulStop(slot, config.shutdownTimeoutMs); workers.delete(slot.key); } @@ -348,22 +364,24 @@ export function createWorkerRuntime( }); }, - evict: async (key: K) => { + evict: async (key: K, opts: WorkerDrainOpts | null) => { const slot = getOrCreateSlot(key); + const shutdownMs = resolveShutdownTimeoutMs(opts); await enqueueOp(slot, async () => { - await gracefulStop(slot); + await gracefulStop(slot, shutdownMs); workers.delete(key); }); }, - drain: async (key: K) => { + drain: async (key: K, opts: WorkerDrainOpts | null) => { const slot = getOrCreateSlot(key); + const shutdownMs = resolveShutdownTimeoutMs(opts); await enqueueOp(slot, async () => { if (slot.child === null) { await forkAndWaitReady(slot); return; } - await gracefulStop(slot); + await gracefulStop(slot, shutdownMs); await forkAndWaitReady(slot); }); }, diff --git a/packages/daemon/src/workflow-manager-support.ts b/packages/daemon/src/workflow-manager-support.ts new file mode 100644 index 0000000..800030c --- /dev/null +++ b/packages/daemon/src/workflow-manager-support.ts @@ -0,0 +1,256 @@ +/** + * Pure helpers and IPC branching for workflow-manager (keeps workflow-manager.ts lean). + */ + +import { dirname, join } from "node:path"; +import { fileURLToPath } from "node:url"; + +import type { WorkflowMessage } from "@uncaged/nerve-core"; +import { START, isPlainRecord } from "@uncaged/nerve-core"; + +import type { LogStore, WorkflowRunStatus } from "@uncaged/nerve-store"; +import type { ResumeThreadMessage, ThreadEventMessage } from "./ipc.js"; +import type { WorkerToParentMessage } from "./ipc.js"; + +export type PendingThread = { + runId: string; + prompt: string; + maxRounds: number; + dryRun: boolean; +}; + +export type WorkflowState = { + active: Set; + queue: PendingThread[]; +}; + +/** Matches legacy manager: 6 crashes within 60s stops respawn (was `length > 5`). */ +export const WORKFLOW_WORKER_RESPAWN = { + enabled: true, + maxCrashes: 6, + windowMs: 60_000, + delayMs: 0, +} as const; + +/** + * Worker shutdown timeout — must stay in sync with SHUTDOWN_TIMEOUT_MS in workflow-worker.ts. + * The drain timeout passed to drainAndRespawn must be >= this value so the worker has + * enough time to finish in-flight threads before the parent force-kills it. + */ +export const WORKER_SHUTDOWN_TIMEOUT_MS = 10_000; + +export const DEFAULT_MAX_QUEUE = 100; + +export function readLaunchFromTriggerPayload( + raw: unknown, + engineDefaultMaxRounds: number, +): { prompt: string; maxRounds: number; dryRun: boolean } { + if (isPlainRecord(raw)) { + const o = raw; + if (typeof o.prompt === "string" && typeof o.maxRounds === "number") { + const dryRun = typeof o.dryRun === "boolean" ? o.dryRun : false; + return { prompt: o.prompt, maxRounds: o.maxRounds, dryRun }; + } + } + return { prompt: "", maxRounds: engineDefaultMaxRounds, dryRun: false }; +} + +export function ensureThreadMessagesWithStart( + messages: Array<{ role: string; content: string; meta: unknown; timestamp: number }>, + threadId: string, + fallbackPrompt: string, + fallbackMaxRounds: number, +): WorkflowMessage[] { + const mapped: WorkflowMessage[] = messages.map((m) => ({ + role: m.role, + content: m.content, + meta: m.meta, + timestamp: m.timestamp, + })); + if (mapped.length > 0 && mapped[0].role === START) { + return mapped; + } + const start: WorkflowMessage = { + role: START, + content: fallbackPrompt, + meta: { maxRounds: fallbackMaxRounds, threadId }, + timestamp: Date.now(), + }; + return [start, ...mapped]; +} + +export function resolveWorkflowWorkerScript(): string { + const __filename = fileURLToPath(import.meta.url); + const __dir = dirname(__filename); + return join(__dir, "workflow-worker.js"); +} + +export function mapWorkflowRunStatus(eventType: string): WorkflowRunStatus | null { + const map: Record = { + started: "started", + queued: "queued", + completed: "completed", + failed: "failed", + crashed: "crashed", + dropped: "dropped", + interrupted: "interrupted", + killed: "killed", + }; + return map[eventType] ?? null; +} + +export function extractExitCodeFromPayload(payload: unknown): number | null { + if (isPlainRecord(payload) && typeof payload.exitCode === "number") { + return payload.exitCode; + } + return null; +} + +export function appendWorkflowRunLog( + logStore: LogStore, + workflowName: string, + runId: string, + eventType: string, + payload: unknown | undefined, + exitCode: number | null, +): void { + const timestamp = Date.now(); + const serialised = payload !== undefined ? JSON.stringify(payload) : null; + const status = mapWorkflowRunStatus(eventType); + + if (status !== null) { + logStore.upsertWorkflowRun( + { + source: "workflow", + type: eventType, + refId: runId, + payload: serialised, + timestamp, + }, + { runId, workflow: workflowName, status, timestamp, exitCode }, + ); + } else { + logStore.append({ + source: "workflow", + type: eventType, + refId: runId, + payload: serialised, + timestamp, + }); + } +} + +export function recoverQueuedRun( + workflowName: string, + runId: string, + state: WorkflowState, + logStore: LogStore, + engineMaxRounds: number, +): void { + if (state.queue.some((q) => q.runId === runId)) return; + const launch = readLaunchFromTriggerPayload(logStore.getTriggerPayload(runId), engineMaxRounds); + state.queue.push({ + runId, + prompt: launch.prompt, + maxRounds: launch.maxRounds, + dryRun: launch.dryRun, + }); + process.stderr.write( + `[workflow-manager] crash-recovery: re-queued thread "${runId}" for "${workflowName}"\n`, + ); +} + +export function recoverStartedRun( + workflowName: string, + runId: string, + state: WorkflowState, + logStore: LogStore, + engineMaxRounds: number, + sendResume: (wf: string, msg: ResumeThreadMessage) => void, +): void { + if (state.active.has(runId)) return; + const rawMessages = logStore.getThreadMessages(runId); + const launch = readLaunchFromTriggerPayload(logStore.getTriggerPayload(runId), engineMaxRounds); + const messages = ensureThreadMessagesWithStart( + rawMessages, + runId, + launch.prompt, + launch.maxRounds, + ); + state.active.add(runId); + const msg: ResumeThreadMessage = { + type: "resume-thread", + runId, + messages, + maxRounds: launch.maxRounds, + dryRun: launch.dryRun, + }; + sendResume(workflowName, msg); + process.stderr.write( + `[workflow-manager] crash-recovery: resuming thread "${runId}" for "${workflowName}" (${String(messages.length)} messages)\n`, + ); +} + +export function recoverThreadsFromStore( + workflowName: string, + logStore: LogStore, + engineMaxRounds: number, + getOrCreateState: (name: string) => WorkflowState, + sendResume: (wf: string, msg: ResumeThreadMessage) => void, +): void { + const activeRuns = logStore.getActiveWorkflowRuns(workflowName); + const state = getOrCreateState(workflowName); + + for (const run of activeRuns) { + if (run.status === "queued") { + recoverQueuedRun(workflowName, run.runId, state, logStore, engineMaxRounds); + } else if (run.status === "started") { + recoverStartedRun(workflowName, run.runId, state, logStore, engineMaxRounds, sendResume); + } + } +} + +export type WorkflowManagerMessageDeps = { + logStore: LogStore; + handleThreadEvent: (workflowName: string, msg: ThreadEventMessage) => void; + onWorkflowRoleError: ( + workflowName: string, + runId: string, + error: string, + exitCode: number, + ) => void; +}; + +export function dispatchWorkflowWorkerMessage( + workflowName: string, + msg: WorkerToParentMessage, + deps: WorkflowManagerMessageDeps, +): void { + if (msg.type === "thread-event") { + deps.handleThreadEvent(workflowName, msg); + return; + } + + if (msg.type === "thread-workflow-message") { + deps.logStore.append({ + source: "workflow", + type: "thread_workflow_message", + refId: msg.runId, + payload: JSON.stringify(msg.message), + timestamp: Date.now(), + }); + return; + } + + if (msg.type === "workflow-error") { + process.stderr.write( + `[workflow-manager] workflow-error for runId "${msg.runId}" in "${workflowName}": ${msg.error}\n`, + ); + deps.onWorkflowRoleError(workflowName, msg.runId, msg.error, msg.exitCode); + return; + } + + if (msg.type === "error") { + process.stderr.write(`[workflow-manager] error from "${workflowName}" worker: ${msg.error}\n`); + } +} diff --git a/packages/daemon/src/workflow-manager.ts b/packages/daemon/src/workflow-manager.ts index 4c6b3ab..79c4578 100644 --- a/packages/daemon/src/workflow-manager.ts +++ b/packages/daemon/src/workflow-manager.ts @@ -6,33 +6,24 @@ * Concurrency and overflow (drop/queue) are enforced here in the parent process. */ -import { fork } from "node:child_process"; -import type { ChildProcess } from "node:child_process"; -import { dirname, join } from "node:path"; -import { fileURLToPath } from "node:url"; +import type { NerveConfig, WorkflowConfig, WorkflowStatus } from "@uncaged/nerve-core"; -import type { - NerveConfig, - WorkflowConfig, - WorkflowMessage, - WorkflowStatus, -} from "@uncaged/nerve-core"; -import { START, isPlainRecord } from "@uncaged/nerve-core"; - -import type { LogStore, WorkflowRunStatus } from "@uncaged/nerve-store"; -import type { - KillThreadMessage, - ResumeThreadMessage, - ShutdownMessage, - StartThreadMessage, - ThreadEventMessage, -} from "./ipc.js"; +import type { LogStore } from "@uncaged/nerve-store"; +import type { KillThreadMessage, StartThreadMessage, ThreadEventMessage } from "./ipc.js"; import { parseWorkerMessage } from "./ipc.js"; +import { formatCapturedStderrTail, formatChildExitSummary } from "./worker-fork-support.js"; +import { createWorkerRuntime } from "./worker-runtime.js"; import { - formatCapturedStderrTail, - formatChildExitSummary, - teeCapturedStderr, -} from "./worker-fork-support.js"; + DEFAULT_MAX_QUEUE, + WORKER_SHUTDOWN_TIMEOUT_MS, + WORKFLOW_WORKER_RESPAWN, + type WorkflowState, + appendWorkflowRunLog, + dispatchWorkflowWorkerMessage, + extractExitCodeFromPayload, + recoverThreadsFromStore, + resolveWorkflowWorkerScript, +} from "./workflow-manager-support.js"; export type WorkflowLaunchParams = { prompt: string; @@ -74,169 +65,107 @@ export type WorkflowManager = { stop: () => Promise; }; -type PendingThread = { - runId: string; - prompt: string; - maxRounds: number; - dryRun: boolean; -}; - -type WorkflowState = { - active: Set; - queue: PendingThread[]; -}; - -type WorkerEntry = { - workflowName: string; - process: ChildProcess; - stopping: boolean; - /** When set, the worker is draining before a hot-reload respawn. */ - draining: boolean; - stderrTail: { value: string }; -}; - -// Crash respawn backoff: track crash timestamps per workflow. -const MAX_CRASHES_IN_WINDOW = 5; -const CRASH_WINDOW_MS = 60_000; - -/** - * Worker shutdown timeout — must stay in sync with SHUTDOWN_TIMEOUT_MS in workflow-worker.ts. - * The drain timeout passed to drainAndRespawn must be >= this value so the worker has - * enough time to finish in-flight threads before the parent force-kills it. - */ -const WORKER_SHUTDOWN_TIMEOUT_MS = 10_000; - -const DEFAULT_MAX_QUEUE = 100; - -function readLaunchFromTriggerPayload( - raw: unknown, - engineDefaultMaxRounds: number, -): { prompt: string; maxRounds: number; dryRun: boolean } { - if (isPlainRecord(raw)) { - const o = raw; - if (typeof o.prompt === "string" && typeof o.maxRounds === "number") { - const dryRun = typeof o.dryRun === "boolean" ? o.dryRun : false; - return { prompt: o.prompt, maxRounds: o.maxRounds, dryRun }; - } - } - return { prompt: "", maxRounds: engineDefaultMaxRounds, dryRun: false }; -} - -function ensureThreadMessagesWithStart( - messages: Array<{ role: string; content: string; meta: unknown; timestamp: number }>, - threadId: string, - fallbackPrompt: string, - fallbackMaxRounds: number, -): WorkflowMessage[] { - const mapped: WorkflowMessage[] = messages.map((m) => ({ - role: m.role, - content: m.content, - meta: m.meta, - timestamp: m.timestamp, - })); - if (mapped.length > 0 && mapped[0].role === START) { - return mapped; - } - const start: WorkflowMessage = { - role: START, - content: fallbackPrompt, - meta: { maxRounds: fallbackMaxRounds, threadId }, - timestamp: Date.now(), - }; - return [start, ...mapped]; -} - -function resolveWorkerScript(): string { - const __filename = fileURLToPath(import.meta.url); - const __dir = dirname(__filename); - return join(__dir, "workflow-worker.js"); -} - -function spawnWorkflowWorker( - nerveRoot: string, - workflowName: string, - workerScript: string, - stderrTail: { value: string }, -): ChildProcess { - const child = fork(workerScript, ["--workflow", workflowName, "--root", nerveRoot], { - stdio: ["ignore", "inherit", "pipe", "ipc"], - }); - teeCapturedStderr(child, stderrTail); - // Prevent unhandled EPIPE when writing to a child whose IPC channel closed - child.on("error", (err) => { - if ((err as NodeJS.ErrnoException).code !== "EPIPE") { - console.error("[worker] error:", err.message); - } - }); - return child; -} - -function sendStartThread(worker: ChildProcess, msg: StartThreadMessage): void { - if (worker.connected === false) return; - try { - worker.send(msg); - } catch { - // IPC channel closed between connected check and send - } -} - -function sendShutdown(worker: ChildProcess, entry: WorkerEntry): void { - entry.stopping = true; - if (worker.connected === false) return; - const msg: ShutdownMessage = { type: "shutdown" }; - try { - worker.send(msg); - } catch { - // IPC channel closed between connected check and send - } -} - -function sendResumeThread(worker: ChildProcess, msg: ResumeThreadMessage): void { - if (worker.connected === false) return; - try { - worker.send(msg); - } catch { - // IPC channel closed between connected check and send - } -} - -function sendKillThread(worker: ChildProcess, runId: string): void { - if (worker.connected === false) return; - const msg: KillThreadMessage = { type: "kill-thread", runId }; - try { - worker.send(msg); - } catch { - // IPC channel closed between connected check and send - } -} - -function waitForExit(child: ChildProcess, timeoutMs: number): Promise { - return new Promise((resolve) => { - const timer = setTimeout(() => { - child.kill("SIGKILL"); - resolve(); - }, timeoutMs); - child.once("exit", () => { - clearTimeout(timer); - resolve(); - }); - }); -} - export function createWorkflowManager( nerveRoot: string, initialConfig: NerveConfig, logStore: LogStore, ): WorkflowManager { - const workerScript = resolveWorkerScript(); + const workerScript = resolveWorkflowWorkerScript(); + + /** + * Default drain timeout must be at least WORKER_SHUTDOWN_TIMEOUT_MS so the worker + * has enough time to finish in-flight threads before the parent force-kills it. + */ + const DEFAULT_DRAIN_TIMEOUT_MS = Math.max(30_000, WORKER_SHUTDOWN_TIMEOUT_MS + 5_000); const states = new Map(); - const workers = new Map(); - const crashTimestamps = new Map(); + const trackedWorkflows = new Set(); + const hotReloadEvicting = new Set(); + const crashRecoveryPending = new Set(); let stopped = false; let config = initialConfig; const pendingDrains = new Set(); + function logWorkflowEvent( + workflowName: string, + runId: string, + eventType: string, + payload?: unknown, + exitCode: number | null = null, + ): void { + appendWorkflowRunLog(logStore, workflowName, runId, eventType, payload, exitCode); + } + + const runtime = createWorkerRuntime({ + script: workerScript, + argsForKey: (workflowName) => ["--workflow", workflowName, "--root", nerveRoot], + forwardStderr: true, + onMessage: (workflowName, raw) => { + handleWorkerMessage(workflowName, raw); + }, + onReady: (workflowName, _msg) => { + if (crashRecoveryPending.has(workflowName)) { + crashRecoveryPending.delete(workflowName); + recoverThreadsFromStore( + workflowName, + logStore, + config.maxRounds, + getOrCreateState, + (wf, msg) => { + sendToWorker(wf, msg); + }, + ); + } + }, + onExit: (workflowName, code, signalStr) => { + const sig = + signalStr === null || signalStr === undefined || signalStr === "" + ? null + : (signalStr as NodeJS.Signals); + + if (hotReloadEvicting.has(workflowName)) { + hotReloadEvicting.delete(workflowName); + markActiveRunsInterrupted(workflowName); + if (!stopped && workflowConfig(workflowName) !== null) { + process.stderr.write( + `[workflow-manager] worker for "${workflowName}" drained, respawning\n`, + ); + } + return; + } + + if (stopped) { + const state = states.get(workflowName); + if (state !== undefined) { + state.active.clear(); + } + crashRecoveryPending.delete(workflowName); + return; + } + + const summary = formatChildExitSummary(code, sig); + const stderrExtra = formatCapturedStderrTail(runtime.stderrTail(workflowName)); + process.stderr.write( + `[workflow-manager] worker for "${workflowName}" exited (${summary})${stderrExtra}\n`, + ); + + cleanupAfterUnexpectedWorkerExit(workflowName); + crashRecoveryPending.add(workflowName); + }, + onCrashLimitReached: (workflowName) => { + crashRecoveryPending.delete(workflowName); + trackedWorkflows.delete(workflowName); + process.stderr.write( + `[workflow-manager] worker for "${workflowName}" exceeded crash limit (${String(WORKFLOW_WORKER_RESPAWN.maxCrashes)} in ${String(WORKFLOW_WORKER_RESPAWN.windowMs)}ms) — stopping respawn\n`, + ); + }, + respawn: { + ...WORKFLOW_WORKER_RESPAWN, + allowRespawn: (_wf) => !stopped, + }, + shutdownTimeoutMs: DEFAULT_DRAIN_TIMEOUT_MS, + }); + function getOrCreateState(workflowName: string): WorkflowState { let state = states.get(workflowName); if (state === undefined) { @@ -250,56 +179,15 @@ export function createWorkflowManager( return config.workflows[workflowName] ?? null; } - function toWorkflowRunStatus(eventType: string): WorkflowRunStatus | null { - const map: Record = { - started: "started", - queued: "queued", - completed: "completed", - failed: "failed", - crashed: "crashed", - dropped: "dropped", - interrupted: "interrupted", - killed: "killed", - }; - return map[eventType] ?? null; - } - - function extractExitCode(payload: unknown): number | null { - if (isPlainRecord(payload) && typeof payload.exitCode === "number") { - return payload.exitCode; + /** IPC send — matches legacy pool: no-op when IPC is disconnected; cold-start via WorkerRuntime.send. */ + function sendToWorker(workflowName: string, msg: unknown): void { + trackedWorkflows.add(workflowName); + if (runtime.hasDisconnectedChild(workflowName)) { + return; } - return null; - } - - function logWorkflowEvent( - workflowName: string, - runId: string, - eventType: string, - payload?: unknown, - exitCode: number | null = null, - ): void { - const timestamp = Date.now(); - const serialised = payload !== undefined ? JSON.stringify(payload) : null; - const status = toWorkflowRunStatus(eventType); - - if (status !== null) { - logStore.upsertWorkflowRun( - { - source: "workflow", - type: eventType, - refId: runId, - payload: serialised, - timestamp, - }, - { runId, workflow: workflowName, status, timestamp, exitCode }, - ); - } else { - logStore.append({ - source: "workflow", - type: eventType, - refId: runId, - payload: serialised, - timestamp, + if (!runtime.trySendSync(workflowName, msg)) { + void runtime.send(workflowName, msg).catch(() => { + // IPC channel may close between scheduling and send }); } } @@ -314,7 +202,6 @@ export function createWorkflowManager( const state = getOrCreateState(workflowName); state.active.add(runId); - const worker = getOrSpawnWorker(workflowName); const msg: StartThreadMessage = { type: "start-thread", runId, @@ -323,7 +210,7 @@ export function createWorkflowManager( maxRounds, dryRun, }; - sendStartThread(worker.process, msg); + sendToWorker(workflowName, msg); logWorkflowEvent(workflowName, runId, "started", { prompt, maxRounds, dryRun }); } @@ -367,92 +254,20 @@ export function createWorkflowManager( if (msg.eventType === "completed" || msg.eventType === "failed" || msg.eventType === "killed") { state.active.delete(msg.runId); dequeueNext(workflowName); - const exitCode = extractExitCode(msg.payload); + const exitCode = extractExitCodeFromPayload(msg.payload); logWorkflowEvent(workflowName, msg.runId, msg.eventType, msg.payload, exitCode); maybeDeferredHotReloadDrain(workflowName); } } - function recoverQueuedRun(workflowName: string, runId: string, state: WorkflowState): void { - if (state.queue.some((q) => q.runId === runId)) return; - const launch = readLaunchFromTriggerPayload( - logStore.getTriggerPayload(runId), - config.maxRounds, - ); - state.queue.push({ - runId, - prompt: launch.prompt, - maxRounds: launch.maxRounds, - dryRun: launch.dryRun, - }); - process.stderr.write( - `[workflow-manager] crash-recovery: re-queued thread "${runId}" for "${workflowName}"\n`, - ); - } - - function recoverStartedRun( - workflowName: string, - runId: string, - state: WorkflowState, - worker: WorkerEntry, - ): void { - if (state.active.has(runId)) return; - const rawMessages = logStore.getThreadMessages(runId); - const launch = readLaunchFromTriggerPayload( - logStore.getTriggerPayload(runId), - config.maxRounds, - ); - const messages = ensureThreadMessagesWithStart( - rawMessages, - runId, - launch.prompt, - launch.maxRounds, - ); - state.active.add(runId); - const msg: ResumeThreadMessage = { - type: "resume-thread", - runId, - messages, - maxRounds: launch.maxRounds, - dryRun: launch.dryRun, - }; - sendResumeThread(worker.process, msg); - process.stderr.write( - `[workflow-manager] crash-recovery: resuming thread "${runId}" for "${workflowName}" (${messages.length} messages)\n`, - ); - } - - function recoverThreadsForWorker(workflowName: string, worker: WorkerEntry): void { - const activeRuns = logStore.getActiveWorkflowRuns(workflowName); - const state = getOrCreateState(workflowName); - - for (const run of activeRuns) { - if (run.status === "queued") { - recoverQueuedRun(workflowName, run.runId, state); - } else if (run.status === "started") { - recoverStartedRun(workflowName, run.runId, state, worker); - } - } - } - - function recordCrashAndCheckLimit(workflowName: string): boolean { - const now = Date.now(); - const timestamps = (crashTimestamps.get(workflowName) ?? []).filter( - (t) => now - t < CRASH_WINDOW_MS, - ); - timestamps.push(now); - crashTimestamps.set(workflowName, timestamps); - return timestamps.length > MAX_CRASHES_IN_WINDOW; - } - - function handleWorkerCrash(workflowName: string): void { + function cleanupAfterUnexpectedWorkerExit(workflowName: string): void { const state = states.get(workflowName); if (state === undefined) return; const crashedCount = state.active.size; if (crashedCount > 0) { process.stderr.write( - `[workflow-manager] worker for "${workflowName}" crashed with ${crashedCount} active thread(s)\n`, + `[workflow-manager] worker for "${workflowName}" crashed with ${String(crashedCount)} active thread(s)\n`, ); for (const runId of state.active) { logWorkflowEvent(workflowName, runId, "crashed", undefined, 255); @@ -460,26 +275,13 @@ export function createWorkflowManager( } state.active.clear(); - workers.delete(workflowName); pendingDrains.delete(workflowName); - if (stopped || workflowConfig(workflowName) === null) return; - - if (recordCrashAndCheckLimit(workflowName)) { - const count = crashTimestamps.get(workflowName)?.length ?? 0; + if (!stopped && workflowConfig(workflowName) !== null) { process.stderr.write( - `[workflow-manager] worker for "${workflowName}" crashed ${count} times in ${CRASH_WINDOW_MS}ms — stopping respawn\n`, + `[workflow-manager] respawning worker for "${workflowName}" after crash\n`, ); - return; } - - process.stderr.write( - `[workflow-manager] respawning worker for "${workflowName}" after crash\n`, - ); - const newWorker = getOrSpawnWorker(workflowName); - setImmediate(() => { - recoverThreadsForWorker(workflowName, newWorker); - }); } function handleWorkerMessage(workflowName: string, raw: unknown): void { @@ -490,43 +292,19 @@ export function createWorkflowManager( ); return; } - const msg = result.value; - - if (msg.type === "thread-event") { - handleThreadEvent(workflowName, msg); - return; - } - - if (msg.type === "thread-workflow-message") { - logStore.append({ - source: "workflow", - type: "thread_workflow_message", - refId: msg.runId, - payload: JSON.stringify(msg.message), - timestamp: Date.now(), - }); - return; - } - - if (msg.type === "workflow-error") { - process.stderr.write( - `[workflow-manager] workflow-error for runId "${msg.runId}" in "${workflowName}": ${msg.error}\n`, - ); - const state = states.get(workflowName); - if (state !== undefined) { - state.active.delete(msg.runId); - dequeueNext(workflowName); - } - logWorkflowEvent(workflowName, msg.runId, "failed", { error: msg.error }, msg.exitCode); - maybeDeferredHotReloadDrain(workflowName); - return; - } - - if (msg.type === "error") { - process.stderr.write( - `[workflow-manager] error from "${workflowName}" worker: ${msg.error}\n`, - ); - } + dispatchWorkflowWorkerMessage(workflowName, result.value, { + logStore, + handleThreadEvent, + onWorkflowRoleError: (wf, runId, error, exitCode) => { + const state = states.get(wf); + if (state !== undefined) { + state.active.delete(runId); + dequeueNext(wf); + } + logWorkflowEvent(wf, runId, "failed", { error }, exitCode); + maybeDeferredHotReloadDrain(wf); + }, + }); } function markActiveRunsInterrupted(workflowName: string): void { @@ -538,67 +316,6 @@ export function createWorkflowManager( state.active.clear(); } - function handleWorkerExit( - workflowName: string, - code: number | null, - signal: NodeJS.Signals | null, - ): void { - const entry = workers.get(workflowName); - if (entry?.draining) { - workers.delete(workflowName); - markActiveRunsInterrupted(workflowName); - if (!stopped && workflowConfig(workflowName) !== null) { - process.stderr.write( - `[workflow-manager] worker for "${workflowName}" drained, respawning\n`, - ); - getOrSpawnWorker(workflowName); - } - return; - } - if (entry?.stopping) { - workers.delete(workflowName); - const state = states.get(workflowName); - if (state !== undefined) { - state.active.clear(); - } - return; - } - const summary = formatChildExitSummary(code, signal); - const stderrExtra = entry !== undefined ? formatCapturedStderrTail(entry.stderrTail.value) : ""; - process.stderr.write( - `[workflow-manager] worker for "${workflowName}" exited (${summary})${stderrExtra}\n`, - ); - handleWorkerCrash(workflowName); - } - - function getOrSpawnWorker(workflowName: string): WorkerEntry { - const existing = workers.get(workflowName); - if (existing !== undefined && existing.process.exitCode === null) { - return existing; - } - - const stderrTail = { value: "" }; - const child = spawnWorkflowWorker(nerveRoot, workflowName, workerScript, stderrTail); - - child.on("message", (raw: unknown) => { - handleWorkerMessage(workflowName, raw); - }); - - child.on("exit", (code, signal) => { - handleWorkerExit(workflowName, code, signal ?? null); - }); - - const entry: WorkerEntry = { - workflowName, - process: child, - stopping: false, - draining: false, - stderrTail, - }; - workers.set(workflowName, entry); - return entry; - } - function killThread(runId: string): boolean { for (const [workflowName, state] of states) { const queueIdx = state.queue.findIndex((q) => q.runId === runId); @@ -609,10 +326,8 @@ export function createWorkflowManager( } if (state.active.has(runId)) { - const workerEntry = workers.get(workflowName); - if (workerEntry !== undefined) { - sendKillThread(workerEntry.process, runId); - } + const msg: KillThreadMessage = { type: "kill-thread", runId }; + sendToWorker(workflowName, msg); return true; } } @@ -663,7 +378,7 @@ export function createWorkflowManager( state.queue.push({ runId, prompt, maxRounds, dryRun }); logWorkflowEvent(workflowName, runId, "queued"); process.stderr.write( - `[workflow-manager] queued thread for "${workflowName}" runId "${runId}" (queue length: ${state.queue.length})\n`, + `[workflow-manager] queued thread for "${workflowName}" runId "${runId}" (queue length: ${String(state.queue.length)})\n`, ); } @@ -707,35 +422,29 @@ export function createWorkflowManager( config = newConfig; } - /** - * Default drain timeout must be at least WORKER_SHUTDOWN_TIMEOUT_MS so the worker - * has enough time to finish in-flight threads before the parent force-kills it. - */ - const DEFAULT_DRAIN_TIMEOUT_MS = Math.max(30_000, WORKER_SHUTDOWN_TIMEOUT_MS + 5_000); - async function drainAndRespawn( workflowName: string, drainTimeoutMs: number = DEFAULT_DRAIN_TIMEOUT_MS, ): Promise { - const entry = workers.get(workflowName); - if (entry === undefined) { - // No active worker — nothing to drain + if (!trackedWorkflows.has(workflowName)) { return; } - entry.draining = true; - // Send shutdown without setting stopping=true (so the exit handler uses the draining branch) - if (entry.process.connected) { - const msg: ShutdownMessage = { type: "shutdown" }; - try { - entry.process.send(msg); - } catch { - // IPC closed + const shutdownMs = Math.max(drainTimeoutMs, WORKER_SHUTDOWN_TIMEOUT_MS); + hotReloadEvicting.add(workflowName); + try { + await runtime.evict(workflowName, { shutdownTimeoutMs: shutdownMs }); + trackedWorkflows.delete(workflowName); + + if (!stopped && workflowConfig(workflowName) !== null) { + trackedWorkflows.add(workflowName); + await runtime.start(workflowName); } + } finally { + hotReloadEvicting.delete(workflowName); } - await waitForExit(entry.process, drainTimeoutMs); - // The exit handler (draining branch) will respawn the worker automatically } + function drainWhenIdle(workflowName: string): void { const state = states.get(workflowName); const hasActiveRuns = state !== undefined && state.active.size > 0; @@ -761,20 +470,17 @@ export function createWorkflowManager( pendingDrains.add(workflowName); process.stderr.write( - `[workflow-manager] deferring hot-reload for "${workflowName}" until ${state.active.size} active run(s) complete\n`, + `[workflow-manager] deferring hot-reload for "${workflowName}" until ${String(state.active.size)} active run(s) complete\n`, ); } async function stop(): Promise { stopped = true; pendingDrains.clear(); - const exitPromises: Promise[] = []; - for (const entry of workers.values()) { - sendShutdown(entry.process, entry); - exitPromises.push(waitForExit(entry.process, 5000)); - } - await Promise.all(exitPromises); - workers.clear(); + hotReloadEvicting.clear(); + crashRecoveryPending.clear(); + await runtime.shutdown(); + trackedWorkflows.clear(); } return {