This repository has been archived on 2026-06-01. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
nerve/packages/daemon/src/worker-runtime.ts
T
xiaomo 4dffcb636b fix: resolve 2 failing tests after WorkerRuntime migration
- Add trySendSync() for synchronous send when worker is ready+connected
- sendCompute uses sync path first, async fallback for cold start
- Add forwardStderr, allowRespawn, hasDisconnectedChild, onReady(key,msg)
- Tests: add connected:true to mocks, flush async fork microtasks
- All 167 daemon tests pass
2026-04-30 13:34:10 +00:00

398 lines
11 KiB
TypeScript

/**
* Generic message-routed worker process manager (RFC-006).
* One forked Node child per key; cold start, crash respawn, drain/evict, shutdown.
*/
import { type ChildProcess, type Serializable, fork } from "node:child_process";
import { isPlainRecord } from "@uncaged/nerve-core";
/** Maximum number of trailing stderr characters retained per worker slot; older output is dropped. */
const STDERR_TAIL_MAX_CHARS = 2048;
/**
 * Configuration for `createWorkerRuntime`: what to fork, how to route the child's
 * IPC messages, and how to react when a child exits.
 */
export type WorkerRuntimeConfig<K extends string> = {
/** Module path handed to `fork` for every worker. */
script: string;
/** Produces the argument list passed to `fork` for the worker of `key`. */
argsForKey: (key: K) => string[];
/** When false, stderr is not captured into `stderrTail` (e.g. tests without a pipe). */
forwardStderr: boolean;
/** Invoked for every IPC message from a child that is not a `ready` message. */
onMessage: (key: K, msg: unknown) => void;
/** Invoked when a `ready` message arrives while the worker is still starting. */
onReady: (key: K, msg: unknown) => void;
/** Invoked on every child exit, expected or not, with the exit code and the signal name (if any). */
onExit: (key: K, code: number | null, signal: string | null) => void;
/** Automatic respawn policy applied after an unexpected exit. */
respawn: {
/** Master switch; when false an unexpectedly exited worker is never respawned automatically. */
enabled: boolean;
/** Respawning stops once this many unexpected exits fall inside `windowMs`. */
maxCrashes: number;
/** Width, in milliseconds, of the sliding window used to count recent unexpected exits. */
windowMs: number;
/** Pause, in milliseconds, between an unexpected exit and the respawn attempt. */
delayMs: number;
/** When non-null, return false to skip automatic respawn after an unexpected exit. */
allowRespawn: ((key: K) => boolean) | null;
};
/**
 * Milliseconds to wait for a child to exit after a shutdown request before it is sent SIGKILL.
 * The same value is also used as the limit when waiting for a worker's `ready` message.
 */
shutdownTimeoutMs: number;
};
/** Handle returned by `createWorkerRuntime`; all operations address a worker by its key. */
export type WorkerRuntime<K extends string> = {
/** Starts the worker if it is not ready, waits for its `ready` message, then sends `msg` over IPC. Rejects if the child is not connected. */
send: (key: K, msg: unknown) => Promise<void>;
/** When the worker is already ready and IPC-connected, sends synchronously (returns true). Otherwise false — caller may fall back to `send`. */
trySendSync: (key: K, msg: unknown) => boolean;
/** Ensures the worker for `key` is forked and has reported ready. */
start: (key: K) => Promise<void>;
/** Gracefully stops the worker's child (if any) and removes the key from the runtime. */
evict: (key: K) => Promise<void>;
/** Gracefully stops the current child (if any) and then forks a fresh one, waiting until it is ready. */
drain: (key: K) => Promise<void>;
/** Gracefully stops every worker known at call time and removes their keys. */
shutdown: () => Promise<void>;
/** True when the worker for `key` is ready and its IPC channel is connected. */
has: (key: K) => boolean;
/** True when a child exists but IPC is disconnected (legacy pool skipped sends in this case). */
hasDisconnectedChild: (key: K) => boolean;
/** Process id of the worker, or null unless it is ready and IPC-connected. */
pid: (key: K) => number | null;
/** Keys of all workers that are currently ready and IPC-connected. */
keys: () => K[];
/** Captured tail of the worker's stderr, or an empty string when the key is unknown. */
stderrTail: (key: K) => string;
};
/**
 * Lifecycle state of a worker slot: `stopped` (initial, and after any exit), `starting`
 * (set just before forking, until a `ready` message arrives), `ready`, and `draining`
 * (a graceful stop is in progress).
 */
