refactor(daemon): split kernel.ts into focused modules (#86)

- Extract worker-pool.ts (211 LOC): sense worker fork/shutdown/restart/crash recovery
- Extract kernel-file-watch.ts (92 LOC): file change handlers for hot reload
- Extract kernel-sense-groups.ts (29 LOC): group lookup utilities
- kernel.ts reduced from 617 → 380 LOC (thin orchestrator)
- Add worker-pool.test.ts with 8 test cases
- No behavior changes, all existing tests unchanged
This commit is contained in:
2026-04-24 19:39:10 +08:00
parent 65012fbb53
commit 47cc49eab4
5 changed files with 623 additions and 293 deletions
@@ -0,0 +1,235 @@
import { EventEmitter } from "node:events";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
const mockChildren: MockChild[] = [];
type MockChild = EventEmitter & {
send: ReturnType<typeof vi.fn>;
kill: ReturnType<typeof vi.fn>;
pid: number;
connected: boolean;
};
function makeMockChild(pid = 1): MockChild {
const child = new EventEmitter() as MockChild;
child.connected = true;
child.send = vi.fn((msg: unknown) => {
if (
msg !== null &&
typeof msg === "object" &&
(msg as Record<string, unknown>).type === "shutdown"
) {
child.connected = false;
setImmediate(() => child.emit("exit", 0, null));
}
});
child.kill = vi.fn((_signal?: string) => {
child.connected = false;
child.emit("exit", null, _signal ?? "SIGKILL");
});
child.pid = pid;
return child;
}
vi.mock("node:child_process", () => ({
fork: vi.fn((_script: string, _args: string[], _opts: unknown) => {
const child = makeMockChild(mockChildren.length + 1);
mockChildren.push(child);
return child;
}),
}));
const { createSenseWorkerPool } = await import("../worker-pool.js");
async function flushSetImmediate(): Promise<void> {
await new Promise<void>((resolve) => setImmediate(resolve));
}
async function startWorkerWithReady(
pool: ReturnType<typeof createSenseWorkerPool>,
group: string,
): Promise<void> {
const pr = pool.startWorker(group);
const child = mockChildren[mockChildren.length - 1];
child.emit("message", { type: "ready" });
await pr;
}
describe("createSenseWorkerPool", () => {
beforeEach(() => {
mockChildren.length = 0;
});
afterEach(() => {
vi.useRealTimers();
});
it("forks one child per startWorker and routes IPC to onWorkerMessage", async () => {
const onWorkerMessage = vi.fn();
const pool = createSenseWorkerPool({
nerveRoot: "/tmp/n",
workerScript: "/fake/sense-worker.js",
onWorkerMessage,
sensesForGroup: () => [],
onWorkerCrashed: vi.fn(),
onBeforeGroupRestart: vi.fn(),
isStopped: () => false,
});
await startWorkerWithReady(pool, "g1");
expect(mockChildren).toHaveLength(1);
const child = mockChildren[0];
child.emit("message", { type: "signal", sense: "s", payload: 1 });
expect(onWorkerMessage).toHaveBeenCalledWith({ type: "signal", sense: "s", payload: 1 });
});
it("sendCompute delivers to the worker for that group", async () => {
const pool = createSenseWorkerPool({
nerveRoot: "/tmp/n",
workerScript: "/fake/sense-worker.js",
onWorkerMessage: vi.fn(),
sensesForGroup: () => [],
onWorkerCrashed: vi.fn(),
onBeforeGroupRestart: vi.fn(),
isStopped: () => false,
});
await startWorkerWithReady(pool, "sys");
const child = mockChildren[0];
pool.sendCompute("sys", "cpu");
expect(child.send).toHaveBeenCalledWith(
expect.objectContaining({ type: "compute", sense: "cpu" }),
);
});
it("hasWorkerForGroup and getWorkerPid reflect running workers", async () => {
const pool = createSenseWorkerPool({
nerveRoot: "/tmp/n",
workerScript: "/fake/sense-worker.js",
onWorkerMessage: vi.fn(),
sensesForGroup: () => [],
onWorkerCrashed: vi.fn(),
onBeforeGroupRestart: vi.fn(),
isStopped: () => false,
});
expect(pool.hasWorkerForGroup("a")).toBe(false);
expect(pool.getWorkerPid("a")).toBeNull();
await startWorkerWithReady(pool, "a");
expect(pool.hasWorkerForGroup("a")).toBe(true);
expect(pool.getWorkerPid("a")).toBe(1);
expect(pool.activeGroupCount()).toBe(1);
});
it("evictGroup sends shutdown and removes the entry without waiting", async () => {
const pool = createSenseWorkerPool({
nerveRoot: "/tmp/n",
workerScript: "/fake/sense-worker.js",
onWorkerMessage: vi.fn(),
sensesForGroup: () => [],
onWorkerCrashed: vi.fn(),
onBeforeGroupRestart: vi.fn(),
isStopped: () => false,
});
await startWorkerWithReady(pool, "x");
expect(pool.activeGroupCount()).toBe(1);
pool.evictGroup("x");
expect(pool.hasWorkerForGroup("x")).toBe(false);
expect(mockChildren[0].send).toHaveBeenCalledWith(
expect.objectContaining({ type: "shutdown" }),
);
});
it("restartGroup invokes onBeforeGroupRestart then respawns", async () => {
const onBeforeGroupRestart = vi.fn();
const pool = createSenseWorkerPool({
nerveRoot: "/tmp/n",
workerScript: "/fake/sense-worker.js",
onWorkerMessage: vi.fn(),
sensesForGroup: () => ["s1"],
onWorkerCrashed: vi.fn(),
onBeforeGroupRestart,
isStopped: () => false,
});
await startWorkerWithReady(pool, "g");
expect(mockChildren).toHaveLength(1);
const p = pool.restartGroup("g");
expect(onBeforeGroupRestart).toHaveBeenCalledWith("g");
expect(mockChildren[0].send).toHaveBeenCalledWith(
expect.objectContaining({ type: "shutdown" }),
);
await flushSetImmediate();
expect(mockChildren).toHaveLength(2);
mockChildren[1].emit("message", { type: "ready" });
await p;
expect(pool.hasWorkerForGroup("g")).toBe(true);
});
it("onWorkerCrashed runs and schedules respawn after non-zero exit", async () => {
vi.useFakeTimers({ shouldAdvanceTime: true });
const onWorkerCrashed = vi.fn();
const pool = createSenseWorkerPool({
nerveRoot: "/tmp/n",
workerScript: "/fake/sense-worker.js",
onWorkerMessage: vi.fn(),
sensesForGroup: (g) => (g === "g" ? ["a", "b"] : []),
onWorkerCrashed,
onBeforeGroupRestart: vi.fn(),
isStopped: () => false,
});
await startWorkerWithReady(pool, "g");
expect(mockChildren).toHaveLength(1);
mockChildren[0].emit("exit", 1, null);
expect(onWorkerCrashed).toHaveBeenCalledWith("g");
await vi.advanceTimersByTimeAsync(1000);
expect(mockChildren).toHaveLength(2);
});
it("shutdownAll sends shutdown to every worker", async () => {
const pool = createSenseWorkerPool({
nerveRoot: "/tmp/n",
workerScript: "/fake/sense-worker.js",
onWorkerMessage: vi.fn(),
sensesForGroup: () => [],
onWorkerCrashed: vi.fn(),
onBeforeGroupRestart: vi.fn(),
isStopped: () => false,
});
await startWorkerWithReady(pool, "a");
await startWorkerWithReady(pool, "b");
await pool.shutdownAll();
expect(mockChildren[0].send).toHaveBeenCalledWith(
expect.objectContaining({ type: "shutdown" }),
);
expect(mockChildren[1].send).toHaveBeenCalledWith(
expect.objectContaining({ type: "shutdown" }),
);
});
it("does not respawn after crash when isStopped is true", async () => {
vi.useFakeTimers({ shouldAdvanceTime: true });
const pool = createSenseWorkerPool({
nerveRoot: "/tmp/n",
workerScript: "/fake/sense-worker.js",
onWorkerMessage: vi.fn(),
sensesForGroup: () => [],
onWorkerCrashed: vi.fn(),
onBeforeGroupRestart: vi.fn(),
isStopped: () => true,
});
await startWorkerWithReady(pool, "g");
const n = mockChildren.length;
mockChildren[0].emit("exit", 1, null);
await vi.advanceTimersByTimeAsync(1000);
expect(mockChildren.length).toBe(n);
});
});
+92
View File
@@ -0,0 +1,92 @@
/**
* File-watcher callbacks for nerve.yaml / sense / workflow sources (hot reload wiring).
*/
import { readFileSync } from "node:fs";
import { join } from "node:path";
import type { NerveConfig } from "@uncaged/nerve-core";
import { parseNerveConfig } from "@uncaged/nerve-core";
import type { LogStore } from "@uncaged/nerve-store";
import type { WorkflowManager } from "./workflow-manager.js";
export type KernelFileWatchDeps = {
nerveRoot: string;
getConfig: () => NerveConfig;
logStore: LogStore;
workflowManager: WorkflowManager;
restartGroup: (group: string) => Promise<void>;
reloadConfig: (newConfig: NerveConfig) => void;
};
export type KernelFileWatchHandlers = {
onSenseFileChange: (senseName: string) => void;
onWorkflowFileChange: (workflowName: string) => void;
onConfigFileChange: () => void;
};
export function createKernelFileWatchHandlers(deps: KernelFileWatchDeps): KernelFileWatchHandlers {
function onSenseFileChange(senseName: string): void {
const sc = deps.getConfig().senses[senseName];
if (sc === undefined) return;
process.stderr.write(
`[kernel] sense file changed: "${senseName}", restarting group "${sc.group}"\n`,
);
deps.logStore.append({
source: "system",
type: "sense_reload",
refId: senseName,
payload: null,
ts: Date.now(),
});
deps.restartGroup(sc.group).catch((e) => {
const msg = e instanceof Error ? e.message : String(e);
process.stderr.write(`[kernel] restartGroup error: ${msg}\n`);
});
}
function onWorkflowFileChange(workflowName: string): void {
process.stderr.write(
`[kernel] workflow file changed: "${workflowName}", draining and respawning worker\n`,
);
deps.logStore.append({
source: "system",
type: "workflow_reload",
refId: workflowName,
payload: null,
ts: Date.now(),
});
deps.workflowManager.drainAndRespawn(workflowName).catch((e) => {
const msg = e instanceof Error ? e.message : String(e);
process.stderr.write(`[kernel] drainAndRespawn error for "${workflowName}": ${msg}\n`);
});
}
function onConfigFileChange(): void {
process.stderr.write("[kernel] nerve.yaml changed, reloading config\n");
deps.logStore.append({
source: "system",
type: "config_reload",
refId: null,
payload: null,
ts: Date.now(),
});
try {
const raw = readFileSync(join(deps.nerveRoot, "nerve.yaml"), "utf8");
const parseResult = parseNerveConfig(raw);
if (!parseResult.ok) {
process.stderr.write(
`[kernel] config parse error, keeping current config: ${parseResult.error.message}\n`,
);
return;
}
deps.reloadConfig(parseResult.value);
} catch (e) {
const msg = e instanceof Error ? e.message : String(e);
process.stderr.write(`[kernel] failed to read nerve.yaml, keeping current config: ${msg}\n`);
}
}
return { onSenseFileChange, onWorkflowFileChange, onConfigFileChange };
}
@@ -0,0 +1,29 @@
import type { NerveConfig } from "@uncaged/nerve-core";
export function groupForSense(config: NerveConfig, senseName: string): string | null {
const senseConfig = config.senses[senseName];
if (senseConfig === undefined) return null;
return senseConfig.group;
}
export function senseNamesInGroup(config: NerveConfig, group: string): string[] {
return Object.entries(config.senses)
.filter(([, sc]) => sc.group === group)
.map(([name]) => name);
}
export function collectSenseGroups(cfg: NerveConfig): Set<string> {
const result = new Set<string>();
for (const sc of Object.values(cfg.senses)) {
result.add(sc.group);
}
return result;
}
export function senseNamesInGroupAsSet(cfg: NerveConfig, group: string): Set<string> {
const result = new Set<string>();
for (const [name, sc] of Object.entries(cfg.senses)) {
if (sc.group === group) result.add(name);
}
return result;
}
+56 -293
View File
@@ -1,43 +1,32 @@
/**
* Kernel — the main orchestrator that ties sense workers, signal bus, and
* reflex scheduler together.
*
* Responsibilities:
* - Spawn one child process per sense group (via fork)
* - Route SignalMessage from workers → SignalBus
* - Route ErrorMessage from workers → stderr log
* - Drive compute triggers via ReflexScheduler
* - Graceful shutdown: stop scheduler, send shutdown to all workers
* - Hot reload: restartGroup, reloadConfig, file watcher integration
* - Health reporting: getHealth
* Kernel — ties sense workers, signal bus, reflex scheduler, workflow manager,
* optional file watcher, and daemon IPC.
*/
import { fork } from "node:child_process";
import type { ChildProcess } from "node:child_process";
import { readFileSync } from "node:fs";
import { dirname, join } from "node:path";
import { fileURLToPath } from "node:url";
import { join } from "node:path";
import type { NerveConfig, SenseInfo, Signal } from "@uncaged/nerve-core";
import { parseNerveConfig, routeSenseComputeOutput } from "@uncaged/nerve-core";
import { routeSenseComputeOutput } from "@uncaged/nerve-core";
import { createLogStore } from "@uncaged/nerve-store";
import type { LogStore } from "@uncaged/nerve-store";
import { createDaemonIpcServer } from "./daemon-ipc.js";
import type { DaemonIpcServer } from "./daemon-ipc.js";
import { createFileWatcher } from "./file-watcher.js";
import type { FileWatcher } from "./file-watcher.js";
import type { ComputeMessage, ShutdownMessage } from "./ipc.js";
import { parseWorkerMessage } from "./ipc.js";
import { createLogStore } from "@uncaged/nerve-store";
import type { LogStore } from "@uncaged/nerve-store";
import { createKernelFileWatchHandlers } from "./kernel-file-watch.js";
import {
collectSenseGroups,
groupForSense,
senseNamesInGroup,
senseNamesInGroupAsSet,
} from "./kernel-sense-groups.js";
import { createReflexScheduler } from "./reflex-scheduler.js";
import type { ReflexScheduler } from "./reflex-scheduler.js";
import { createSignalBus } from "./signal-bus.js";
import type { SignalBus } from "./signal-bus.js";
import {
formatCapturedStderrTail,
formatChildExitSummary,
teeCapturedStderr,
} from "./worker-fork-support.js";
import { createSenseWorkerPool, resolveWorkerScript } from "./worker-pool.js";
import { createWorkflowManager } from "./workflow-manager.js";
import type { WorkflowManager } from "./workflow-manager.js";
@@ -57,93 +46,19 @@ export type Kernel = {
bus: SignalBus;
logStore: LogStore;
workflowManager: WorkflowManager;
/** Resolves when all workers have sent their initial "ready" message. */
ready: Promise<void>;
/** Returns the PID of the worker process for a given group, or null if not found. */
getWorkerPid: (group: string) => number | null;
/** Sends a compute message to the worker responsible for the given sense. */
triggerCompute: (senseName: string) => void;
/**
* On-demand sense trigger — looks up the group for `senseName`, finds its worker,
* and sends a compute message. Throws if the sense is unknown.
*/
triggerSense: (senseName: string) => void;
/** Gracefully restart a group worker (wait for exit, then respawn). */
restartGroup: (group: string) => Promise<void>;
/** Reload config from a new NerveConfig, incrementally updating scheduler and workers.
* Note: any pending/throttled computes in the old scheduler are silently dropped on reload.
* In-flight state is not preserved across reloadConfig. */
reloadConfig: (newConfig: NerveConfig) => void;
/** Return daemon health info. */
getHealth: () => KernelHealth;
};
type WorkerEntry = {
group: string;
process: ChildProcess;
};
function resolveWorkerScript(): string {
const __filename = fileURLToPath(import.meta.url);
const __dir = dirname(__filename);
return join(__dir, "sense-worker.js");
}
function spawnWorker(
nerveRoot: string,
group: string,
workerScript: string,
stderrTail: { value: string },
): ChildProcess {
const child = fork(workerScript, ["--group", group, "--root", nerveRoot], {
stdio: ["ignore", "inherit", "pipe", "ipc"],
});
teeCapturedStderr(child, stderrTail);
// Prevent unhandled EPIPE when writing to a child whose IPC channel closed
child.on("error", (err) => {
if ((err as NodeJS.ErrnoException).code !== "EPIPE") {
console.error("[worker] error:", err.message);
}
});
return child;
}
function sendCompute(worker: ChildProcess, senseName: string): void {
// worker.connected is false when the IPC channel has been closed (e.g. worker crashed)
if (worker.connected === false) return;
const msg: ComputeMessage = { type: "compute", sense: senseName };
try {
worker.send(msg);
} catch {
// IPC channel closed between connected check and send
}
}
function sendShutdown(worker: ChildProcess): void {
if (worker.connected === false) return;
const msg: ShutdownMessage = { type: "shutdown" };
try {
worker.send(msg);
} catch {
// IPC channel closed between connected check and send
}
}
function groupForSense(config: NerveConfig, senseName: string): string | null {
const senseConfig = config.senses[senseName];
if (senseConfig === undefined) return null;
return senseConfig.group;
}
export type KernelOptions = {
workerScript?: string | null;
enableFileWatcher?: boolean;
/** Override the LogStore instance (useful for testing). */
logStore?: LogStore;
/**
* Unix socket path for the daemon IPC server (used by CLI to send trigger-workflow).
* When null, the IPC server is not started (e.g. during tests).
*/
ipcSocketPath?: string | null;
};
@@ -184,7 +99,6 @@ export function createKernel(
groups.add(senseConfig.group);
}
const workers = new Map<string, WorkerEntry>();
let stopped = false;
let scheduler: ReflexScheduler = null as unknown as ReflexScheduler;
@@ -194,10 +108,10 @@ export function createKernel(
});
let pendingReadyCount = groups.size > 0 ? groups.size : 0;
function sensesForGroup(group: string): string[] {
return Object.entries(config.senses)
.filter(([, sc]) => sc.group === group)
.map(([name]) => name);
function clearSchedulerForGroup(group: string): void {
for (const senseName of senseNamesInGroup(config, group)) {
scheduler.onComputeComplete(senseName);
}
}
function handleWorkerMessage(raw: unknown): void {
@@ -259,50 +173,17 @@ export function createKernel(
}
scheduler.onComputeComplete(msg.sense);
}
// health-response is handled externally by the caller; no action needed here
}
function startWorker(group: string): Promise<void> {
const stderrTail = { value: "" };
const child = spawnWorker(nerveRoot, group, workerScript, stderrTail);
let workerReadyResolve: (() => void) | undefined;
const workerReady = new Promise<void>((resolve) => {
workerReadyResolve = resolve;
});
child.on("message", (raw: unknown) => {
const result = parseWorkerMessage(raw);
if (result.ok && result.value.type === "ready") {
workerReadyResolve?.();
}
handleWorkerMessage(raw);
});
child.on("exit", (code, signal) => {
const summary = formatChildExitSummary(code, signal ?? null);
process.stderr.write(
`[kernel] worker for group "${group}" exited (${summary})${formatCapturedStderrTail(stderrTail.value)}\n`,
);
// Resolve ready in case the worker exits before sending ready (prevents hangs)
workerReadyResolve?.();
if (!stopped && code !== 0) {
process.stderr.write(`[kernel] respawning worker for group "${group}" in 1s\n`);
for (const senseName of sensesForGroup(group)) {
scheduler.onComputeComplete(senseName);
}
setTimeout(() => {
if (!stopped) {
startWorker(group);
}
}, 1000);
}
});
workers.set(group, { group, process: child });
return workerReady;
}
const senseWorkerPool = createSenseWorkerPool({
nerveRoot,
workerScript,
onWorkerMessage: handleWorkerMessage,
sensesForGroup: (group) => senseNamesInGroup(config, group),
onWorkerCrashed: clearSchedulerForGroup,
onBeforeGroupRestart: clearSchedulerForGroup,
isStopped: () => stopped,
});
function triggerFn(senseName: string): void {
const group = groupForSense(config, senseName);
@@ -310,12 +191,7 @@ export function createKernel(
process.stderr.write(`[kernel] triggerFn: unknown sense "${senseName}"\n`);
return;
}
const entry = workers.get(group);
if (entry === undefined) {
process.stderr.write(`[kernel] triggerFn: no worker for group "${group}"\n`);
return;
}
sendCompute(entry.process, senseName);
senseWorkerPool.sendCompute(group, senseName);
}
function triggerSense(senseName: string): void {
@@ -323,11 +199,10 @@ export function createKernel(
if (group === null) {
throw new Error(`Unknown sense: "${senseName}"`);
}
const entry = workers.get(group);
if (entry === undefined) {
if (!senseWorkerPool.hasWorkerForGroup(group)) {
throw new Error(`No worker running for group "${group}" (sense: "${senseName}")`);
}
sendCompute(entry.process, senseName);
senseWorkerPool.sendCompute(group, senseName);
}
scheduler = createReflexScheduler(config, bus, triggerFn, {
@@ -339,63 +214,13 @@ export function createKernel(
}
for (const group of groups) {
startWorker(group);
}
function waitForExit(child: ChildProcess, timeoutMs: number): Promise<void> {
return new Promise((resolve) => {
const timer = setTimeout(() => {
child.kill("SIGKILL");
resolve();
}, timeoutMs);
child.once("exit", () => {
clearTimeout(timer);
resolve();
});
});
}
// --- restartGroup: gracefully stop worker, then respawn and await ready ---
async function restartGroup(group: string): Promise<void> {
const entry = workers.get(group);
if (entry === undefined) return;
for (const senseName of sensesForGroup(group)) {
scheduler.onComputeComplete(senseName);
}
sendShutdown(entry.process);
await waitForExit(entry.process, 5000);
if (!stopped) {
await startWorker(group);
}
}
function collectGroups(cfg: NerveConfig): Set<string> {
const result = new Set<string>();
for (const sc of Object.values(cfg.senses)) {
result.add(sc.group);
}
return result;
}
function sensesForGroupInConfig(cfg: NerveConfig, group: string): Set<string> {
const result = new Set<string>();
for (const [name, sc] of Object.entries(cfg.senses)) {
if (sc.group === group) result.add(name);
}
return result;
senseWorkerPool.startWorker(group);
}
function removeStaleGroups(oldGroups: Set<string>, newGroups: Set<string>): void {
for (const g of oldGroups) {
if (newGroups.has(g)) continue;
const entry = workers.get(g);
if (entry !== undefined) {
sendShutdown(entry.process);
workers.delete(g);
}
senseWorkerPool.evictGroup(g);
groups.delete(g);
}
}
@@ -404,27 +229,25 @@ export function createKernel(
for (const g of newGroups) {
if (oldGroups.has(g)) continue;
groups.add(g);
if (!stopped) startWorker(g);
if (!stopped) {
senseWorkerPool.startWorker(g);
}
}
}
function reloadConfig(newConfig: NerveConfig): void {
const oldGroups = collectGroups(config);
const oldGroups = collectSenseGroups(config);
const oldConfig = config;
const oldWorkflows = config.workflows ?? {};
config = newConfig;
// Note: pending/throttled computes in the old scheduler are silently dropped here.
// In-flight state is not preserved across reloadConfig.
scheduler.stop();
scheduler = createReflexScheduler(config, bus, triggerFn, {
logStore,
});
// Update workflow concurrency/overflow config incrementally — no restart needed
workflowManager.updateConfig(newConfig);
const newWorkflows = newConfig.workflows ?? {};
// Drain + remove workers for deleted workflows
for (const workflowName of Object.keys(oldWorkflows)) {
if (!(workflowName in newWorkflows)) {
process.stderr.write(
@@ -439,20 +262,17 @@ export function createKernel(
}
}
const newGroups = collectGroups(newConfig);
const newGroups = collectSenseGroups(newConfig);
removeStaleGroups(oldGroups, newGroups);
addNewGroups(oldGroups, newGroups);
// Restart existing groups that gained new senses — the running worker process
// was spawned with the old config and will report "Unknown sense" for any newly
// added sense until it is restarted.
for (const g of newGroups) {
if (!oldGroups.has(g)) continue; // already handled by addNewGroups
const oldSenses = sensesForGroupInConfig(oldConfig, g);
const newSenses = sensesForGroupInConfig(newConfig, g);
if (!oldGroups.has(g)) continue;
const oldSenses = senseNamesInGroupAsSet(oldConfig, g);
const newSenses = senseNamesInGroupAsSet(newConfig, g);
const gained = [...newSenses].some((s) => !oldSenses.has(s));
if (gained) {
restartGroup(g).catch((e) => {
senseWorkerPool.restartGroup(g).catch((e) => {
const msg = e instanceof Error ? e.message : String(e);
process.stderr.write(`[kernel] reloadConfig restartGroup error for "${g}": ${msg}\n`);
});
@@ -464,80 +284,28 @@ export function createKernel(
return {
uptime: Date.now() - startTime,
activeSenses: Object.keys(config.senses).length,
activeGroups: workers.size,
activeGroups: senseWorkerPool.activeGroupCount(),
pendingComputes: 0,
activeWorkflows: workflowManager.totalActiveCount(),
memoryUsage: process.memoryUsage(),
};
}
function handleSenseFileChange(senseName: string): void {
const sc = config.senses[senseName];
if (sc === undefined) return;
process.stderr.write(
`[kernel] sense file changed: "${senseName}", restarting group "${sc.group}"\n`,
);
logStore.append({
source: "system",
type: "sense_reload",
refId: senseName,
payload: null,
ts: Date.now(),
});
restartGroup(sc.group).catch((e) => {
const msg = e instanceof Error ? e.message : String(e);
process.stderr.write(`[kernel] restartGroup error: ${msg}\n`);
});
}
function handleWorkflowFileChange(workflowName: string): void {
process.stderr.write(
`[kernel] workflow file changed: "${workflowName}", draining and respawning worker\n`,
);
logStore.append({
source: "system",
type: "workflow_reload",
refId: workflowName,
payload: null,
ts: Date.now(),
});
workflowManager.drainAndRespawn(workflowName).catch((e) => {
const msg = e instanceof Error ? e.message : String(e);
process.stderr.write(`[kernel] drainAndRespawn error for "${workflowName}": ${msg}\n`);
});
}
function handleConfigFileChange(): void {
process.stderr.write("[kernel] nerve.yaml changed, reloading config\n");
logStore.append({
source: "system",
type: "config_reload",
refId: null,
payload: null,
ts: Date.now(),
});
try {
const raw = readFileSync(join(nerveRoot, "nerve.yaml"), "utf8");
const parseResult = parseNerveConfig(raw);
if (!parseResult.ok) {
process.stderr.write(
`[kernel] config parse error, keeping current config: ${parseResult.error.message}\n`,
);
return;
}
reloadConfig(parseResult.value);
} catch (e) {
const msg = e instanceof Error ? e.message : String(e);
process.stderr.write(`[kernel] failed to read nerve.yaml, keeping current config: ${msg}\n`);
}
}
const fileWatchHandlers = createKernelFileWatchHandlers({
nerveRoot,
getConfig: () => config,
logStore,
workflowManager,
restartGroup: (group) => senseWorkerPool.restartGroup(group),
reloadConfig,
});
let fileWatcher: FileWatcher | null = null;
if (options.enableFileWatcher) {
fileWatcher = createFileWatcher(nerveRoot, (change) => {
if (change.kind === "sense") handleSenseFileChange(change.senseName);
if (change.kind === "config") handleConfigFileChange();
if (change.kind === "workflow") handleWorkflowFileChange(change.workflowName);
if (change.kind === "sense") fileWatchHandlers.onSenseFileChange(change.senseName);
if (change.kind === "config") fileWatchHandlers.onConfigFileChange();
if (change.kind === "workflow") fileWatchHandlers.onWorkflowFileChange(change.workflowName);
});
}
@@ -577,12 +345,7 @@ export function createKernel(
}
scheduler.stop();
await workflowManager.stop();
const exitPromises: Promise<void>[] = [];
for (const entry of workers.values()) {
sendShutdown(entry.process);
exitPromises.push(waitForExit(entry.process, 5000));
}
await Promise.all(exitPromises);
await senseWorkerPool.shutdownAll();
logStore.append({
source: "system",
type: "stop",
@@ -594,7 +357,7 @@ export function createKernel(
}
function getWorkerPid(group: string): number | null {
return workers.get(group)?.process.pid ?? null;
return senseWorkerPool.getWorkerPid(group);
}
const senseCount = Object.keys(config.senses).length;
@@ -610,7 +373,7 @@ export function createKernel(
getWorkerPid,
triggerCompute: triggerFn,
triggerSense,
restartGroup,
restartGroup: (group) => senseWorkerPool.restartGroup(group),
reloadConfig,
getHealth,
};
+211
View File
@@ -0,0 +1,211 @@
/**
* Sense worker pool — forked child processes per sense group (IPC lifecycle).
*/
import { fork } from "node:child_process";
import type { ChildProcess } from "node:child_process";
import { dirname, join } from "node:path";
import { fileURLToPath } from "node:url";
import type { ComputeMessage, ShutdownMessage } from "./ipc.js";
import { parseWorkerMessage } from "./ipc.js";
import {
formatCapturedStderrTail,
formatChildExitSummary,
teeCapturedStderr,
} from "./worker-fork-support.js";
export function resolveWorkerScript(): string {
const __filename = fileURLToPath(import.meta.url);
const __dir = dirname(__filename);
return join(__dir, "sense-worker.js");
}
type WorkerEntry = {
group: string;
process: ChildProcess;
};
export type SenseWorkerPoolOptions = {
nerveRoot: string;
workerScript: string;
/** Invoked for every IPC message from a worker (including ready / signal / error). */
onWorkerMessage: (raw: unknown) => void;
/** Sense names in a group — used when clearing scheduler state on crash or restart. */
sensesForGroup: (group: string) => string[];
/**
* Called when a worker exits with non-zero code before scheduling a respawn
* (scheduler should release pending computes for senses in that group).
*/
onWorkerCrashed: (group: string) => void;
/**
* Called at the beginning of `restartGroup` before shutdown
* (same scheduler cleanup as crash path).
*/
onBeforeGroupRestart: (group: string) => void;
isStopped: () => boolean;
};
export type SenseWorkerPool = {
startWorker: (group: string) => Promise<void>;
restartGroup: (group: string) => Promise<void>;
/** Send shutdown and drop the entry without waiting (matches reloadConfig stale-group removal). */
evictGroup: (group: string) => void;
shutdownAll: () => Promise<void>;
sendCompute: (group: string, senseName: string) => void;
getWorkerPid: (group: string) => number | null;
hasWorkerForGroup: (group: string) => boolean;
activeGroupCount: () => number;
};
function spawnWorker(
nerveRoot: string,
group: string,
workerScript: string,
stderrTail: { value: string },
): ChildProcess {
const child = fork(workerScript, ["--group", group, "--root", nerveRoot], {
stdio: ["ignore", "inherit", "pipe", "ipc"],
});
teeCapturedStderr(child, stderrTail);
child.on("error", (err) => {
if ((err as NodeJS.ErrnoException).code !== "EPIPE") {
console.error("[worker] error:", err.message);
}
});
return child;
}
function sendComputeToProcess(worker: ChildProcess, senseName: string): void {
if (worker.connected === false) return;
const msg: ComputeMessage = { type: "compute", sense: senseName };
try {
worker.send(msg);
} catch {
// IPC channel closed between connected check and send
}
}
function sendShutdownToProcess(worker: ChildProcess): void {
if (worker.connected === false) return;
const msg: ShutdownMessage = { type: "shutdown" };
try {
worker.send(msg);
} catch {
// IPC channel closed between connected check and send
}
}
function waitForExit(child: ChildProcess, timeoutMs: number): Promise<void> {
return new Promise((resolve) => {
const timer = setTimeout(() => {
child.kill("SIGKILL");
resolve();
}, timeoutMs);
child.once("exit", () => {
clearTimeout(timer);
resolve();
});
});
}
export function createSenseWorkerPool(options: SenseWorkerPoolOptions): SenseWorkerPool {
const workers = new Map<string, WorkerEntry>();
function startWorker(group: string): Promise<void> {
const stderrTail = { value: "" };
const child = spawnWorker(options.nerveRoot, group, options.workerScript, stderrTail);
let workerReadyResolve: (() => void) | undefined;
const workerReady = new Promise<void>((resolve) => {
workerReadyResolve = resolve;
});
child.on("message", (raw: unknown) => {
const result = parseWorkerMessage(raw);
if (result.ok && result.value.type === "ready") {
workerReadyResolve?.();
}
options.onWorkerMessage(raw);
});
child.on("exit", (code, signal) => {
const summary = formatChildExitSummary(code, signal ?? null);
process.stderr.write(
`[kernel] worker for group "${group}" exited (${summary})${formatCapturedStderrTail(stderrTail.value)}\n`,
);
workerReadyResolve?.();
if (!options.isStopped() && code !== 0) {
process.stderr.write(`[kernel] respawning worker for group "${group}" in 1s\n`);
options.onWorkerCrashed(group);
setTimeout(() => {
if (!options.isStopped()) {
startWorker(group);
}
}, 1000);
}
});
workers.set(group, { group, process: child });
return workerReady;
}
async function restartGroup(group: string): Promise<void> {
const entry = workers.get(group);
if (entry === undefined) return;
options.onBeforeGroupRestart(group);
sendShutdownToProcess(entry.process);
await waitForExit(entry.process, 5000);
if (!options.isStopped()) {
await startWorker(group);
}
}
function evictGroup(group: string): void {
const entry = workers.get(group);
if (entry === undefined) return;
sendShutdownToProcess(entry.process);
workers.delete(group);
}
async function shutdownAll(): Promise<void> {
const exitPromises: Promise<void>[] = [];
for (const entry of workers.values()) {
sendShutdownToProcess(entry.process);
exitPromises.push(waitForExit(entry.process, 5000));
}
await Promise.all(exitPromises);
}
function sendCompute(group: string, senseName: string): void {
const entry = workers.get(group);
if (entry === undefined) return;
sendComputeToProcess(entry.process, senseName);
}
function getWorkerPid(group: string): number | null {
return workers.get(group)?.process.pid ?? null;
}
function hasWorkerForGroup(group: string): boolean {
return workers.has(group);
}
function activeGroupCount(): number {
return workers.size;
}
return {
startWorker,
restartGroup,
evictGroup,
shutdownAll,
sendCompute,
getWorkerPid,
hasWorkerForGroup,
activeGroupCount,
};
}