diff --git a/packages/daemon/src/__tests__/fixtures/crash-worker.js b/packages/daemon/src/__tests__/fixtures/crash-worker.js new file mode 100644 index 0000000..c9bd61e --- /dev/null +++ b/packages/daemon/src/__tests__/fixtures/crash-worker.js @@ -0,0 +1,9 @@ +// Ready then crashes on a timer; still echoes IPC so parent tests can send after respawn +process.on("message", (msg) => { + if (msg && msg.type === "shutdown") { + process.exit(0); + } + process.send({ type: "echo", payload: msg }); +}); +process.send({ type: "ready" }); +setTimeout(() => process.exit(1), 50); diff --git a/packages/daemon/src/__tests__/fixtures/echo-worker.js b/packages/daemon/src/__tests__/fixtures/echo-worker.js new file mode 100644 index 0000000..990953d --- /dev/null +++ b/packages/daemon/src/__tests__/fixtures/echo-worker.js @@ -0,0 +1,9 @@ +// Simple test worker: sends ready, echoes messages, handles shutdown +process.on("message", (msg) => { + if (msg && msg.type === "shutdown") { + process.exit(0); + } + // Echo back with 'echo' type + process.send({ type: "echo", payload: msg }); +}); +process.send({ type: "ready" }); diff --git a/packages/daemon/src/__tests__/fixtures/stderr-worker.js b/packages/daemon/src/__tests__/fixtures/stderr-worker.js new file mode 100644 index 0000000..127850c --- /dev/null +++ b/packages/daemon/src/__tests__/fixtures/stderr-worker.js @@ -0,0 +1,9 @@ +// Like echo-worker but writes stderr for tail diagnostics +console.error("stderr-marker"); +process.on("message", (msg) => { + if (msg && msg.type === "shutdown") { + process.exit(0); + } + process.send({ type: "echo", payload: msg }); +}); +process.send({ type: "ready" }); diff --git a/packages/daemon/src/__tests__/worker-runtime.test.ts b/packages/daemon/src/__tests__/worker-runtime.test.ts new file mode 100644 index 0000000..8786685 --- /dev/null +++ b/packages/daemon/src/__tests__/worker-runtime.test.ts @@ -0,0 +1,177 @@ +import { dirname, join } from "node:path"; +import { fileURLToPath } from "node:url"; +import { afterEach, describe, expect, it, vi } from "vitest"; +import { createWorkerRuntime } from "../worker-runtime.js"; + +const fixturesDir = join(dirname(fileURLToPath(import.meta.url)), "fixtures"); +const echoWorkerPath = join(fixturesDir, "echo-worker.js"); +const crashWorkerPath = join(fixturesDir, "crash-worker.js"); +const stderrWorkerPath = join(fixturesDir, "stderr-worker.js"); + +function baseConfig(script: string) { + return { + script, + argsForKey: () => [], + onMessage: vi.fn(), + onReady: vi.fn(), + onExit: vi.fn(), + respawn: { + enabled: true, + maxCrashes: 6, + windowMs: 60_000, + delayMs: 80, + }, + shutdownTimeoutMs: 5000, + }; +} + +describe("createWorkerRuntime", () => { + const runtimes: Array<{ shutdown: () => Promise }> = []; + + afterEach(async () => { + await Promise.all(runtimes.splice(0).map((r) => r.shutdown())); + }); + + function track Promise }>(r: R): R { + runtimes.push(r); + return r; + } + + it("start + send message + receive echo", async () => { + const incoming: unknown[] = []; + const rt = track( + createWorkerRuntime({ + ...baseConfig(echoWorkerPath), + onMessage: (_key, msg) => { + incoming.push(msg); + }, + }), + ); + + await rt.start("a"); + expect(rt.has("a")).toBe(true); + await rt.send("a", { type: "ping", n: 1 }); + + await vi.waitFor(() => { + expect(incoming.some((m) => isEchoOf(m, { type: "ping", n: 1 }))).toBe(true); + }); + await rt.shutdown(); + }); + + it("cold start on send (no explicit start)", async () => { + const incoming: unknown[] = []; + const rt = track( + createWorkerRuntime({ + ...baseConfig(echoWorkerPath), + onMessage: (_key, msg) => { + incoming.push(msg); + }, + }), + ); + + expect(rt.has("x")).toBe(false); + await rt.send("x", { type: "hi" }); + await vi.waitFor(() => { + expect(rt.has("x")).toBe(true); + expect(incoming.some((m) => isEchoOf(m, { type: "hi" }))).toBe(true); + }); + await rt.shutdown(); + }); + + it("evict stops worker; has() is false", async () => { + const rt = track(createWorkerRuntime(baseConfig(echoWorkerPath))); + await rt.start("k"); + expect(rt.has("k")).toBe(true); + await rt.evict("k"); + expect(rt.has("k")).toBe(false); + await rt.shutdown(); + }); + + it("drain stops and respawns (new pid)", async () => { + const rt = track(createWorkerRuntime(baseConfig(echoWorkerPath))); + await rt.start("k"); + const before = rt.pid("k"); + expect(before).not.toBeNull(); + await rt.drain("k"); + const after = rt.pid("k"); + expect(after).not.toBeNull(); + expect(after).not.toBe(before); + await rt.shutdown(); + }); + + it("crash triggers auto-respawn", async () => { + const incoming: unknown[] = []; + const onExit = vi.fn(); + const rt = track( + createWorkerRuntime({ + ...baseConfig(crashWorkerPath), + onExit, + onMessage: (_key, msg) => { + incoming.push(msg); + }, + }), + ); + + await rt.start("c"); + + await vi.waitFor(() => expect(onExit.mock.calls.length).toBeGreaterThanOrEqual(1), { + timeout: 3000, + }); + await vi.waitFor(() => expect(rt.has("c")).toBe(true), { timeout: 3000 }); + + await rt.send("c", { type: "after-crash" }); + await vi.waitFor(() => { + expect(incoming.some((m) => isEchoOf(m, { type: "after-crash" }))).toBe(true); + }); + await rt.shutdown(); + }); + + it("crash limit reached → no more automatic respawns", async () => { + const rt = track( + createWorkerRuntime({ + ...baseConfig(crashWorkerPath), + respawn: { + enabled: true, + maxCrashes: 2, + windowMs: 60_000, + delayMs: 50, + }, + }), + ); + + await rt.start("z"); + + await vi.waitFor(() => expect(rt.has("z")).toBe(false), { timeout: 8000 }); + + await rt.shutdown(); + }); + + it("shutdown stops all workers", async () => { + const rt = track(createWorkerRuntime(baseConfig(echoWorkerPath))); + await rt.start("a"); + await rt.start("b"); + expect(rt.keys().sort()).toEqual(["a", "b"].sort()); + await rt.shutdown(); + expect(rt.keys()).toEqual([]); + expect(rt.has("a")).toBe(false); + expect(rt.has("b")).toBe(false); + }); + + it("stderrTail captures stderr output", async () => { + const rt = track(createWorkerRuntime(baseConfig(stderrWorkerPath))); + await rt.start("s"); + await vi.waitFor(() => { + expect(rt.stderrTail("s")).toContain("stderr-marker"); + }); + await rt.shutdown(); + }); +}); + +function isEchoOf(msg: unknown, payload: unknown): boolean { + return ( + typeof msg === "object" && + msg !== null && + (msg as Record).type === "echo" && + JSON.stringify((msg as Record).payload) === JSON.stringify(payload) + ); +} diff --git a/packages/daemon/src/worker-runtime.ts b/packages/daemon/src/worker-runtime.ts new file mode 100644 index 0000000..b550c68 --- /dev/null +++ b/packages/daemon/src/worker-runtime.ts @@ -0,0 +1,363 @@ +/** + * Generic message-routed worker process manager (RFC-006). + * One forked Node child per key; cold start, crash respawn, drain/evict, shutdown. + */ + +import { type ChildProcess, type Serializable, fork } from "node:child_process"; +import { isPlainRecord } from "@uncaged/nerve-core"; + +const STDERR_TAIL_MAX_CHARS = 2048; + +export type WorkerRuntimeConfig = { + script: string; + argsForKey: (key: K) => string[]; + onMessage: (key: K, msg: unknown) => void; + onReady: (key: K) => void; + onExit: (key: K, code: number | null, signal: string | null) => void; + respawn: { + enabled: boolean; + maxCrashes: number; + windowMs: number; + delayMs: number; + }; + shutdownTimeoutMs: number; +}; + +export type WorkerRuntime = { + send: (key: K, msg: unknown) => Promise; + start: (key: K) => Promise; + evict: (key: K) => Promise; + drain: (key: K) => Promise; + shutdown: () => Promise; + has: (key: K) => boolean; + pid: (key: K) => number | null; + keys: () => K[]; + stderrTail: (key: K) => string; +}; + +type WorkerMachineState = "stopped" | "starting" | "ready" | "draining"; + +type ReadyWaiter = { + resolve: () => void; + reject: (err: Error) => void; +}; + +/** Internal: one forked process slot (ManagedWorker). */ +type WorkerSlot = { + key: K; + state: WorkerMachineState; + child: ChildProcess | null; + pid: number | null; + stderrTail: string; + crashTimestamps: number[]; + expectExit: boolean; + readyWaiters: ReadyWaiter[]; + opChain: Promise; +}; + +function isReadyIpcMessage(raw: unknown): boolean { + return isPlainRecord(raw) && raw.type === "ready"; +} + +function signalToString(signal: NodeJS.Signals | null): string | null { + if (signal === null) { + return null; + } + return String(signal); +} + +function attachStderrTail(child: ChildProcess, slot: WorkerSlot): void { + const stream = child.stderr; + if (stream === null) { + return; + } + stream.setEncoding("utf8"); + stream.on("data", (chunk: string | Buffer) => { + const text = typeof chunk === "string" ? chunk : chunk.toString("utf8"); + slot.stderrTail = (slot.stderrTail + text).slice(-STDERR_TAIL_MAX_CHARS); + }); +} + +function enqueueOp(slot: WorkerSlot, fn: () => Promise): Promise { + const run = slot.opChain.then(fn, fn); + slot.opChain = run.then( + () => {}, + () => {}, + ); + return run; +} + +function resolveReadyWaiters(slot: WorkerSlot): void { + const waiters = slot.readyWaiters; + slot.readyWaiters = []; + for (const w of waiters) { + w.resolve(); + } +} + +function rejectReadyWaiters(slot: WorkerSlot, err: Error): void { + const waiters = slot.readyWaiters; + slot.readyWaiters = []; + for (const w of waiters) { + w.reject(err); + } +} + +function waitForReady( + slot: WorkerSlot, + shutdownTimeoutMs: number, +): Promise { + if (slot.state === "ready" && slot.child !== null && slot.child.connected) { + return Promise.resolve(); + } + return new Promise((resolve, reject) => { + let settled = false; + const timer = setTimeout(() => { + if (!settled) { + settled = true; + reject(new Error(`Worker "${String(slot.key)}" ready timeout`)); + } + }, shutdownTimeoutMs); + slot.readyWaiters.push({ + resolve: () => { + if (settled) { + return; + } + settled = true; + clearTimeout(timer); + resolve(); + }, + reject: (err: Error) => { + if (settled) { + return; + } + settled = true; + clearTimeout(timer); + reject(err); + }, + }); + }); +} + +async function waitForChildExit(child: ChildProcess, timeoutMs: number): Promise { + await new Promise((resolve) => { + const timer = setTimeout(() => { + child.kill("SIGKILL"); + }, timeoutMs); + child.once("exit", () => { + clearTimeout(timer); + resolve(); + }); + }); +} + +export function createWorkerRuntime( + config: WorkerRuntimeConfig, +): WorkerRuntime { + const workers = new Map>(); + + function getOrCreateSlot(key: K): WorkerSlot { + let slot = workers.get(key); + if (slot === undefined) { + slot = { + key, + state: "stopped", + child: null, + pid: null, + stderrTail: "", + crashTimestamps: [], + expectExit: false, + readyWaiters: [], + opChain: Promise.resolve(), + }; + workers.set(key, slot); + } + return slot; + } + + function handleWorkerMessage(slot: WorkerSlot, msg: unknown): void { + if (isReadyIpcMessage(msg)) { + if (slot.state === "starting") { + slot.state = "ready"; + slot.crashTimestamps = []; + config.onReady(slot.key); + resolveReadyWaiters(slot); + } + return; + } + config.onMessage(slot.key, msg); + } + + function onChildExit( + slot: WorkerSlot, + code: number | null, + signal: NodeJS.Signals | null, + ): void { + config.onExit(slot.key, code, signalToString(signal)); + + if (slot.child !== null) { + slot.child.removeAllListeners("message"); + slot.child.removeAllListeners("exit"); + } + + const wasExpect = slot.expectExit; + slot.expectExit = false; + + slot.child = null; + slot.pid = null; + + if (wasExpect) { + slot.state = "stopped"; + return; + } + + rejectReadyWaiters(slot, new Error(`Worker "${String(slot.key)}" exited unexpectedly`)); + slot.state = "stopped"; + + void enqueueOp(slot, async () => { + await handleUnexpectedCrashRecovery(slot); + }); + } + + function registerChild(slot: WorkerSlot, child: ChildProcess): void { + slot.child = child; + slot.pid = child.pid ?? null; + attachStderrTail(child, slot); + child.on("message", (msg: unknown) => { + handleWorkerMessage(slot, msg); + }); + child.on("exit", (code, sig) => { + onChildExit(slot, code, sig ?? null); + }); + } + + async function forkAndWaitReady(slot: WorkerSlot): Promise { + if (slot.state === "ready" && slot.child !== null && slot.child.connected) { + return; + } + + slot.state = "starting"; + + let child: ChildProcess; + try { + child = fork(config.script, config.argsForKey(slot.key), { + stdio: ["ignore", "inherit", "pipe", "ipc"], + env: process.env, + }); + } catch (e) { + slot.state = "stopped"; + const err = e instanceof Error ? e : new Error(String(e)); + rejectReadyWaiters(slot, err); + throw err; + } + + registerChild(slot, child); + await waitForReady(slot, config.shutdownTimeoutMs); + } + + async function gracefulStop(slot: WorkerSlot): Promise { + if (slot.child === null) { + return; + } + slot.expectExit = true; + slot.state = "draining"; + const child = slot.child; + child.send({ type: "shutdown" }); + await waitForChildExit(child, config.shutdownTimeoutMs); + } + + async function handleUnexpectedCrashRecovery(slot: WorkerSlot): Promise { + if (!config.respawn.enabled) { + return; + } + + const now = Date.now(); + slot.crashTimestamps.push(now); + slot.crashTimestamps = slot.crashTimestamps.filter((t) => now - t <= config.respawn.windowMs); + + if (slot.crashTimestamps.length >= config.respawn.maxCrashes) { + console.error( + `[WorkerRuntime] worker "${String(slot.key)}" exceeded crash limit (${String(config.respawn.maxCrashes)} in ${String(config.respawn.windowMs)}ms); not respawning`, + ); + return; + } + + await new Promise((resolve) => setTimeout(resolve, config.respawn.delayMs)); + await forkAndWaitReady(slot); + } + + async function shutdownWorker(slot: WorkerSlot): Promise { + await gracefulStop(slot); + workers.delete(slot.key); + } + + function isActive(slot: WorkerSlot): boolean { + return slot.state === "ready" && slot.child !== null && slot.child.connected; + } + + return { + send: async (key: K, msg: unknown) => { + const slot = getOrCreateSlot(key); + await enqueueOp(slot, async () => { + await forkAndWaitReady(slot); + const child = slot.child; + if (child === null || !child.connected) { + throw new Error(`Worker "${String(key)}" is not connected`); + } + child.send(msg as Serializable); + }); + }, + + start: async (key: K) => { + const slot = getOrCreateSlot(key); + await enqueueOp(slot, async () => { + await forkAndWaitReady(slot); + }); + }, + + evict: async (key: K) => { + const slot = getOrCreateSlot(key); + await enqueueOp(slot, async () => { + await gracefulStop(slot); + workers.delete(key); + }); + }, + + drain: async (key: K) => { + const slot = getOrCreateSlot(key); + await enqueueOp(slot, async () => { + if (slot.child === null) { + await forkAndWaitReady(slot); + return; + } + await gracefulStop(slot); + await forkAndWaitReady(slot); + }); + }, + + shutdown: async () => { + const snapshot = [...workers.values()]; + await Promise.all(snapshot.map((slot) => enqueueOp(slot, () => shutdownWorker(slot)))); + }, + + has: (key: K) => { + const slot = workers.get(key); + return slot !== undefined && isActive(slot); + }, + + pid: (key: K) => { + const slot = workers.get(key); + if (slot === undefined || !isActive(slot) || slot.pid === null) { + return null; + } + return slot.pid; + }, + + keys: () => [...workers.values()].filter((slot) => isActive(slot)).map((slot) => slot.key), + + stderrTail: (key: K) => { + const slot = workers.get(key); + return slot === undefined ? "" : slot.stderrTail; + }, + }; +}