type WorkerMachineState = "stopped" | "starting" | "ready" | "draining";
/** Pending wait for a worker's `ready` message; exactly one of the two callbacks settles it. */
type ReadyWaiter = {
/** Called when the worker reports ready. */
resolve: () => void;
/** Called when the worker cannot become ready (for example it exited or failed to fork). */
reject: (err: Error) => void;
};
/** Internal: one forked process slot (ManagedWorker). */
type WorkerSlot<K extends string> = {
/** Key this slot is registered under. */
key: K;
/** Current lifecycle state. */
state: WorkerMachineState;
/** The forked child, or null when no process is attached. */
child: ChildProcess | null;
/** Pid of `child` as reported at fork time, or null. */
pid: number | null;
/** Trailing stderr output captured from the child (bounded by STDERR_TAIL_MAX_CHARS). */
stderrTail: string;
/** Timestamps (ms since epoch) of recent unexpected exits; cleared when the worker becomes ready. */
crashTimestamps: number[];
/** Set before a deliberate stop so the following exit is not treated as a crash. */
expectExit: boolean;
/** Callers currently waiting for the worker to report ready. */
readyWaiters: ReadyWaiter[];
/** Tail of the per-slot operation queue; never rejects, so operations run one after another. */
opChain: Promise<void>;
};
/** Tells whether an IPC payload is a plain record whose `type` field is exactly `"ready"`. */
function isReadyIpcMessage(raw: unknown): boolean {
  if (!isPlainRecord(raw)) {
    return false;
  }
  return raw.type === "ready";
}
/** Converts a Node signal name to a plain string, passing `null` (no signal) through unchanged. */
function signalToString(signal: NodeJS.Signals | null): string | null {
  return signal === null ? null : String(signal);
}
/**
 * Mirrors the child's stderr into `slot.stderrTail`, keeping only the most recent
 * STDERR_TAIL_MAX_CHARS characters. Does nothing when the child has no stderr stream.
 */
function attachStderrTail<K extends string>(child: ChildProcess, slot: WorkerSlot<K>): void {
  const { stderr } = child;
  if (stderr === null || stderr === undefined) {
    return;
  }
  // Ask the stream for decoded text; the Buffer branch below covers streams that ignore this.
  stderr.setEncoding("utf8");
  stderr.on("data", (chunk: string | Buffer) => {
    const piece = typeof chunk === "string" ? chunk : chunk.toString("utf8");
    const combined = slot.stderrTail + piece;
    slot.stderrTail = combined.slice(-STDERR_TAIL_MAX_CHARS);
  });
}
/**
 * Appends `fn` to the slot's operation queue so it starts only after every previously
 * queued operation has settled.
 *
 * @returns A promise that settles with the outcome of `fn` itself. The queue tail stored
 * on the slot swallows that outcome, so one failed operation never blocks the next.
 */
function enqueueOp<K extends string>(slot: WorkerSlot<K>, fn: () => Promise<void>): Promise<void> {
  const outcome = slot.opChain.then(fn, fn);
  const ignore = (): void => {};
  slot.opChain = outcome.then(ignore, ignore);
  return outcome;
}
/**
 * Notifies every caller waiting on this slot that the worker is ready.
 * The waiter list is swapped for a fresh array before any callback runs, so waiters
 * registered during notification are kept for a later pass.
 */
function resolveReadyWaiters<K extends string>(slot: WorkerSlot<K>): void {
  const pending = slot.readyWaiters;
  slot.readyWaiters = [];
  pending.forEach((waiter) => {
    waiter.resolve();
  });
}
/**
 * Fails every caller waiting on this slot with `err`.
 * The waiter list is swapped for a fresh array before any callback runs, so waiters
 * registered during notification are kept for a later pass.
 */
