diff --git a/packages/daemon/src/kernel.ts b/packages/daemon/src/kernel.ts index 5e7aa8d..26acabd 100644 --- a/packages/daemon/src/kernel.ts +++ b/packages/daemon/src/kernel.ts @@ -33,6 +33,11 @@ import { createReflexScheduler } from "./reflex-scheduler.js"; import type { ReflexScheduler } from "./reflex-scheduler.js"; import { createSignalBus } from "./signal-bus.js"; import type { SignalBus } from "./signal-bus.js"; +import { + formatCapturedStderrTail, + formatChildExitSummary, + teeCapturedStderr, +} from "./worker-fork-support.js"; import { createWorkflowManager } from "./workflow-manager.js"; import type { WorkflowManager } from "./workflow-manager.js"; @@ -84,10 +89,16 @@ function resolveWorkerScript(): string { return join(__dir, "sense-worker.js"); } -function spawnWorker(nerveRoot: string, group: string, workerScript: string): ChildProcess { +function spawnWorker( + nerveRoot: string, + group: string, + workerScript: string, + stderrTail: { value: string }, +): ChildProcess { const child = fork(workerScript, ["--group", group, "--root", nerveRoot], { - stdio: ["ignore", "inherit", "inherit", "ipc"], + stdio: ["ignore", "inherit", "pipe", "ipc"], }); + teeCapturedStderr(child, stderrTail); // Prevent unhandled EPIPE when writing to a child whose IPC channel closed child.on("error", (err) => { if ((err as NodeJS.ErrnoException).code !== "EPIPE") { @@ -240,7 +251,8 @@ export function createKernel( } function startWorker(group: string): Promise { - const child = spawnWorker(nerveRoot, group, workerScript); + const stderrTail = { value: "" }; + const child = spawnWorker(nerveRoot, group, workerScript, stderrTail); let workerReadyResolve: (() => void) | undefined; const workerReady = new Promise((resolve) => { @@ -255,9 +267,10 @@ export function createKernel( handleWorkerMessage(raw); }); - child.on("exit", (code) => { + child.on("exit", (code, signal) => { + const summary = formatChildExitSummary(code, signal ?? null); process.stderr.write( - `[kernel] worker for group "${group}" exited with code ${code ?? "null"}\n`, + `[kernel] worker for group "${group}" exited (${summary})${formatCapturedStderrTail(stderrTail.value)}\n`, ); // Resolve ready in case the worker exits before sending ready (prevents hangs) workerReadyResolve?.(); diff --git a/packages/daemon/src/sense-worker.ts b/packages/daemon/src/sense-worker.ts index 4a208b3..29fb094 100644 --- a/packages/daemon/src/sense-worker.ts +++ b/packages/daemon/src/sense-worker.ts @@ -25,6 +25,7 @@ import type { WorkerToParentMessage } from "./ipc.js"; import { parseParentMessage } from "./ipc.js"; import { executeCompute, loadComputeFn, openPeerDb, openSenseDb } from "./sense-runtime.js"; import type { DrizzleDB, PeerMap, SenseRuntime } from "./sense-runtime.js"; +import { ignoreSessionBroadcastSignals } from "./worker-fork-support.js"; // --------------------------------------------------------------------------- // IPC helpers @@ -336,6 +337,10 @@ if (!parsed) { process.exit(1); } +if (typeof process.send === "function") { + ignoreSessionBroadcastSignals(); +} + bootstrap(parsed.nerveRoot, parsed.group).catch((e) => { const msg = e instanceof Error ? e.message : String(e); process.stderr.write(`[sense-worker] Unhandled bootstrap error: ${msg}\n`); diff --git a/packages/daemon/src/worker-fork-support.ts b/packages/daemon/src/worker-fork-support.ts new file mode 100644 index 0000000..774a5cc --- /dev/null +++ b/packages/daemon/src/worker-fork-support.ts @@ -0,0 +1,48 @@ +import type { ChildProcess } from "node:child_process"; + +const STDERR_TAIL_MAX_CHARS = 16_384; + +/** + * Forked workers inherit the parent's process group. In foreground `nerve dev`, + * terminal-driven SIGINT/SIGTERM is delivered to the whole group, so workers can exit + * on the default handler before the kernel sends `{ type: "shutdown" }` over IPC. + * Swallow these in worker processes so the parent coordinates shutdown (issue #55). + * Only call when `process.send` is defined (fork IPC); standalone `node …-worker.js` keeps default Ctrl+C behaviour. + */ +export function ignoreSessionBroadcastSignals(): void { + const swallow = (): void => {}; + process.on("SIGINT", swallow); + process.on("SIGTERM", swallow); +} + +export function teeCapturedStderr(child: ChildProcess, tail: { value: string }): void { + const stream = child.stderr; + if (stream === null || stream === undefined) return; + stream.setEncoding("utf8"); + stream.on("data", (chunk: string | Buffer) => { + const text = typeof chunk === "string" ? chunk : chunk.toString("utf8"); + process.stderr.write(text); + tail.value = (tail.value + text).slice(-STDERR_TAIL_MAX_CHARS); + }); +} + +export function formatChildExitSummary( + code: number | null, + signal: NodeJS.Signals | null, +): string { + const codeStr = code === null || code === undefined ? "null" : String(code); + if (signal) { + return `code=${codeStr} signal=${signal}`; + } + return `code=${codeStr}`; +} + +export function formatCapturedStderrTail(tail: string, maxChars = 800): string { + const trimmed = tail.trim(); + if (trimmed.length === 0) return ""; + const normalized = trimmed.replace(/\r?\n/g, "\\n"); + if (normalized.length <= maxChars) { + return ` worker_stderr=${normalized}`; + } + return ` worker_stderr=…${normalized.slice(-maxChars)}`; +} diff --git a/packages/daemon/src/workflow-manager.ts b/packages/daemon/src/workflow-manager.ts index d61c03f..64064a7 100644 --- a/packages/daemon/src/workflow-manager.ts +++ b/packages/daemon/src/workflow-manager.ts @@ -22,6 +22,11 @@ import type { import { parseWorkerMessage } from "./ipc.js"; import type { LogStore } from "./log-store.js"; import type { WorkflowRunStatus } from "./log-store.js"; +import { + formatCapturedStderrTail, + formatChildExitSummary, + teeCapturedStderr, +} from "./worker-fork-support.js"; export type WorkflowManager = { /** Trigger a new workflow thread (called by Reflex scheduler). */ @@ -60,6 +65,7 @@ type WorkerEntry = { stopping: boolean; /** When set, the worker is draining before a hot-reload respawn. */ draining: boolean; + stderrTail: { value: string }; }; // Crash respawn backoff: track crash timestamps per workflow. @@ -85,10 +91,12 @@ function spawnWorkflowWorker( nerveRoot: string, workflowName: string, workerScript: string, + stderrTail: { value: string }, ): ChildProcess { const child = fork(workerScript, ["--workflow", workflowName, "--root", nerveRoot], { - stdio: ["ignore", "inherit", "inherit", "ipc"], + stdio: ["ignore", "inherit", "pipe", "ipc"], }); + teeCapturedStderr(child, stderrTail); // Prevent unhandled EPIPE when writing to a child whose IPC channel closed child.on("error", (err) => { if ((err as NodeJS.ErrnoException).code !== "EPIPE") { @@ -395,7 +403,11 @@ export function createWorkflowManager( state.active.clear(); } - function handleWorkerExit(workflowName: string, code: number | null): void { + function handleWorkerExit( + workflowName: string, + code: number | null, + signal: NodeJS.Signals | null, + ): void { const entry = workers.get(workflowName); if (entry?.draining) { workers.delete(workflowName); @@ -416,8 +428,10 @@ export function createWorkflowManager( } return; } + const summary = formatChildExitSummary(code, signal); + const stderrExtra = entry !== undefined ? formatCapturedStderrTail(entry.stderrTail.value) : ""; process.stderr.write( - `[workflow-manager] worker for "${workflowName}" exited with code ${code ?? "null"}\n`, + `[workflow-manager] worker for "${workflowName}" exited (${summary})${stderrExtra}\n`, ); handleWorkerCrash(workflowName); } @@ -428,17 +442,24 @@ export function createWorkflowManager( return existing; } - const child = spawnWorkflowWorker(nerveRoot, workflowName, workerScript); + const stderrTail = { value: "" }; + const child = spawnWorkflowWorker(nerveRoot, workflowName, workerScript, stderrTail); child.on("message", (raw: unknown) => { handleWorkerMessage(workflowName, raw); }); - child.on("exit", (code) => { - handleWorkerExit(workflowName, code); + child.on("exit", (code, signal) => { + handleWorkerExit(workflowName, code, signal ?? null); }); - const entry: WorkerEntry = { workflowName, process: child, stopping: false, draining: false }; + const entry: WorkerEntry = { + workflowName, + process: child, + stopping: false, + draining: false, + stderrTail, + }; workers.set(workflowName, entry); return entry; } diff --git a/packages/daemon/src/workflow-worker.ts b/packages/daemon/src/workflow-worker.ts index e405900..f829521 100644 --- a/packages/daemon/src/workflow-worker.ts +++ b/packages/daemon/src/workflow-worker.ts @@ -21,6 +21,7 @@ import type { import type { ThreadCommandEventMessage, ThreadEventType, WorkerToParentMessage } from "./ipc.js"; import { parseParentMessage } from "./ipc.js"; +import { ignoreSessionBroadcastSignals } from "./worker-fork-support.js"; // --------------------------------------------------------------------------- // IPC helpers @@ -334,6 +335,10 @@ if (!parsed) { process.exit(1); } +if (typeof process.send === "function") { + ignoreSessionBroadcastSignals(); +} + bootstrap(parsed.nerveRoot, parsed.workflow).catch((e) => { const msg = e instanceof Error ? e.message : String(e); process.stderr.write(`[workflow-worker] Unhandled bootstrap error: ${msg}\n`);