diff --git a/packages/daemon/src/__tests__/fixtures/crash-worker.js b/packages/daemon/src/__tests__/fixtures/crash-worker.js new file mode 100644 index 0000000..c9bd61e --- /dev/null +++ b/packages/daemon/src/__tests__/fixtures/crash-worker.js @@ -0,0 +1,9 @@ +// Ready then crashes on a timer; still echoes IPC so parent tests can send after respawn +process.on("message", (msg) => { + if (msg && msg.type === "shutdown") { + process.exit(0); + } + process.send({ type: "echo", payload: msg }); +}); +process.send({ type: "ready" }); +setTimeout(() => process.exit(1), 50); diff --git a/packages/daemon/src/__tests__/fixtures/echo-worker.js b/packages/daemon/src/__tests__/fixtures/echo-worker.js new file mode 100644 index 0000000..990953d --- /dev/null +++ b/packages/daemon/src/__tests__/fixtures/echo-worker.js @@ -0,0 +1,9 @@ +// Simple test worker: sends ready, echoes messages, handles shutdown +process.on("message", (msg) => { + if (msg && msg.type === "shutdown") { + process.exit(0); + } + // Echo back with 'echo' type + process.send({ type: "echo", payload: msg }); +}); +process.send({ type: "ready" }); diff --git a/packages/daemon/src/__tests__/fixtures/stderr-worker.js b/packages/daemon/src/__tests__/fixtures/stderr-worker.js new file mode 100644 index 0000000..127850c --- /dev/null +++ b/packages/daemon/src/__tests__/fixtures/stderr-worker.js @@ -0,0 +1,9 @@ +// Like echo-worker but writes stderr for tail diagnostics +console.error("stderr-marker"); +process.on("message", (msg) => { + if (msg && msg.type === "shutdown") { + process.exit(0); + } + process.send({ type: "echo", payload: msg }); +}); +process.send({ type: "ready" }); diff --git a/packages/daemon/src/__tests__/kernel-phase6.test.ts b/packages/daemon/src/__tests__/kernel-phase6.test.ts index a2e0b27..cdc66ff 100644 --- a/packages/daemon/src/__tests__/kernel-phase6.test.ts +++ b/packages/daemon/src/__tests__/kernel-phase6.test.ts @@ -70,6 +70,14 @@ const { createKernel } = await import("../kernel.js"); 
// Helpers // --------------------------------------------------------------------------- +/** Sense worker `fork` runs on the next microtask per scheduled `start`. */ +async function flushSenseWorkerForkMicrotasks(kernel: { groups: Set }): Promise { + const n = kernel.groups.size; + for (let i = 0; i < n; i++) { + await Promise.resolve(); + } +} + function makeConfig(overrides: Partial = {}): NerveConfig { return { senses: { @@ -142,6 +150,8 @@ describe("kernel — getHealth", () => { }, }); const kernel = createKernel(config, nerveRoot); + await flushSenseWorkerForkMicrotasks(kernel); + await vi.runAllTimersAsync(); const health = kernel.getHealth(); expect(health.activeSenses).toBe(3); @@ -171,6 +181,8 @@ describe("kernel — restartGroup", () => { it("sends shutdown to old worker and spawns new one", async () => { const config = makeConfig(); const kernel = createKernel(config, nerveRoot); + await flushSenseWorkerForkMicrotasks(kernel); + await vi.runAllTimersAsync(); expect(mockChildren.length).toBe(1); const oldChild = mockChildren[0]; @@ -178,6 +190,7 @@ describe("kernel — restartGroup", () => { const restartPromise = kernel.restartGroup("system"); // The shutdown message triggers exit in the mock await restartPromise; + await vi.runAllTimersAsync(); // A new child should have been spawned expect(mockChildren.length).toBe(2); @@ -191,6 +204,8 @@ describe("kernel — restartGroup", () => { it("restartGroup on unknown group does nothing", async () => { const config = makeConfig(); const kernel = createKernel(config, nerveRoot); + await flushSenseWorkerForkMicrotasks(kernel); + await vi.runAllTimersAsync(); expect(mockChildren.length).toBe(1); await kernel.restartGroup("nonexistent"); @@ -218,6 +233,8 @@ describe("kernel — reloadConfig", () => { it("adds new group worker when new sense group appears", async () => { const config = makeConfig(); const kernel = createKernel(config, nerveRoot); + await flushSenseWorkerForkMicrotasks(kernel); + await 
vi.runAllTimersAsync(); expect(mockChildren.length).toBe(1); // only system group expect(kernel.groups.has("network")).toBe(false); @@ -249,6 +266,9 @@ describe("kernel — reloadConfig", () => { api: { port: null, token: null, host: "127.0.0.1" }, }); + await Promise.resolve(); + await vi.runAllTimersAsync(); + expect(kernel.groups.has("network")).toBe(true); expect(mockChildren.length).toBe(2); // system + network @@ -283,6 +303,8 @@ describe("kernel — reloadConfig", () => { api: { port: null, token: null, host: "127.0.0.1" }, }; const kernel = createKernel(config, nerveRoot); + await flushSenseWorkerForkMicrotasks(kernel); + await vi.runAllTimersAsync(); expect(mockChildren.length).toBe(2); expect(kernel.groups.has("network")).toBe(true); @@ -308,6 +330,7 @@ describe("kernel — reloadConfig", () => { }); expect(kernel.groups.has("network")).toBe(false); + await vi.runAllTimersAsync(); // Network child should have received shutdown expect(networkChild.send).toHaveBeenCalledWith(expect.objectContaining({ type: "shutdown" })); @@ -317,6 +340,8 @@ describe("kernel — reloadConfig", () => { it("health reflects updated sense count after reloadConfig", async () => { const config = makeConfig(); const kernel = createKernel(config, nerveRoot); + await flushSenseWorkerForkMicrotasks(kernel); + await vi.runAllTimersAsync(); expect(kernel.getHealth().activeSenses).toBe(1); diff --git a/packages/daemon/src/__tests__/kernel-trigger-sense.test.ts b/packages/daemon/src/__tests__/kernel-trigger-sense.test.ts index c3bf72b..9688a7a 100644 --- a/packages/daemon/src/__tests__/kernel-trigger-sense.test.ts +++ b/packages/daemon/src/__tests__/kernel-trigger-sense.test.ts @@ -29,6 +29,9 @@ type MockChild = EventEmitter & { function makeMockChild(pid = 1): MockChild { const child = new EventEmitter() as MockChild; child.connected = true; + setImmediate(() => { + child.emit("message", { type: "ready" }); + }); child.send = vi.fn((msg: unknown) => { if ( msg !== null && @@ -136,6 +139,7 @@ 
describe("kernel.triggerSense()", () => { logStore: makeMockLogStore() as never, }); + await vi.runAllTimersAsync(); expect(() => kernel.triggerSense("no-such-sense")).toThrow(/Unknown sense/); await kernel.stop(); @@ -169,6 +173,7 @@ describe("kernel.triggerSense()", () => { logStore: makeMockLogStore() as never, }); + await vi.runAllTimersAsync(); // Two groups → two workers expect(mockChildren.length).toBe(2); @@ -214,6 +219,7 @@ describe("kernel.triggerSense()", () => { logStore: makeMockLogStore() as never, }); + await vi.runAllTimersAsync(); // Both senses share the "system" group → one worker only expect(mockChildren.length).toBe(1); const worker = mockChildren[0]; @@ -237,6 +243,7 @@ describe("kernel.triggerSense()", () => { logStore: makeMockLogStore() as never, }); + await new Promise((resolve) => setImmediate(resolve)); const worker = mockChildren[0]; worker.connected = false; diff --git a/packages/daemon/src/__tests__/kernel-workflow-integration.test.ts b/packages/daemon/src/__tests__/kernel-workflow-integration.test.ts index dadcd50..a5579b6 100644 --- a/packages/daemon/src/__tests__/kernel-workflow-integration.test.ts +++ b/packages/daemon/src/__tests__/kernel-workflow-integration.test.ts @@ -102,6 +102,13 @@ function makeLogStore() { }; } +async function flushSenseWorkerForkMicrotasks(kernel: { groups: Set }): Promise { + const n = kernel.groups.size; + for (let i = 0; i < n; i++) { + await Promise.resolve(); + } +} + function makeConfig(overrides: Partial = {}): NerveConfig { return { senses: { @@ -164,6 +171,8 @@ describe("kernel + workflowManager integration", () => { workerScript: "fake-worker.js", logStore, }); + await flushSenseWorkerForkMicrotasks(kernel); + await vi.runAllTimersAsync(); // Simulate a sense worker sending a signal with workflow launch payload // The kernel's handleWorkerMessage processes "signal" type messages @@ -222,6 +231,8 @@ describe("kernel + workflowManager integration", () => { workerScript: "fake-worker.js", logStore, 
}); + await flushSenseWorkerForkMicrotasks(kernel); + await vi.runAllTimersAsync(); // Simulate sense worker returning a signal plus workflow launch const workerPool = mockChildren[0]; @@ -275,6 +286,8 @@ describe("kernel + workflowManager integration", () => { workerScript: "fake-worker.js", logStore, }); + await flushSenseWorkerForkMicrotasks(kernel); + await vi.runAllTimersAsync(); const workerPool = mockChildren[0]; if (workerPool) { @@ -337,6 +350,8 @@ describe("kernel + workflowManager integration", () => { workerScript: "fake-worker.js", logStore, }); + await flushSenseWorkerForkMicrotasks(kernel); + await vi.runAllTimersAsync(); // Emit a regular signal (shorthand payload) — should NOT trigger any workflow const workerPool = mockChildren[0]; @@ -387,6 +402,8 @@ describe("kernel + workflowManager integration", () => { workerScript: "fake-worker.js", logStore, }); + await flushSenseWorkerForkMicrotasks(kernel); + await vi.runAllTimersAsync(); // Simulate sense compute returning a signal plus workflow launch const workerPool = mockChildren[0]; @@ -440,6 +457,8 @@ describe("kernel + workflowManager integration", () => { workerScript: "fake-worker.js", logStore, }); + await flushSenseWorkerForkMicrotasks(kernel); + await vi.runAllTimersAsync(); // Reload with a workflow added const newConfig: NerveConfig = { @@ -517,6 +536,8 @@ describe("kernel + workflowManager integration", () => { workerScript: "fake-worker.js", logStore, }); + await flushSenseWorkerForkMicrotasks(kernel); + await vi.runAllTimersAsync(); // Reload with the workflow removed const newConfig: NerveConfig = { @@ -600,6 +621,8 @@ describe("kernel + workflowManager integration", () => { workerScript: "fake-worker.js", logStore, }); + await flushSenseWorkerForkMicrotasks(kernel); + await vi.runAllTimersAsync(); // Trigger a workflow via sense compute return value const workerPool = mockChildren[0]; @@ -664,6 +687,8 @@ describe("kernel + workflowManager integration", () => { workerScript: 
"fake-worker.js", logStore, }); + await flushSenseWorkerForkMicrotasks(kernel); + await vi.runAllTimersAsync(); const health = kernel.getHealth(); expect(health).toHaveProperty("activeWorkflows"); diff --git a/packages/daemon/src/__tests__/kernel.test.ts b/packages/daemon/src/__tests__/kernel.test.ts index 65e0fb1..591f795 100644 --- a/packages/daemon/src/__tests__/kernel.test.ts +++ b/packages/daemon/src/__tests__/kernel.test.ts @@ -16,10 +16,12 @@ type MockChild = EventEmitter & { send: ReturnType; kill: ReturnType; pid: number; + connected: boolean; }; function makeMockChild(pid = 1): MockChild { const child = new EventEmitter() as MockChild; + child.connected = true; setImmediate(() => { child.emit("message", { type: "ready" }); }); @@ -27,7 +29,10 @@ function makeMockChild(pid = 1): MockChild { if (msg === null || typeof msg !== "object") return; const m = msg as Record; if (m.type === "shutdown") { - setImmediate(() => child.emit("exit", 0, null)); + setImmediate(() => { + child.connected = false; + child.emit("exit", 0, null); + }); return; } if (m.type === "compute" && typeof m.sense === "string") { @@ -37,6 +42,7 @@ function makeMockChild(pid = 1): MockChild { } }); child.kill = vi.fn((_signal?: string) => { + child.connected = false; child.emit("exit", null, _signal ?? "SIGKILL"); }); child.pid = pid; @@ -59,6 +65,14 @@ const { createLogStore } = await import("@uncaged/nerve-store"); // Helpers // --------------------------------------------------------------------------- +/** `WorkerRuntime.start` schedules `fork` on the next microtask — flush one tick per initial group. 
*/ +async function flushSenseWorkerForkMicrotasks(kernel: { groups: Set }): Promise { + const n = kernel.groups.size; + for (let i = 0; i < n; i++) { + await Promise.resolve(); + } +} + function makeConfig(overrides: Partial = {}): NerveConfig { return { senses: { @@ -173,6 +187,7 @@ describe("kernel — message routing", () => { }, }); const kernel = createKernel(config, nerveRoot); + await flushSenseWorkerForkMicrotasks(kernel); const child = mockChildren[0]; child.emit("message", { type: "error", sense: "cpu-usage", error: "compute failed" }); @@ -201,6 +216,7 @@ describe("kernel — message routing", () => { }, }); const kernel = createKernel(config, nerveRoot); + await flushSenseWorkerForkMicrotasks(kernel); const child = mockChildren[0]; const callsBefore = stderrSpy.mock.calls.length; @@ -228,6 +244,7 @@ describe("kernel — message routing", () => { }, }); const kernel = createKernel(config, nerveRoot); + await flushSenseWorkerForkMicrotasks(kernel); const child = mockChildren[0]; expect(() => child.emit("message", { type: "unknown-type" })).not.toThrow(); @@ -290,6 +307,7 @@ describe("kernel — groupForSense mapping", () => { api: { port: null, token: null, host: "127.0.0.1" }, }; const kernel = createKernel(config, nerveRoot); + await flushSenseWorkerForkMicrotasks(kernel); // system and network = 2 unique groups expect(mockChildren.length).toBe(2); @@ -311,8 +329,10 @@ describe("kernel — groupForSense mapping", () => { }, }); const kernel = createKernel(config, nerveRoot); - + await flushSenseWorkerForkMicrotasks(kernel); const child = mockChildren[0]; + child.emit("message", { type: "ready" }); + vi.advanceTimersByTime(500); expect(child.send).toHaveBeenCalledWith( diff --git a/packages/daemon/src/__tests__/worker-pool.test.ts b/packages/daemon/src/__tests__/worker-pool.test.ts index 6d2b9a8..3f1347e 100644 --- a/packages/daemon/src/__tests__/worker-pool.test.ts +++ b/packages/daemon/src/__tests__/worker-pool.test.ts @@ -50,6 +50,7 @@ async function 
startWorkerWithReady( group: string, ): Promise { const pr = pool.startWorker(group); + await Promise.resolve(); const child = mockChildren[mockChildren.length - 1]; child.emit("message", { type: "ready" }); await pr; @@ -137,6 +138,7 @@ describe("createSenseWorkerPool", () => { expect(pool.activeGroupCount()).toBe(1); pool.evictGroup("x"); expect(pool.hasWorkerForGroup("x")).toBe(false); + await Promise.resolve(); expect(mockChildren[0].send).toHaveBeenCalledWith( expect.objectContaining({ type: "shutdown" }), ); @@ -159,6 +161,7 @@ describe("createSenseWorkerPool", () => { const p = pool.restartGroup("g"); expect(onBeforeGroupRestart).toHaveBeenCalledWith("g"); + await Promise.resolve(); expect(mockChildren[0].send).toHaveBeenCalledWith( expect.objectContaining({ type: "shutdown" }), ); @@ -171,7 +174,7 @@ describe("createSenseWorkerPool", () => { }); it("onWorkerCrashed runs and schedules respawn after non-zero exit", async () => { - vi.useFakeTimers({ shouldAdvanceTime: true }); + vi.useFakeTimers(); const onWorkerCrashed = vi.fn(); const pool = createSenseWorkerPool({ nerveRoot: "/tmp/n", diff --git a/packages/daemon/src/__tests__/worker-runtime.test.ts b/packages/daemon/src/__tests__/worker-runtime.test.ts new file mode 100644 index 0000000..a428bff --- /dev/null +++ b/packages/daemon/src/__tests__/worker-runtime.test.ts @@ -0,0 +1,180 @@ +import { dirname, join } from "node:path"; +import { fileURLToPath } from "node:url"; +import { afterEach, describe, expect, it, vi } from "vitest"; +import { createWorkerRuntime } from "../worker-runtime.js"; + +const fixturesDir = join(dirname(fileURLToPath(import.meta.url)), "fixtures"); +const echoWorkerPath = join(fixturesDir, "echo-worker.js"); +const crashWorkerPath = join(fixturesDir, "crash-worker.js"); +const stderrWorkerPath = join(fixturesDir, "stderr-worker.js"); + +function baseConfig(script: string) { + return { + script, + argsForKey: () => [], + forwardStderr: true, + onMessage: vi.fn(), + onReady: vi.fn(), 
+ onExit: vi.fn(), + respawn: { + enabled: true, + maxCrashes: 6, + windowMs: 60_000, + delayMs: 80, + allowRespawn: null, + }, + shutdownTimeoutMs: 5000, + }; +} + +describe("createWorkerRuntime", () => { + const runtimes: Array<{ shutdown: () => Promise }> = []; + + afterEach(async () => { + await Promise.all(runtimes.splice(0).map((r) => r.shutdown())); + }); + + function track Promise }>(r: R): R { + runtimes.push(r); + return r; + } + + it("start + send message + receive echo", async () => { + const incoming: unknown[] = []; + const rt = track( + createWorkerRuntime({ + ...baseConfig(echoWorkerPath), + onMessage: (_key, msg) => { + incoming.push(msg); + }, + }), + ); + + await rt.start("a"); + expect(rt.has("a")).toBe(true); + await rt.send("a", { type: "ping", n: 1 }); + + await vi.waitFor(() => { + expect(incoming.some((m) => isEchoOf(m, { type: "ping", n: 1 }))).toBe(true); + }); + await rt.shutdown(); + }); + + it("cold start on send (no explicit start)", async () => { + const incoming: unknown[] = []; + const rt = track( + createWorkerRuntime({ + ...baseConfig(echoWorkerPath), + onMessage: (_key, msg) => { + incoming.push(msg); + }, + }), + ); + + expect(rt.has("x")).toBe(false); + await rt.send("x", { type: "hi" }); + await vi.waitFor(() => { + expect(rt.has("x")).toBe(true); + expect(incoming.some((m) => isEchoOf(m, { type: "hi" }))).toBe(true); + }); + await rt.shutdown(); + }); + + it("evict stops worker; has() is false", async () => { + const rt = track(createWorkerRuntime(baseConfig(echoWorkerPath))); + await rt.start("k"); + expect(rt.has("k")).toBe(true); + await rt.evict("k"); + expect(rt.has("k")).toBe(false); + await rt.shutdown(); + }); + + it("drain stops and respawns (new pid)", async () => { + const rt = track(createWorkerRuntime(baseConfig(echoWorkerPath))); + await rt.start("k"); + const before = rt.pid("k"); + expect(before).not.toBeNull(); + await rt.drain("k"); + const after = rt.pid("k"); + expect(after).not.toBeNull(); + 
expect(after).not.toBe(before); + await rt.shutdown(); + }); + + it("crash triggers auto-respawn", async () => { + const incoming: unknown[] = []; + const onExit = vi.fn(); + const rt = track( + createWorkerRuntime({ + ...baseConfig(crashWorkerPath), + onExit, + onMessage: (_key, msg) => { + incoming.push(msg); + }, + }), + ); + + await rt.start("c"); + + await vi.waitFor(() => expect(onExit.mock.calls.length).toBeGreaterThanOrEqual(1), { + timeout: 3000, + }); + await vi.waitFor(() => expect(rt.has("c")).toBe(true), { timeout: 3000 }); + + await rt.send("c", { type: "after-crash" }); + await vi.waitFor(() => { + expect(incoming.some((m) => isEchoOf(m, { type: "after-crash" }))).toBe(true); + }); + await rt.shutdown(); + }); + + it("crash limit reached → no more automatic respawns", async () => { + const rt = track( + createWorkerRuntime({ + ...baseConfig(crashWorkerPath), + respawn: { + enabled: true, + maxCrashes: 2, + windowMs: 60_000, + delayMs: 50, + allowRespawn: null, + }, + }), + ); + + await rt.start("z"); + + await vi.waitFor(() => expect(rt.has("z")).toBe(false), { timeout: 8000 }); + + await rt.shutdown(); + }); + + it("shutdown stops all workers", async () => { + const rt = track(createWorkerRuntime(baseConfig(echoWorkerPath))); + await rt.start("a"); + await rt.start("b"); + expect(rt.keys().sort()).toEqual(["a", "b"].sort()); + await rt.shutdown(); + expect(rt.keys()).toEqual([]); + expect(rt.has("a")).toBe(false); + expect(rt.has("b")).toBe(false); + }); + + it("stderrTail captures stderr output", async () => { + const rt = track(createWorkerRuntime(baseConfig(stderrWorkerPath))); + await rt.start("s"); + await vi.waitFor(() => { + expect(rt.stderrTail("s")).toContain("stderr-marker"); + }); + await rt.shutdown(); + }); +}); + +function isEchoOf(msg: unknown, payload: unknown): boolean { + return ( + typeof msg === "object" && + msg !== null && + (msg as Record).type === "echo" && + JSON.stringify((msg as Record).payload) === JSON.stringify(payload) 
+ ); +} diff --git a/packages/daemon/src/worker-pool.ts b/packages/daemon/src/worker-pool.ts index 60a2d0a..9b4218b 100644 --- a/packages/daemon/src/worker-pool.ts +++ b/packages/daemon/src/worker-pool.ts @@ -1,19 +1,13 @@ /** - * Sense worker pool — forked child processes per sense group (IPC lifecycle). + * Sense worker pool — thin wrapper around WorkerRuntime (RFC-006): one fork per sense group. */ -import { fork } from "node:child_process"; -import type { ChildProcess } from "node:child_process"; import { dirname, join } from "node:path"; import { fileURLToPath } from "node:url"; -import type { ComputeMessage, ShutdownMessage } from "./ipc.js"; -import { parseWorkerMessage } from "./ipc.js"; -import { - formatCapturedStderrTail, - formatChildExitSummary, - teeCapturedStderr, -} from "./worker-fork-support.js"; +import type { ComputeMessage } from "./ipc.js"; +import { formatCapturedStderrTail, formatChildExitSummary } from "./worker-fork-support.js"; +import { createWorkerRuntime } from "./worker-runtime.js"; export function resolveWorkerScript(): string { const __filename = fileURLToPath(import.meta.url); @@ -21,17 +15,12 @@ export function resolveWorkerScript(): string { return join(__dir, "sense-worker.js"); } -type WorkerEntry = { - group: string; - process: ChildProcess; -}; - export type SenseWorkerPoolOptions = { nerveRoot: string; workerScript: string; /** Invoked for every IPC message from a worker (including ready / signal / error). */ onWorkerMessage: (raw: unknown) => void; - /** Sense names in a group — used when clearing scheduler state on crash or restart. */ + /** Sense names in a group — reserved for scheduler-aligned cleanup (kernel passes current config). 
*/ sensesForGroup: (group: string) => string[]; /** * Called when a worker exits with non-zero code before scheduling a respawn @@ -58,144 +47,106 @@ export type SenseWorkerPool = { activeGroupCount: () => number; }; -function spawnWorker( - nerveRoot: string, - group: string, - workerScript: string, - stderrTail: { value: string }, -): ChildProcess { - const child = fork(workerScript, ["--group", group, "--root", nerveRoot], { - stdio: ["ignore", "inherit", "pipe", "ipc"], - }); - teeCapturedStderr(child, stderrTail); - child.on("error", (err) => { - if ((err as NodeJS.ErrnoException).code !== "EPIPE") { - console.error("[worker] error:", err.message); - } - }); - return child; -} - -function sendComputeToProcess(worker: ChildProcess, senseName: string): void { - if (worker.connected === false) return; - const msg: ComputeMessage = { type: "compute", sense: senseName }; - try { - worker.send(msg); - } catch { - // IPC channel closed between connected check and send - } -} - -function sendShutdownToProcess(worker: ChildProcess): void { - if (worker.connected === false) return; - const msg: ShutdownMessage = { type: "shutdown" }; - try { - worker.send(msg); - } catch { - // IPC channel closed between connected check and send - } -} - -function waitForExit(child: ChildProcess, timeoutMs: number): Promise { - return new Promise((resolve) => { - const timer = setTimeout(() => { - child.kill("SIGKILL"); - resolve(); - }, timeoutMs); - child.once("exit", () => { - clearTimeout(timer); - resolve(); - }); - }); -} +/** Matches legacy pool: long crash window, 1s respawn delay, practical unlimited respawns. 
*/ +const SENSE_WORKER_RESPAWN = { + enabled: true, + maxCrashes: 100_000, + windowMs: 86_400_000, + delayMs: 1000, +} as const; export function createSenseWorkerPool(options: SenseWorkerPoolOptions): SenseWorkerPool { - const workers = new Map(); - - function startWorker(group: string): Promise { - const stderrTail = { value: "" }; - const child = spawnWorker(options.nerveRoot, group, options.workerScript, stderrTail); - - let workerReadyResolve: (() => void) | undefined; - const workerReady = new Promise((resolve) => { - workerReadyResolve = resolve; - }); - - child.on("message", (raw: unknown) => { - const result = parseWorkerMessage(raw); - if (result.ok && result.value.type === "ready") { - workerReadyResolve?.(); - } + const runtime = createWorkerRuntime({ + script: options.workerScript, + argsForKey: (group) => ["--group", group, "--root", options.nerveRoot], + forwardStderr: true, + onMessage: (_key, raw) => { options.onWorkerMessage(raw); - }); - - child.on("exit", (code, signal) => { - const summary = formatChildExitSummary(code, signal ?? null); + }, + onReady: (_key, msg) => { + options.onWorkerMessage(msg); + }, + onExit: (group, code, signal) => { + const sig = + signal === null || signal === undefined || signal === "" + ? 
null + : (signal as NodeJS.Signals); + const summary = formatChildExitSummary(code, sig); process.stderr.write( - `[kernel] worker for group "${group}" exited (${summary})${formatCapturedStderrTail(stderrTail.value)}\n`, + `[kernel] worker for group "${group}" exited (${summary})${formatCapturedStderrTail(runtime.stderrTail(group))}\n`, ); - workerReadyResolve?.(); if (!options.isStopped() && code !== 0) { process.stderr.write(`[kernel] respawning worker for group "${group}" in 1s\n`); options.onWorkerCrashed(group); - setTimeout(() => { - if (!options.isStopped()) { - startWorker(group); - } - }, 1000); } - }); + }, + respawn: { + ...SENSE_WORKER_RESPAWN, + allowRespawn: (_group) => !options.isStopped(), + }, + shutdownTimeoutMs: 5000, + }); - workers.set(group, { group, process: child }); - return workerReady; + /** Groups we have ever started — mirrors legacy Map presence for `restartGroup` no-op when unknown. */ + const trackedGroups = new Set(); + /** Marks groups mid-evict so `hasWorkerForGroup` drops immediately (legacy synchronous eviction). 
*/ + const evicting = new Set(); + + async function startWorker(group: string): Promise { + trackedGroups.add(group); + await runtime.start(group); } async function restartGroup(group: string): Promise { - const entry = workers.get(group); - if (entry === undefined) return; - - options.onBeforeGroupRestart(group); - - sendShutdownToProcess(entry.process); - await waitForExit(entry.process, 5000); - - if (!options.isStopped()) { - await startWorker(group); + if (!trackedGroups.has(group)) { + return; } + options.onBeforeGroupRestart(group); + await runtime.drain(group); } function evictGroup(group: string): void { - const entry = workers.get(group); - if (entry === undefined) return; - sendShutdownToProcess(entry.process); - workers.delete(group); + trackedGroups.delete(group); + evicting.add(group); + void runtime.evict(group).finally(() => { + evicting.delete(group); + }); } async function shutdownAll(): Promise { - const exitPromises: Promise[] = []; - for (const entry of workers.values()) { - sendShutdownToProcess(entry.process); - exitPromises.push(waitForExit(entry.process, 5000)); - } - await Promise.all(exitPromises); + await runtime.shutdown(); + trackedGroups.clear(); + evicting.clear(); } function sendCompute(group: string, senseName: string): void { - const entry = workers.get(group); - if (entry === undefined) return; - sendComputeToProcess(entry.process, senseName); + if (!trackedGroups.has(group) || evicting.has(group)) { + return; + } + // Legacy pool: `child.send` no-op when IPC is closed (still allow cold start: child === null). + if (runtime.hasDisconnectedChild(group)) { + return; + } + const msg: ComputeMessage = { type: "compute", sense: senseName }; + if (!runtime.trySendSync(group, msg)) { + void runtime.send(group, msg).catch(() => { + // IPC channel may close between scheduling and send — same as legacy try/catch on child.send + }); + } } function getWorkerPid(group: string): number | null { - return workers.get(group)?.process.pid ?? 
null; + return runtime.pid(group); } + /** True once `startWorker` has been called for the group and it is not mid-evict (matches legacy Map key). */ function hasWorkerForGroup(group: string): boolean { - return workers.has(group); + return trackedGroups.has(group) && !evicting.has(group); } + /** Count of sense groups with a worker slot (includes not-yet-ready), excluding evicted keys. */ function activeGroupCount(): number { - return workers.size; + return trackedGroups.size; } return { diff --git a/packages/daemon/src/worker-runtime.ts b/packages/daemon/src/worker-runtime.ts new file mode 100644 index 0000000..2be1068 --- /dev/null +++ b/packages/daemon/src/worker-runtime.ts @@ -0,0 +1,404 @@ +/** + * Generic message-routed worker process manager (RFC-006). + * One forked Node child per key; cold start, crash respawn, drain/evict, shutdown. + */ + +import { type ChildProcess, type Serializable, fork } from "node:child_process"; +import { isPlainRecord } from "@uncaged/nerve-core"; + +const STDERR_TAIL_MAX_CHARS = 2048; + +export type WorkerRuntimeConfig = { + script: string; + argsForKey: (key: K) => string[]; + /** When false, stderr is not captured into `stderrTail` (e.g. tests without a pipe). */ + forwardStderr: boolean; + onMessage: (key: K, msg: unknown) => void; + onReady: (key: K, msg: unknown) => void; + onExit: (key: K, code: number | null, signal: string | null) => void; + respawn: { + enabled: boolean; + maxCrashes: number; + windowMs: number; + delayMs: number; + /** When non-null, return false to skip automatic respawn after an unexpected exit. */ + allowRespawn: ((key: K) => boolean) | null; + }; + shutdownTimeoutMs: number; +}; + +export type WorkerRuntime = { + send: (key: K, msg: unknown) => Promise; + /** When the worker is already ready and IPC-connected, sends synchronously (returns true). Otherwise false — caller may fall back to `send`. 
*/ + trySendSync: (key: K, msg: unknown) => boolean; + start: (key: K) => Promise; + evict: (key: K) => Promise; + drain: (key: K) => Promise; + shutdown: () => Promise; + has: (key: K) => boolean; + /** True when a child exists but IPC is disconnected (legacy pool skipped sends in this case). */ + hasDisconnectedChild: (key: K) => boolean; + pid: (key: K) => number | null; + keys: () => K[]; + stderrTail: (key: K) => string; +}; + +type WorkerMachineState = "stopped" | "starting" | "ready" | "draining"; + +type ReadyWaiter = { + resolve: () => void; + reject: (err: Error) => void; +}; + +/** Internal: one forked process slot (ManagedWorker). */ +type WorkerSlot = { + key: K; + state: WorkerMachineState; + child: ChildProcess | null; + pid: number | null; + stderrTail: string; + crashTimestamps: number[]; + expectExit: boolean; + readyWaiters: ReadyWaiter[]; + opChain: Promise; +}; + +function isReadyIpcMessage(raw: unknown): boolean { + return isPlainRecord(raw) && raw.type === "ready"; +} + +function signalToString(signal: NodeJS.Signals | null): string | null { + if (signal === null) { + return null; + } + return String(signal); +} + +function attachStderrTail(child: ChildProcess, slot: WorkerSlot): void { + const stream = child.stderr; + if (stream == null) { + return; + } + stream.setEncoding("utf8"); + stream.on("data", (chunk: string | Buffer) => { + const text = typeof chunk === "string" ? 
chunk : chunk.toString("utf8");
    slot.stderrTail = (slot.stderrTail + text).slice(-STDERR_TAIL_MAX_CHARS);
  });
}

/**
 * Runs `fn` after every operation previously queued on `slot` has settled.
 *
 * `slot.opChain` is always replaced by a promise that swallows the outcome of
 * `fn`, so a failed operation never blocks (or rejects) the ones queued after
 * it. The returned promise still carries the real result/rejection of `fn`.
 *
 * @param slot - Worker slot whose operations are being serialized.
 * @param fn - Operation to run once the slot's queue is idle.
 * @returns The promise produced by `fn`.
 */
function enqueueOp<K, T>(slot: WorkerSlot<K>, fn: () => Promise<T>): Promise<T> {
  // `fn` is used for both branches; opChain itself never rejects (see below).
  const run = slot.opChain.then(fn, fn);
  slot.opChain = run.then(
    () => {},
    () => {},
  );
  return run;
}

/**
 * Resolves every pending ready-waiter of `slot` and empties the waiter list.
 * The list is swapped out first so waiters added during resolution are kept.
 */
function resolveReadyWaiters<K>(slot: WorkerSlot<K>): void {
  const waiters = slot.readyWaiters;
  slot.readyWaiters = [];
  for (const w of waiters) {
    w.resolve();
  }
}

/**
 * Rejects every pending ready-waiter of `slot` with `err` and empties the
 * waiter list.
 */
function rejectReadyWaiters<K>(slot: WorkerSlot<K>, err: Error): void {
  const waiters = slot.readyWaiters;
  slot.readyWaiters = [];
  for (const w of waiters) {
    w.reject(err);
  }
}

/**
 * Waits until `slot` is marked ready.
 *
 * Resolves immediately when the slot is already ready with a connected child.
 * Otherwise a waiter is registered on the slot; it is settled by
 * `resolveReadyWaiters` / `rejectReadyWaiters`, or rejected by the timeout.
 *
 * @param slot - Worker slot to wait on.
 * @param shutdownTimeoutMs - Maximum time to wait, in milliseconds (callers in
 *   this file pass the configured shutdown timeout).
 * @returns A promise that resolves once the worker is ready.
 * @throws Rejects with `Worker "<key>" ready timeout` when the deadline passes,
 *   or with whatever error the waiter is rejected with.
 */
function waitForReady<K>(
  slot: WorkerSlot<K>,
  shutdownTimeoutMs: number,
): Promise<void> {
  if (slot.state === "ready" && slot.child !== null && slot.child.connected) {
    return Promise.resolve();
  }
  return new Promise<void>((resolve, reject) => {
    let settled = false;
    const waiter = {
      resolve: (): void => {
        if (settled) {
          return;
        }
        settled = true;
        clearTimeout(timer);
        resolve();
      },
      reject: (err: Error): void => {
        if (settled) {
          return;
        }
        settled = true;
        clearTimeout(timer);
        reject(err);
      },
    };
    const timer = setTimeout(() => {
      if (settled) {
        return;
      }
      settled = true;
      // Drop the dead waiter so repeated timeouts do not pile up on the slot.
      slot.readyWaiters = slot.readyWaiters.filter((w) => w !== waiter);
      reject(new Error(`Worker "${String(slot.key)}" ready timeout`));
    }, shutdownTimeoutMs);
    slot.readyWaiters.push(waiter);
  });
}

/**
 * Waits for `child` to emit `exit`, sending SIGKILL if it has not done so
 * within `timeoutMs`. The promise resolves only on the `exit` event, so after
 * the kill it keeps waiting until the process is actually gone.
 */
async function waitForChildExit(child: ChildProcess, timeoutMs: number): Promise<void> {
  await new Promise<void>((resolve) => {
    const timer = setTimeout(() => {
      child.kill("SIGKILL");
    }, timeoutMs);
    child.once("exit", () => {
      clearTimeout(timer);
      resolve();
    });
  });
}

/**
 * Creates a runtime that manages one forked worker process per key.
 *
 * Every state-changing operation on a key (`send`, `start`, `evict`, `drain`,
 * `shutdown`, crash recovery) is serialized through that key's operation
 * queue, so at most one of them runs per worker at any time.
 *
 * @param config - Script/args to fork, lifecycle callbacks (`onReady`,
 *   `onMessage`, `onExit`), shutdown timeout, stderr forwarding flag and
 *   respawn policy.
 * @returns The worker runtime API.
 */
export function createWorkerRuntime<K>(
  config: WorkerRuntimeConfig<K>,
): WorkerRuntime<K> {
  const workers = new Map<K, WorkerSlot<K>>();

  /** Returns the slot for `key`, creating a stopped, child-less one if needed. */
  function getOrCreateSlot(key: K): WorkerSlot<K> {
    let slot = workers.get(key);
    if (slot === undefined) {
      slot = {
        key,
        state: "stopped",
        child: null,
        pid: null,
        stderrTail: "",
        crashTimestamps: [],
        expectExit: false,
        readyWaiters: [],
        opChain: Promise.resolve(),
      };
      workers.set(key, slot);
    }
    return slot;
  }

  /**
   * Routes one IPC message from the slot's child: a ready message flips a
   * starting slot to ready (and is otherwise ignored); anything else is
   * forwarded to `config.onMessage`.
   */
  function handleWorkerMessage(slot: WorkerSlot<K>, msg: unknown): void {
    if (isReadyIpcMessage(msg)) {
      if (slot.state === "starting") {
        slot.state = "ready";
        config.onReady(slot.key, msg);
        resolveReadyWaiters(slot);
      }
      return;
    }
    config.onMessage(slot.key, msg);
  }

  /**
   * Handles the `exit` event of `child`.
   *
   * `config.onExit` is always notified. Slot state is only touched when
   * `child` is still the slot's current child: a child that was superseded by
   * a newer fork must not clear the replacement or trigger a respawn.
   */
  function onChildExit(
    slot: WorkerSlot<K>,
    child: ChildProcess,
    code: number | null,
    signal: NodeJS.Signals | null,
  ): void {
    config.onExit(slot.key, code, signalToString(signal));

    child.removeAllListeners("message");
    child.removeAllListeners("exit");

    if (slot.child !== child) {
      // Stale child: the slot already belongs to a replacement (or to nobody).
      return;
    }

    const wasExpect = slot.expectExit;
    slot.expectExit = false;

    slot.child = null;
    slot.pid = null;

    if (wasExpect) {
      slot.state = "stopped";
      return;
    }

    rejectReadyWaiters(slot, new Error(`Worker "${String(slot.key)}" exited unexpectedly`));
    slot.state = "stopped";

    // Recovery is queued like any other operation; enqueueOp keeps the chain
    // alive even if the respawn itself fails.
    void enqueueOp(slot, async () => {
      await handleUnexpectedCrashRecovery(slot);
    });
  }

  /** Makes `child` the slot's current child and wires its event handlers. */
  function registerChild(slot: WorkerSlot<K>, child: ChildProcess): void {
    slot.child = child;
    slot.pid = child.pid ?? null;
    if (config.forwardStderr) {
      attachStderrTail(child, slot);
    }
    // Spawn failures and send() on a closed IPC channel are reported through
    // an asynchronous 'error' event; without a listener that event is thrown
    // as an uncaught exception in the parent process.
    child.on("error", (err: Error) => {
      console.error(
        `[WorkerRuntime] worker "${String(slot.key)}" child process error: ${err.message}`,
      );
    });
    child.on("message", (msg: unknown) => {
      // Ignore traffic from a child that has been superseded.
      if (slot.child === child) {
        handleWorkerMessage(slot, msg);
      }
    });
    child.on("exit", (code, sig) => {
      onChildExit(slot, child, code, sig ?? null);
    });
  }

  /**
   * Detaches and kills a child that is still registered on the slot although
   * it is not usable (never became ready in time, or lost its IPC channel).
   * The slot is cleared before the kill so the child's exit is treated as
   * stale by `onChildExit`.
   */
  function discardSupersededChild(slot: WorkerSlot<K>): void {
    const stale = slot.child;
    if (stale === null) {
      return;
    }
    slot.child = null;
    slot.pid = null;
    try {
      stale.kill("SIGKILL");
    } catch {
      // Best effort: the process may already be gone.
    }
  }

  /**
   * Ensures the slot has a ready, connected child, forking one if necessary.
   *
   * @throws Rejects when `fork` throws, when the child exits before becoming
   *   ready, or when the ready wait times out.
   */
  async function forkAndWaitReady(slot: WorkerSlot<K>): Promise<void> {
    if (slot.state === "ready" && slot.child !== null && slot.child.connected) {
      return;
    }

    // Do not leak a previous, unusable child when forking its replacement.
    discardSupersededChild(slot);

    slot.state = "starting";

    let child: ChildProcess;
    try {
      child = fork(config.script, config.argsForKey(slot.key), {
        stdio: ["ignore", "inherit", "pipe", "ipc"],
        env: process.env,
      });
    } catch (e) {
      slot.state = "stopped";
      const err = e instanceof Error ? e : new Error(String(e));
      rejectReadyWaiters(slot, err);
      throw err;
    }

    registerChild(slot, child);
    await waitForReady(slot, config.shutdownTimeoutMs);
  }

  /**
   * Asks the slot's child to shut down and waits for it to exit, escalating
   * to SIGKILL after `config.shutdownTimeoutMs`. No-op when there is no child.
   */
  async function gracefulStop(slot: WorkerSlot<K>): Promise<void> {
    if (slot.child === null) {
      return;
    }
    slot.expectExit = true;
    slot.state = "draining";
    const child = slot.child;
    try {
      child.send({ type: "shutdown" });
    } catch {
      // Synchronous send failures are ignored; a closed IPC channel surfaces
      // as an 'error' event instead, handled by the listener registered with
      // the child. Either way the exit wait below escalates to SIGKILL.
    }
    await waitForChildExit(child, config.shutdownTimeoutMs);
  }

  /**
   * Respawns a worker after an unexpected exit, subject to the respawn
   * policy: respawning must be enabled, `allowRespawn` (when set) must accept
   * the key, and the number of crashes within `windowMs` must stay below
   * `maxCrashes`. The fork is delayed by `delayMs`.
   */
  async function handleUnexpectedCrashRecovery(slot: WorkerSlot<K>): Promise<void> {
    if (!config.respawn.enabled) {
      return;
    }
    if (config.respawn.allowRespawn !== null && !config.respawn.allowRespawn(slot.key)) {
      return;
    }

    const now = Date.now();
    slot.crashTimestamps.push(now);
    // Keep only the crashes that fall inside the sliding window.
    slot.crashTimestamps = slot.crashTimestamps.filter((t) => now - t <= config.respawn.windowMs);

    if (slot.crashTimestamps.length >= config.respawn.maxCrashes) {
      console.error(
        `[WorkerRuntime] worker "${String(slot.key)}" exceeded crash limit (${String(config.respawn.maxCrashes)} in ${String(config.respawn.windowMs)}ms); not respawning`,
      );
      return;
    }

    await new Promise<void>((resolve) => setTimeout(resolve, config.respawn.delayMs));
    await forkAndWaitReady(slot);
  }

  /** Stops the slot's worker and removes the slot from the registry. */
  async function shutdownWorker(slot: WorkerSlot<K>): Promise<void> {
    await gracefulStop(slot);
    workers.delete(slot.key);
  }

  /** True when the slot is ready and its child still has an open IPC channel. */
  function isActive(slot: WorkerSlot<K>): boolean {
    return slot.state === "ready" && slot.child !== null && slot.child.connected;
  }

  return {
    // Starts the worker if needed, then sends `msg`; rejects if it cannot be
    // brought to a connected state.
    send: async (key: K, msg: unknown) => {
      const slot = getOrCreateSlot(key);
      await enqueueOp(slot, async () => {
        await forkAndWaitReady(slot);
        const child = slot.child;
        if (child === null || !child.connected) {
          throw new Error(`Worker "${String(key)}" is not connected`);
        }
        child.send(msg as Serializable);
      });
    },

    // Sends only when the worker is already active; never starts one and
    // bypasses the operation queue. Returns whether the send was attempted
    // without a synchronous failure.
    trySendSync: (key: K, msg: unknown): boolean => {
      const slot = workers.get(key);
      if (slot === undefined || !isActive(slot)) {
        return false;
      }
      const child = slot.child;
      if (child === null || !child.connected) {
        return false;
      }
      try {
        child.send(msg as Serializable);
        return true;
      } catch {
        return false;
      }
    },

    // Ensures a ready worker exists for `key`.
    start: async (key: K) => {
      const slot = getOrCreateSlot(key);
      await enqueueOp(slot, async () => {
        await forkAndWaitReady(slot);
      });
    },

    // Stops the worker for `key` and forgets its slot.
    evict: async (key: K) => {
      const slot = getOrCreateSlot(key);
      await enqueueOp(slot, async () => {
        await gracefulStop(slot);
        workers.delete(key);
      });
    },

    // Restarts the worker for `key` (or simply starts it when none is running).
    drain: async (key: K) => {
      const slot = getOrCreateSlot(key);
      await enqueueOp(slot, async () => {
        if (slot.child === null) {
          await forkAndWaitReady(slot);
          return;
        }
        await gracefulStop(slot);
        await forkAndWaitReady(slot);
      });
    },

    // Stops every known worker; the slot list is snapshotted because
    // shutdownWorker mutates the registry.
    shutdown: async () => {
      const snapshot = [...workers.values()];
      await Promise.all(snapshot.map((slot) => enqueueOp(slot, () => shutdownWorker(slot))));
    },

    has: (key: K) => {
      const slot = workers.get(key);
      return slot !== undefined && isActive(slot);
    },

    // True when a child process is registered but its IPC channel is closed.
    hasDisconnectedChild: (key: K): boolean => {
      const slot = workers.get(key);
      if (slot === undefined || slot.child === null) {
        return false;
      }
      return !slot.child.connected;
    },

    // PID of the active worker for `key`, or null when it is not active.
    pid: (key: K) => {
      const slot = workers.get(key);
      if (slot === undefined || !isActive(slot) || slot.pid === null) {
        return null;
      }
      return slot.pid;
    },

    // Keys of all currently active workers.
    keys: () => [...workers.values()].filter((slot) => isActive(slot)).map((slot) => slot.key),

    // Captured stderr tail for `key`, or "" when the key is unknown.
    stderrTail: (key: K) => {
      const slot = workers.get(key);
      return slot === undefined ? "" : slot.stderrTail;
    },
  };
}