function rejectReadyWaiters<K extends string>(slot: WorkerSlot<K>, err: Error): void {
  const pending = slot.readyWaiters;
  slot.readyWaiters = [];
  pending.forEach((waiter) => {
    waiter.reject(err);
  });
}
/**
 * Waits until the slot's worker has reported ready.
 *
 * Resolves immediately when the slot is already ready with a connected child. Otherwise a
 * waiter is queued on the slot and the promise settles when the waiter is resolved or
 * rejected, or rejects with a "ready timeout" error after `shutdownTimeoutMs` milliseconds.
 *
 * On timeout the waiter is removed from `slot.readyWaiters`, so repeated timeouts against a
 * worker that never becomes ready do not accumulate dead entries on the slot.
 *
 * @param slot Slot whose readiness is awaited.
 * @param shutdownTimeoutMs Maximum time to wait, in milliseconds.
 */
function waitForReady<K extends string>(
  slot: WorkerSlot<K>,
  shutdownTimeoutMs: number,
): Promise<void> {
  if (slot.state === "ready" && slot.child !== null && slot.child.connected) {
    return Promise.resolve();
  }
  return new Promise<void>((resolve, reject) => {
    // Guards against double settlement: the timer and the waiter callbacks race each other.
    let settled = false;
    const waiter = {
      resolve: (): void => {
        if (settled) {
          return;
        }
        settled = true;
        clearTimeout(timer);
        resolve();
      },
      reject: (err: Error): void => {
        if (settled) {
          return;
        }
        settled = true;
        clearTimeout(timer);
        reject(err);
      },
    };
    const timer = setTimeout(() => {
      if (settled) {
        return;
      }
      settled = true;
      // Drop the now-dead waiter so it does not linger until the next ready/exit event.
      const index = slot.readyWaiters.indexOf(waiter);
      if (index !== -1) {
        slot.readyWaiters.splice(index, 1);
      }
      reject(new Error(`Worker "${String(slot.key)}" ready timeout`));
    }, shutdownTimeoutMs);
    slot.readyWaiters.push(waiter);
  });
}
/**
 * Resolves once `child` has exited, force-killing it with SIGKILL if it is still alive
 * after `timeoutMs` milliseconds.
 *
 * Returns immediately when the child already reports an exit code or terminating signal:
 * in that case `exit` will not be emitted again and waiting for it would never finish.
 *
 * @param child Process to wait for.
 * @param timeoutMs Grace period, in milliseconds, before SIGKILL is sent.
 */
async function waitForChildExit(child: ChildProcess, timeoutMs: number): Promise<void> {
  // Loose `!= null` on purpose: a missing field (undefined) counts as "not exited yet".
  if (child.exitCode != null || child.signalCode != null) {
    return;
  }
  await new Promise<void>((resolve) => {
    const timer = setTimeout(() => {
      child.kill("SIGKILL");
    }, timeoutMs);
    child.once("exit", () => {
      clearTimeout(timer);
      resolve();
    });
  });
}
/**
 * Builds a worker runtime that keeps at most one forked Node child per key.
 *
 * Every state-changing operation on a key (send, start, evict, drain, shutdown and crash
 * recovery) is queued on that key's `opChain`, so operations for one key run strictly one
 * after another while different keys proceed independently.
 *
 * @param config Script, per-key arguments, lifecycle callbacks, respawn policy and timeouts.
 * @returns The runtime handle used to start, message, recycle and stop workers.
 */
