Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| f1458f8353 |
@@ -33,6 +33,11 @@ import { createReflexScheduler } from "./reflex-scheduler.js";
|
||||
import type { ReflexScheduler } from "./reflex-scheduler.js";
|
||||
import { createSignalBus } from "./signal-bus.js";
|
||||
import type { SignalBus } from "./signal-bus.js";
|
||||
import {
|
||||
formatCapturedStderrTail,
|
||||
formatChildExitSummary,
|
||||
teeCapturedStderr,
|
||||
} from "./worker-fork-support.js";
|
||||
import { createWorkflowManager } from "./workflow-manager.js";
|
||||
import type { WorkflowManager } from "./workflow-manager.js";
|
||||
|
||||
@@ -84,10 +89,16 @@ function resolveWorkerScript(): string {
|
||||
return join(__dir, "sense-worker.js");
|
||||
}
|
||||
|
||||
function spawnWorker(nerveRoot: string, group: string, workerScript: string): ChildProcess {
|
||||
function spawnWorker(
|
||||
nerveRoot: string,
|
||||
group: string,
|
||||
workerScript: string,
|
||||
stderrTail: { value: string },
|
||||
): ChildProcess {
|
||||
const child = fork(workerScript, ["--group", group, "--root", nerveRoot], {
|
||||
stdio: ["ignore", "inherit", "inherit", "ipc"],
|
||||
stdio: ["ignore", "inherit", "pipe", "ipc"],
|
||||
});
|
||||
teeCapturedStderr(child, stderrTail);
|
||||
// Prevent unhandled EPIPE when writing to a child whose IPC channel closed
|
||||
child.on("error", (err) => {
|
||||
if ((err as NodeJS.ErrnoException).code !== "EPIPE") {
|
||||
@@ -240,7 +251,8 @@ export function createKernel(
|
||||
}
|
||||
|
||||
function startWorker(group: string): Promise<void> {
|
||||
const child = spawnWorker(nerveRoot, group, workerScript);
|
||||
const stderrTail = { value: "" };
|
||||
const child = spawnWorker(nerveRoot, group, workerScript, stderrTail);
|
||||
|
||||
let workerReadyResolve: (() => void) | undefined;
|
||||
const workerReady = new Promise<void>((resolve) => {
|
||||
@@ -255,9 +267,10 @@ export function createKernel(
|
||||
handleWorkerMessage(raw);
|
||||
});
|
||||
|
||||
child.on("exit", (code) => {
|
||||
child.on("exit", (code, signal) => {
|
||||
const summary = formatChildExitSummary(code, signal ?? null);
|
||||
process.stderr.write(
|
||||
`[kernel] worker for group "${group}" exited with code ${code ?? "null"}\n`,
|
||||
`[kernel] worker for group "${group}" exited (${summary})${formatCapturedStderrTail(stderrTail.value)}\n`,
|
||||
);
|
||||
// Resolve ready in case the worker exits before sending ready (prevents hangs)
|
||||
workerReadyResolve?.();
|
||||
|
||||
@@ -25,6 +25,7 @@ import type { WorkerToParentMessage } from "./ipc.js";
|
||||
import { parseParentMessage } from "./ipc.js";
|
||||
import { executeCompute, loadComputeFn, openPeerDb, openSenseDb } from "./sense-runtime.js";
|
||||
import type { DrizzleDB, PeerMap, SenseRuntime } from "./sense-runtime.js";
|
||||
import { ignoreSessionBroadcastSignals } from "./worker-fork-support.js";
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// IPC helpers
|
||||
@@ -336,6 +337,10 @@ if (!parsed) {
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
if (typeof process.send === "function") {
|
||||
ignoreSessionBroadcastSignals();
|
||||
}
|
||||
|
||||
bootstrap(parsed.nerveRoot, parsed.group).catch((e) => {
|
||||
const msg = e instanceof Error ? e.message : String(e);
|
||||
process.stderr.write(`[sense-worker] Unhandled bootstrap error: ${msg}\n`);
|
||||
|
||||
@@ -0,0 +1,48 @@
|
||||
import type { ChildProcess } from "node:child_process";
|
||||
|
||||
const STDERR_TAIL_MAX_CHARS = 16_384;
|
||||
|
||||
/**
|
||||
* Forked workers inherit the parent's process group. In foreground `nerve dev`,
|
||||
* terminal-driven SIGINT/SIGTERM is delivered to the whole group, so workers can exit
|
||||
* on the default handler before the kernel sends `{ type: "shutdown" }` over IPC.
|
||||
* Swallow these in worker processes so the parent coordinates shutdown (issue #55).
|
||||
* Only call when `process.send` is defined (fork IPC); standalone `node …-worker.js` keeps default Ctrl+C behaviour.
|
||||
*/
|
||||
export function ignoreSessionBroadcastSignals(): void {
|
||||
const swallow = (): void => {};
|
||||
process.on("SIGINT", swallow);
|
||||
process.on("SIGTERM", swallow);
|
||||
}
|
||||
|
||||
export function teeCapturedStderr(child: ChildProcess, tail: { value: string }): void {
|
||||
const stream = child.stderr;
|
||||
if (stream === null || stream === undefined) return;
|
||||
stream.setEncoding("utf8");
|
||||
stream.on("data", (chunk: string | Buffer) => {
|
||||
const text = typeof chunk === "string" ? chunk : chunk.toString("utf8");
|
||||
process.stderr.write(text);
|
||||
tail.value = (tail.value + text).slice(-STDERR_TAIL_MAX_CHARS);
|
||||
});
|
||||
}
|
||||
|
||||
export function formatChildExitSummary(
|
||||
code: number | null,
|
||||
signal: NodeJS.Signals | null,
|
||||
): string {
|
||||
const codeStr = code === null || code === undefined ? "null" : String(code);
|
||||
if (signal) {
|
||||
return `code=${codeStr} signal=${signal}`;
|
||||
}
|
||||
return `code=${codeStr}`;
|
||||
}
|
||||
|
||||
export function formatCapturedStderrTail(tail: string, maxChars = 800): string {
|
||||
const trimmed = tail.trim();
|
||||
if (trimmed.length === 0) return "";
|
||||
const normalized = trimmed.replace(/\r?\n/g, "\\n");
|
||||
if (normalized.length <= maxChars) {
|
||||
return ` worker_stderr=${normalized}`;
|
||||
}
|
||||
return ` worker_stderr=…${normalized.slice(-maxChars)}`;
|
||||
}
|
||||
@@ -22,6 +22,11 @@ import type {
|
||||
import { parseWorkerMessage } from "./ipc.js";
|
||||
import type { LogStore } from "./log-store.js";
|
||||
import type { WorkflowRunStatus } from "./log-store.js";
|
||||
import {
|
||||
formatCapturedStderrTail,
|
||||
formatChildExitSummary,
|
||||
teeCapturedStderr,
|
||||
} from "./worker-fork-support.js";
|
||||
|
||||
export type WorkflowManager = {
|
||||
/** Trigger a new workflow thread (called by Reflex scheduler). */
|
||||
@@ -60,6 +65,7 @@ type WorkerEntry = {
|
||||
stopping: boolean;
|
||||
/** When set, the worker is draining before a hot-reload respawn. */
|
||||
draining: boolean;
|
||||
stderrTail: { value: string };
|
||||
};
|
||||
|
||||
// Crash respawn backoff: track crash timestamps per workflow.
|
||||
@@ -85,10 +91,12 @@ function spawnWorkflowWorker(
|
||||
nerveRoot: string,
|
||||
workflowName: string,
|
||||
workerScript: string,
|
||||
stderrTail: { value: string },
|
||||
): ChildProcess {
|
||||
const child = fork(workerScript, ["--workflow", workflowName, "--root", nerveRoot], {
|
||||
stdio: ["ignore", "inherit", "inherit", "ipc"],
|
||||
stdio: ["ignore", "inherit", "pipe", "ipc"],
|
||||
});
|
||||
teeCapturedStderr(child, stderrTail);
|
||||
// Prevent unhandled EPIPE when writing to a child whose IPC channel closed
|
||||
child.on("error", (err) => {
|
||||
if ((err as NodeJS.ErrnoException).code !== "EPIPE") {
|
||||
@@ -395,7 +403,11 @@ export function createWorkflowManager(
|
||||
state.active.clear();
|
||||
}
|
||||
|
||||
function handleWorkerExit(workflowName: string, code: number | null): void {
|
||||
function handleWorkerExit(
|
||||
workflowName: string,
|
||||
code: number | null,
|
||||
signal: NodeJS.Signals | null,
|
||||
): void {
|
||||
const entry = workers.get(workflowName);
|
||||
if (entry?.draining) {
|
||||
workers.delete(workflowName);
|
||||
@@ -416,8 +428,10 @@ export function createWorkflowManager(
|
||||
}
|
||||
return;
|
||||
}
|
||||
const summary = formatChildExitSummary(code, signal);
|
||||
const stderrExtra = entry !== undefined ? formatCapturedStderrTail(entry.stderrTail.value) : "";
|
||||
process.stderr.write(
|
||||
`[workflow-manager] worker for "${workflowName}" exited with code ${code ?? "null"}\n`,
|
||||
`[workflow-manager] worker for "${workflowName}" exited (${summary})${stderrExtra}\n`,
|
||||
);
|
||||
handleWorkerCrash(workflowName);
|
||||
}
|
||||
@@ -428,17 +442,24 @@ export function createWorkflowManager(
|
||||
return existing;
|
||||
}
|
||||
|
||||
const child = spawnWorkflowWorker(nerveRoot, workflowName, workerScript);
|
||||
const stderrTail = { value: "" };
|
||||
const child = spawnWorkflowWorker(nerveRoot, workflowName, workerScript, stderrTail);
|
||||
|
||||
child.on("message", (raw: unknown) => {
|
||||
handleWorkerMessage(workflowName, raw);
|
||||
});
|
||||
|
||||
child.on("exit", (code) => {
|
||||
handleWorkerExit(workflowName, code);
|
||||
child.on("exit", (code, signal) => {
|
||||
handleWorkerExit(workflowName, code, signal ?? null);
|
||||
});
|
||||
|
||||
const entry: WorkerEntry = { workflowName, process: child, stopping: false, draining: false };
|
||||
const entry: WorkerEntry = {
|
||||
workflowName,
|
||||
process: child,
|
||||
stopping: false,
|
||||
draining: false,
|
||||
stderrTail,
|
||||
};
|
||||
workers.set(workflowName, entry);
|
||||
return entry;
|
||||
}
|
||||
|
||||
@@ -21,6 +21,7 @@ import type {
|
||||
|
||||
import type { ThreadCommandEventMessage, ThreadEventType, WorkerToParentMessage } from "./ipc.js";
|
||||
import { parseParentMessage } from "./ipc.js";
|
||||
import { ignoreSessionBroadcastSignals } from "./worker-fork-support.js";
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// IPC helpers
|
||||
@@ -334,6 +335,10 @@ if (!parsed) {
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
if (typeof process.send === "function") {
|
||||
ignoreSessionBroadcastSignals();
|
||||
}
|
||||
|
||||
bootstrap(parsed.nerveRoot, parsed.workflow).catch((e) => {
|
||||
const msg = e instanceof Error ? e.message : String(e);
|
||||
process.stderr.write(`[workflow-worker] Unhandled bootstrap error: ${msg}\n`);
|
||||
|
||||
Reference in New Issue
Block a user