export function createWorkerRuntime<K extends string>(
  config: WorkerRuntimeConfig<K>,
): WorkerRuntime<K> {
  /** All known slots by key; a slot stays registered (even while stopped) until evicted or shut down. */
  const workers = new Map<K, WorkerSlot<K>>();

  /** Returns the slot for `key`, registering a fresh stopped slot on first use. */
  function getOrCreateSlot(key: K): WorkerSlot<K> {
    let slot = workers.get(key);
    if (slot === undefined) {
      slot = {
        key,
        state: "stopped",
        child: null,
        pid: null,
        stderrTail: "",
        crashTimestamps: [],
        expectExit: false,
        readyWaiters: [],
        opChain: Promise.resolve(),
      };
      workers.set(key, slot);
    }
    return slot;
  }

  /**
   * Routes one IPC message from a child. A `ready` message promotes a starting slot to
   * ready (clearing its crash history and waking waiters) and is otherwise ignored; every
   * other message is forwarded to `config.onMessage`.
   */
  function handleWorkerMessage(slot: WorkerSlot<K>, msg: unknown): void {
    if (isReadyIpcMessage(msg)) {
      if (slot.state === "starting") {
        slot.state = "ready";
        slot.crashTimestamps = [];
        config.onReady(slot.key, msg);
        resolveReadyWaiters(slot);
      }
      return;
    }
    config.onMessage(slot.key, msg);
  }

  /**
   * Handles the exit of `child`. `config.onExit` is always notified. Slot state is only
   * touched when `child` is still the slot's current process: the exit of a superseded
   * child must not detach its replacement, consume `expectExit`, or trigger a respawn.
   */
  function onChildExit(
    slot: WorkerSlot<K>,
    child: ChildProcess,
    code: number | null,
    signal: NodeJS.Signals | null,
  ): void {
    config.onExit(slot.key, code, signalToString(signal));
    child.removeAllListeners("message");
    child.removeAllListeners("exit");
    if (slot.child !== child) {
      // A process that was already replaced has finally gone away; nothing else to do.
      return;
    }
    const wasExpect = slot.expectExit;
    slot.expectExit = false;
    slot.child = null;
    slot.pid = null;
    if (wasExpect) {
      slot.state = "stopped";
      return;
    }
    rejectReadyWaiters(slot, new Error(`Worker "${String(slot.key)}" exited unexpectedly`));
    slot.state = "stopped";
    // Recovery goes through the queue so it cannot interleave with send/evict/drain.
    void enqueueOp(slot, async () => {
      await handleUnexpectedCrashRecovery(slot);
    });
  }

  /** Attaches `child` to the slot and wires its stderr, error, message and exit events. */
  function registerChild(slot: WorkerSlot<K>, child: ChildProcess): void {
    slot.child = child;
    slot.pid = child.pid ?? null;
    if (config.forwardStderr) {
      attachStderrTail(child, slot);
    }
    // Node emits 'error' when a child cannot be spawned or killed, or when an IPC send
    // fails. An emitter without an 'error' listener throws, which would take the whole
    // host process down, so the failure is logged instead.
    child.on("error", (err: Error) => {
      console.error(
        `[WorkerRuntime] worker "${String(slot.key)}" child process error: ${err.message}`,
      );
    });
    child.on("message", (msg: unknown) => {
      handleWorkerMessage(slot, msg);
    });
    child.on("exit", (code, sig) => {
      onChildExit(slot, child, code, sig ?? null);
    });
  }

  /**
   * Detaches and force-kills a child that is still attached to the slot but can no longer
   * be used (its start timed out, or its IPC channel dropped), so forking a replacement
   * does not leave an orphaned process behind. Its exit listener stays attached so
   * `config.onExit` is still reported once the process is gone.
   */
  function retireStaleChild(slot: WorkerSlot<K>): void {
    const stale = slot.child;
    if (stale === null) {
      return;
    }
    // A late `ready` from the old process must not mark the replacement as ready.
    stale.removeAllListeners("message");
    slot.child = null;
    slot.pid = null;
    stale.kill("SIGKILL");
  }

  /**
   * Ensures the slot has a ready, connected child. Returns at once when it already does;
   * otherwise retires any unusable leftover child, forks a new one and waits for its
   * `ready` message (bounded by `config.shutdownTimeoutMs`).
   *
   * @throws When forking fails, the child exits before becoming ready, or the wait times out.
   */
  async function forkAndWaitReady(slot: WorkerSlot<K>): Promise<void> {
    if (isActive(slot)) {
      return;
    }
    retireStaleChild(slot);
    slot.state = "starting";
    let child: ChildProcess;
    try {
      child = fork(config.script, config.argsForKey(slot.key), {
        stdio: ["ignore", "inherit", "pipe", "ipc"],
        env: process.env,
      });
    } catch (e) {
      slot.state = "stopped";
      const err = e instanceof Error ? e : new Error(String(e));
      rejectReadyWaiters(slot, err);
      throw err;
    }
    registerChild(slot, child);
    await waitForReady(slot, config.shutdownTimeoutMs);
  }

  /**
   * Stops the slot's child and waits for it to exit. A connected child is asked to shut
   * down over IPC; a disconnected one cannot receive that request (sending would only
   * raise an 'error' event), so it is sent SIGTERM instead. Either way the child is
   * force-killed if it is still alive after `config.shutdownTimeoutMs`.
   */
  async function gracefulStop(slot: WorkerSlot<K>): Promise<void> {
    const child = slot.child;
    if (child === null) {
      return;
    }
    // Mark the upcoming exit as deliberate so it is not treated as a crash.
    slot.expectExit = true;
    slot.state = "draining";
    if (child.connected) {
      child.send({ type: "shutdown" });
    } else {
      child.kill("SIGTERM");
    }
    await waitForChildExit(child, config.shutdownTimeoutMs);
  }

  /**
   * Decides whether to respawn after an unexpected exit and, if so, does it. Respawn is
   * skipped when disabled, vetoed by `allowRespawn`, or when the number of unexpected
   * exits inside the sliding window has reached `maxCrashes`.
   */
  async function handleUnexpectedCrashRecovery(slot: WorkerSlot<K>): Promise<void> {
    if (!config.respawn.enabled) {
      return;
    }
    if (config.respawn.allowRespawn !== null && !config.respawn.allowRespawn(slot.key)) {
      return;
    }
    const now = Date.now();
    slot.crashTimestamps.push(now);
    slot.crashTimestamps = slot.crashTimestamps.filter((t) => now - t <= config.respawn.windowMs);
    if (slot.crashTimestamps.length >= config.respawn.maxCrashes) {
      console.error(
        `[WorkerRuntime] worker "${String(slot.key)}" exceeded crash limit (${String(config.respawn.maxCrashes)} in ${String(config.respawn.windowMs)}ms); not respawning`,
      );
      return;
    }
    await new Promise<void>((resolve) => setTimeout(resolve, config.respawn.delayMs));
    await forkAndWaitReady(slot);
  }

  /** Gracefully stops the slot's child and unregisters the slot. */
  async function shutdownWorker(slot: WorkerSlot<K>): Promise<void> {
    await gracefulStop(slot);
    workers.delete(slot.key);
  }

  /** True when the slot is ready and its child's IPC channel is connected. */
  function isActive(slot: WorkerSlot<K>): boolean {
    return slot.state === "ready" && slot.child !== null && slot.child.connected;
  }

  return {
    send: async (key: K, msg: unknown) => {
      const slot = getOrCreateSlot(key);
      await enqueueOp(slot, async () => {
        await forkAndWaitReady(slot);
        const child = slot.child;
        if (child === null || !child.connected) {
          throw new Error(`Worker "${String(key)}" is not connected`);
        }
        child.send(msg as Serializable);
      });
    },
    trySendSync: (key: K, msg: unknown): boolean => {
      const slot = workers.get(key);
      if (slot === undefined || !isActive(slot)) {
        return false;
      }
      // Repeated so the compiler narrows `child`; `isActive` already guarantees both.
      const child = slot.child;
      if (child === null || !child.connected) {
        return false;
      }
      child.send(msg as Serializable);
      return true;
    },
    start: async (key: K) => {
      const slot = getOrCreateSlot(key);
      await enqueueOp(slot, async () => {
        await forkAndWaitReady(slot);
      });
    },
    evict: async (key: K) => {
      const slot = getOrCreateSlot(key);
      await enqueueOp(slot, async () => {
        await gracefulStop(slot);
        workers.delete(key);
      });
    },
    drain: async (key: K) => {
      const slot = getOrCreateSlot(key);
      await enqueueOp(slot, async () => {
        if (slot.child === null) {
          await forkAndWaitReady(slot);
          return;
        }
        await gracefulStop(slot);
        await forkAndWaitReady(slot);
      });
    },
    shutdown: async () => {
      // Snapshot first: each shutdown op removes its slot from the map.
      const snapshot = [...workers.values()];
      await Promise.all(snapshot.map((slot) => enqueueOp(slot, () => shutdownWorker(slot))));
    },
    has: (key: K) => {
      const slot = workers.get(key);
      return slot !== undefined && isActive(slot);
    },
    hasDisconnectedChild: (key: K): boolean => {
      const slot = workers.get(key);
      if (slot === undefined || slot.child === null) {
        return false;
      }
      return !slot.child.connected;
    },
    pid: (key: K) => {
      const slot = workers.get(key);
      if (slot === undefined || !isActive(slot) || slot.pid === null) {
        return null;
      }
      return slot.pid;
    },
    keys: () => [...workers.values()].filter((slot) => isActive(slot)).map((slot) => slot.key),
    stderrTail: (key: K) => {
      const slot = workers.get(key);
      return slot === undefined ? "" : slot.stderrTail;
    },
  };
}