feat(cli,daemon): Phase 4 — Process Manager & Isolation #11
@@ -10,5 +10,9 @@
|
||||
"types": "dist/index.d.ts",
|
||||
"scripts": {
|
||||
"build": "tsup"
|
||||
},
|
||||
"dependencies": {
|
||||
"@uncaged/nerve-core": "workspace:*",
|
||||
"@uncaged/nerve-daemon": "workspace:*"
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,104 @@
|
||||
#!/usr/bin/env node
|
||||
|
||||
import { readFileSync } from "node:fs";
|
||||
import { homedir } from "node:os";
|
||||
import { join } from "node:path";
|
||||
|
||||
import { parseNerveConfig } from "@uncaged/nerve-core";
|
||||
import { createKernel } from "@uncaged/nerve-daemon";
|
||||
|
||||
const DEFAULT_NERVE_ROOT = join(homedir(), ".uncaged-nerve");
|
||||
|
||||
/**
 * Parses the options that follow the `start` subcommand.
 *
 * @param argv - The full process argument vector: `argv[0]` is node,
 *   `argv[1]` is the script and `argv[2]` is the subcommand, so option
 *   parsing begins at index 3.
 * @returns The nerve root directory: the value of the last `--root <path>`
 *   option, or `DEFAULT_NERVE_ROOT` when the option is absent.
 *
 * Prints the usage line to stderr and exits with code 1 when a positional
 * argument is encountered or when `--root` is given without a value
 * (previously a trailing `--root` was silently ignored and the default root
 * was used instead). Other dash-prefixed flags are still ignored.
 */
function parseArgs(argv: string[]): { root: string } {
  const usage = "Usage: nerve start [--root <path>]\n";
  const args = argv.slice(3);
  let root = DEFAULT_NERVE_ROOT;

  for (let i = 0; i < args.length; i++) {
    const arg = args[i];
    if (arg === undefined) continue;

    if (arg === "--root") {
      const value = args[i + 1];
      if (value === undefined) {
        // `--root` was the last token: there is no path to use.
        process.stderr.write(usage);
        process.exit(1);
      }
      root = value;
      i++; // Skip the value we just consumed.
    } else if (!arg.startsWith("-")) {
      // Stray positional argument.
      process.stderr.write(usage);
      process.exit(1);
    }
  }

  return { root };
}
|
||||
|
||||
/**
 * Loads `nerve.yaml` from the given nerve root and parses it.
 *
 * A failure to read the file is reported as an `{ ok: false }` result whose
 * error message names the path; otherwise the outcome of `parseNerveConfig`
 * on the file contents is returned unchanged.
 */
function readConfig(nerveRoot: string): ReturnType<typeof parseNerveConfig> {
  const file = join(nerveRoot, "nerve.yaml");

  let contents: string;
  try {
    contents = readFileSync(file, { encoding: "utf8" });
  } catch (cause) {
    const reason = cause instanceof Error ? cause.message : String(cause);
    const error = new Error(`Cannot read ${file}: ${reason}`);
    return { ok: false, error };
  }

  // Parsing stays outside the try block so parser exceptions are not
  // misreported as read failures.
  return parseNerveConfig(contents);
}
|
||||
|
||||
/**
 * CLI entry point for `nerve start [--root <path>]`.
 *
 * Validates the subcommand, loads the configuration from the nerve root,
 * creates the kernel, logs a start-up summary to stderr and installs
 * SIGINT/SIGTERM handlers that stop the kernel before exiting.
 *
 * Exits with code 1 on a usage or configuration error, with code 0 after a
 * clean shutdown, and with code 1 if stopping the kernel fails.
 */
async function main(): Promise<void> {
  const subcommand = process.argv[2];

  if (subcommand !== "start") {
    process.stderr.write("Usage: nerve start [--root <path>]\n");
    process.exit(1);
  }

  const { root } = parseArgs(process.argv);

  const configResult = readConfig(root);
  if (!configResult.ok) {
    process.stderr.write(`[nerve] Config error: ${configResult.error.message}\n`);
    process.exit(1);
  }

  const config = configResult.value;
  const kernel = createKernel(config, root);

  process.stderr.write(
    `[nerve] Starting — ${kernel.groups.size} group(s), ${kernel.senseCount} sense(s)\n`,
  );

  // List the senses of each group so the operator can see the layout.
  for (const group of kernel.groups) {
    const groupSenses = Object.entries(config.senses)
      .filter(([, sc]) => sc.group === group)
      .map(([name]) => name);
    process.stderr.write(`[nerve] group "${group}": ${groupSenses.join(", ")}\n`);
  }

  let shuttingDown = false;

  /** Stops the kernel once; repeated signals while stopping are ignored. */
  async function shutdown(): Promise<void> {
    if (shuttingDown) return;
    shuttingDown = true;
    process.stderr.write("\n[nerve] Shutting down…\n");
    await kernel.stop();
    process.exit(0);
  }

  // One handler shared by both termination signals (it used to be duplicated).
  const onTerminationSignal = (): void => {
    shutdown().catch((e: unknown) => {
      const msg = e instanceof Error ? e.message : String(e);
      process.stderr.write(`[nerve] Shutdown error: ${msg}\n`);
      process.exit(1);
    });
  };

  process.on("SIGINT", onTerminationSignal);
  process.on("SIGTERM", onTerminationSignal);
}
|
||||
|
||||
// Run the CLI; anything that escapes main() is reported and becomes exit code 1.
main().catch((failure: unknown) => {
  const detail = failure instanceof Error ? failure.message : String(failure);
  process.stderr.write(`[nerve] Fatal error: ${detail}\n`);
  process.exit(1);
});
|
||||
@@ -1 +1,2 @@
|
||||
// TODO: implement
|
||||
export { createKernel } from "@uncaged/nerve-daemon";
|
||||
export type { Kernel } from "@uncaged/nerve-daemon";
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import { defineConfig } from "tsup";
|
||||
|
||||
export default defineConfig({
|
||||
entry: ["src/index.ts"],
|
||||
entry: ["src/index.ts", "src/cli.ts"],
|
||||
format: ["esm"],
|
||||
dts: true,
|
||||
clean: true,
|
||||
|
||||
@@ -0,0 +1,28 @@
|
||||
/**
|
||||
* Mock sense worker that implements the IPC protocol for integration testing.
|
||||
*
|
||||
* This file intentionally uses the .mjs extension (rather than .ts) because it
|
||||
* is spawned as a child process via fork() at runtime and must execute without
|
||||
* any TypeScript compilation step. Node.js can run .mjs files directly as
|
||||
* ESModules, whereas .ts files would require ts-node/tsx to be available in the
|
||||
* child process environment.
|
||||
*
|
||||
* Behaviour:
|
||||
* - Sends { type: "ready" } on startup
|
||||
* - On { type: "compute", sense } → sends back { type: "signal", sense, payload: 42 }
|
||||
* - On { type: "shutdown" } → exits cleanly with code 0
|
||||
*/
|
||||
|
||||
// This fixture only makes sense as a forked child: without an IPC channel
// `process.send` is undefined and the calls below would throw an opaque
// TypeError. Fail fast with an explicit message instead.
if (typeof process.send !== "function") {
  process.stderr.write("mock-worker: no IPC channel — start this script with child_process.fork()\n");
  process.exit(1);
}

// Announce readiness to the parent as soon as the script starts.
process.send({ type: "ready" });

process.on("message", (msg) => {
  // Ignore anything that is not an object-shaped message.
  if (!msg || typeof msg !== "object") return;

  if (msg.type === "shutdown") {
    process.exit(0);
  }

  // Answer every compute request with a fixed payload for the same sense.
  if (msg.type === "compute" && typeof msg.sense === "string") {
    process.send({ type: "signal", sense: msg.sense, payload: 42 });
  }
});
|
||||
@@ -0,0 +1,207 @@
|
||||
/**
|
||||
* Integration tests for the Kernel / Process Manager.
|
||||
*
|
||||
* These tests use REAL child processes via a mock worker fixture that
|
||||
* implements the IPC protocol (fixtures/mock-worker.mjs). No build
|
||||
* artifacts are required.
|
||||
*/
|
||||
|
||||
import { dirname, join } from "node:path";
|
||||
import { fileURLToPath } from "node:url";
|
||||
|
||||
import type { Signal } from "@uncaged/nerve-core";
|
||||
import type { NerveConfig } from "@uncaged/nerve-core";
|
||||
import { afterEach, describe, expect, it } from "vitest";
|
||||
|
||||
import { createKernel } from "../kernel.js";
|
||||
import type { Kernel } from "../kernel.js";
|
||||
|
||||
const __filename = fileURLToPath(import.meta.url);
|
||||
const __dir = dirname(__filename);
|
||||
const MOCK_WORKER = join(__dir, "fixtures", "mock-worker.mjs");
|
||||
|
||||
/**
 * Builds a config for the tests: a single "cpu-usage" sense in the "system"
 * group, no reflexes and no workflows. Any top-level key present in
 * `overrides` replaces the corresponding default wholesale.
 */
function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
  const defaults: NerveConfig = {
    senses: {
      "cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
    },
    reflexes: [],
    workflows: null,
  };

  return { ...defaults, ...overrides };
}
|
||||
|
||||
/**
 * Polls `predicate` until it returns true or the timeout elapses.
 *
 * The predicate is evaluated once immediately and then every `intervalMs`
 * milliseconds. Both timers are always cleared when the promise settles, so
 * a timed-out poll no longer leaves an interval running (which kept calling
 * the predicate and kept the event loop alive).
 *
 * @param predicate - Condition to wait for.
 * @param timeoutMs - Maximum time to wait, in milliseconds.
 * @param intervalMs - Delay between predicate evaluations, in milliseconds.
 * @returns A promise that resolves once the predicate returns true. It
 *   rejects with a "pollUntil timed out" error on timeout, or with whatever
 *   the predicate throws.
 */
async function pollUntil(
  predicate: () => boolean,
  timeoutMs: number,
  intervalMs = 50,
): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    let timer: ReturnType<typeof setTimeout> | undefined;
    let interval: ReturnType<typeof setInterval> | undefined;

    const cleanup = (): void => {
      if (timer !== undefined) clearTimeout(timer);
      if (interval !== undefined) clearInterval(interval);
    };

    /** Evaluates the predicate once; returns true when the promise has settled. */
    const poll = (): boolean => {
      let satisfied: boolean;
      try {
        satisfied = predicate();
      } catch (err) {
        // Surface predicate failures as a rejection instead of an uncaught exception.
        cleanup();
        reject(err);
        return true;
      }
      if (!satisfied) return false;
      cleanup();
      resolve();
      return true;
    };

    // Fast path: the condition may already hold.
    if (poll()) return;

    timer = setTimeout(() => {
      cleanup();
      reject(new Error(`pollUntil timed out after ${timeoutMs}ms`));
    }, timeoutMs);
    interval = setInterval(poll, intervalMs);
  });
}
|
||||
|
||||
describe("kernel integration — real child processes", () => {
  /** Nerve root passed to every kernel created by this suite. */
  const NERVE_ROOT = "/tmp/nerve-integration-test";

  /** Kernel owned by the current test; stopped by afterEach unless the test cleared it. */
  let kernel: Kernel | null = null;

  afterEach(async () => {
    if (kernel !== null) {
      await kernel.stop();
      kernel = null;
    }
  });

  /** Sense config entry for `group` with every optional setting left null. */
  const sense = (group: string) => ({
    group,
    throttle: null,
    timeout: null,
    gracePeriod: null,
  });

  /** Creates a kernel backed by the mock worker and registers it for cleanup. */
  function startKernel(config: NerveConfig = makeConfig()): Kernel {
    const started = createKernel(config, NERVE_ROOT, { workerScript: MOCK_WORKER });
    kernel = started;
    return started;
  }

  /**
   * Triggers a compute for `senseName` and asserts that exactly one signal
   * with the mock worker's payload (42) arrives on the bus within
   * `timeoutMs`. The subscription is released even when an assertion fails.
   */
  async function expectComputeSignal(
    target: Kernel,
    senseName: string,
    timeoutMs: number,
  ): Promise<void> {
    const received: Signal[] = [];
    // Subscribe before triggering so the signal cannot be missed.
    const unsub = target.bus.subscribe((signal) => {
      received.push(signal);
    });

    try {
      target.triggerCompute(senseName);
      await pollUntil(() => received.length > 0, timeoutMs);

      expect(received).toHaveLength(1);
      expect(received[0]).toMatchObject({ senseId: senseName, payload: 42 });
    } finally {
      unsub();
    }
  }

  it("returns correct groups and senseCount", () => {
    const k = startKernel(
      makeConfig({
        senses: {
          "cpu-usage": sense("system"),
          "disk-io": sense("system"),
          "net-rx": sense("network"),
        },
      }),
    );

    expect(k.groups.size).toBe(2);
    expect(k.groups.has("system")).toBe(true);
    expect(k.groups.has("network")).toBe(true);
    expect(k.senseCount).toBe(3);
  });

  it("workers start and respond to compute messages with signals", async () => {
    const k = startKernel();

    // Wait for all workers to report ready (event-based, not a fixed delay).
    await k.ready;

    await expectComputeSignal(k, "cpu-usage", 3000);
  }, 10_000);

  it("graceful shutdown: stop() resolves after all workers exit", async () => {
    const k = startKernel(
      makeConfig({
        senses: {
          "cpu-usage": sense("system"),
          "net-rx": sense("network"),
        },
      }),
    );

    await k.ready;

    const stopPromise = k.stop();
    // This test stops the kernel itself; keep afterEach from stopping it again.
    kernel = null;

    // Must settle within the test timeout.
    await expect(stopPromise).resolves.toBeUndefined();
  }, 10_000);

  it("compute round-trip: worker receives compute and sends signal back through bus", async () => {
    const k = startKernel();

    await k.ready;

    // The kernel sends IPC to the worker, the mock worker answers with a
    // signal message, and the kernel routes it to the bus.
    await expectComputeSignal(k, "cpu-usage", 3000);
  }, 10_000);

  it("crash recovery: kernel respawns worker after unexpected exit and new worker is functional", async () => {
    const k = startKernel();

    // Wait for the initial worker to be ready.
    await k.ready;

    const originalPid = k.getWorkerPid("system");
    expect(originalPid).not.toBeNull();
    if (originalPid === null) {
      throw new Error('no worker PID for group "system"');
    }
    expect(originalPid).toBeGreaterThan(0);

    // Kill the kernel's own worker to simulate a crash (SIGKILL, code != 0).
    process.kill(originalPid, "SIGKILL");

    // Poll until the kernel has registered a replacement worker with a
    // different PID (respawn delay is 1s, then fork(), then workers.set()).
    await pollUntil(() => {
      const pid = k.getWorkerPid("system");
      return pid !== null && pid !== originalPid;
    }, 5000);

    const newPid = k.getWorkerPid("system");
    expect(newPid).not.toBeNull();
    expect(newPid).not.toBe(originalPid);

    // Trigger a compute right away; receiving the signal proves the
    // replacement worker is fully functional.
    await expectComputeSignal(k, "cpu-usage", 5000);

    // The kernel should still stop gracefully after a respawn.
    await k.stop();
    kernel = null;
  }, 15_000);
});
|
||||
@@ -8,6 +8,8 @@ export type {
|
||||
WorkerToParentMessage,
|
||||
} from "./ipc.js";
|
||||
|
||||
export type { SignalBus, SignalHandler, Unsubscribe } from "./signal-bus.js";
|
||||
|
||||
export type { ComputeFn, DrizzleDB, PeerMap, SenseRuntime } from "./sense-runtime.js";
|
||||
|
||||
export {
|
||||
@@ -17,3 +19,6 @@ export {
|
||||
loadComputeFn,
|
||||
executeCompute,
|
||||
} from "./sense-runtime.js";
|
||||
|
||||
export { createKernel } from "./kernel.js";
|
||||
export type { Kernel, KernelOptions } from "./kernel.js";
|
||||
|
||||
@@ -26,6 +26,15 @@ import type { SignalBus } from "./signal-bus.js";
|
||||
|
||||
/** Handle returned by `createKernel` for observing and controlling a running kernel. */
export type Kernel = {
  /** Shuts the kernel down; the promise settles once shutdown has finished. */
  stop: () => Promise<void>;
  /** Names of the sense groups managed by this kernel. */
  groups: Set<string>;
  /** Number of senses declared in the config. */
  senseCount: number;
  /** Signal bus the kernel publishes worker signals on. */
  bus: SignalBus;
  /** Settles after every worker has reported its initial "ready" message. */
  ready: Promise<void>;
  /** Looks up the PID of the worker process serving `group`; null when there is none. */
  getWorkerPid: (group: string) => number | null;
  /** Asks the worker responsible for `senseName` to run a compute. */
  triggerCompute: (senseName: string) => void;
};
|
||||
|
||||
type WorkerEntry = {
|
||||
@@ -46,13 +55,25 @@ function spawnWorker(nerveRoot: string, group: string, workerScript: string): Ch
|
||||
}
|
||||
|
||||
/**
 * Sends a compute request for `senseName` to a worker over IPC.
 *
 * Best-effort: nothing is sent when the IPC channel is already closed, and
 * a synchronous failure of `send` is swallowed. The message is sent exactly
 * once — the earlier unguarded `worker.send(msg)` that preceded the guarded
 * call (sending every request twice and able to throw) has been removed.
 */
function sendCompute(worker: ChildProcess, senseName: string): void {
  // worker.connected is false when the IPC channel has been closed (e.g. worker crashed)
  if (worker.connected === false) return;

  const msg: ComputeMessage = { type: "compute", sense: senseName };
  try {
    worker.send(msg);
  } catch {
    // IPC channel closed between connected check and send
  }
}
|
||||
|
||||
/**
 * Asks a worker to shut down by sending it a shutdown message over IPC.
 *
 * Best-effort: nothing is sent when the IPC channel is already closed, and
 * a synchronous failure of `send` is swallowed. The message is sent exactly
 * once — the earlier unguarded `worker.send(msg)` that preceded the guarded
 * call has been removed.
 */
function sendShutdown(worker: ChildProcess): void {
  // worker.connected is false when the IPC channel has been closed (e.g. worker crashed)
  if (worker.connected === false) return;

  const msg: ShutdownMessage = { type: "shutdown" };
  try {
    worker.send(msg);
  } catch {
    // IPC channel closed between connected check and send
  }
}
|
||||
|
||||
function groupForSense(config: NerveConfig, senseName: string): string | null {
|
||||
@@ -61,9 +82,21 @@ function groupForSense(config: NerveConfig, senseName: string): string | null {
|
||||
return senseConfig.group;
|
||||
}
|
||||
|
||||
export function createKernel(config: NerveConfig, nerveRoot: string): Kernel {
|
||||
/** Optional settings accepted by `createKernel`. */
export type KernelOptions = {
  /** Script used for the kernel's worker processes (the integration tests pass a mock worker here). */
  workerScript: string;
};
|
||||
|
||||
/** Options used when `createKernel` is called without any: the worker script from `resolveWorkerScript()`. */
function defaultKernelOptions(): KernelOptions {
  const workerScript = resolveWorkerScript();
  return { workerScript };
}
|
||||
|
||||
export function createKernel(
|
||||
config: NerveConfig,
|
||||
nerveRoot: string,
|
||||
options: KernelOptions = defaultKernelOptions(),
|
||||
): Kernel {
|
||||
const bus: SignalBus = createSignalBus();
|
||||
const workerScript = resolveWorkerScript();
|
||||
const workerScript = options.workerScript;
|
||||
|
||||
// Signal ID counter is instance-scoped (fix #2)
|
||||
let _signalIdCounter = 0;
|
||||
@@ -79,6 +112,20 @@ export function createKernel(config: NerveConfig, nerveRoot: string): Kernel {
|
||||
|
||||
const workers = new Map<string, WorkerEntry>();
|
||||
let stopped = false;
|
||||
// eslint-disable-next-line prefer-const
|
||||
let scheduler: ReflexScheduler = null as unknown as ReflexScheduler;
|
||||
|
||||
let readyResolve: () => void;
|
||||
const ready = new Promise<void>((resolve) => {
|
||||
readyResolve = resolve;
|
||||
});
|
||||
let pendingReadyCount = groups.size > 0 ? groups.size : 0;
|
||||
|
||||
/** Collects the names of all configured senses whose `group` equals the given group. */
function sensesForGroup(group: string): string[] {
  const names: string[] = [];
  for (const [name, senseConfig] of Object.entries(config.senses)) {
    if (senseConfig.group === group) {
      names.push(name);
    }
  }
  return names;
}
|
||||
|
||||
function handleWorkerMessage(raw: unknown): void {
|
||||
const result = parseWorkerMessage(raw);
|
||||
@@ -89,6 +136,10 @@ export function createKernel(config: NerveConfig, nerveRoot: string): Kernel {
|
||||
const msg = result.value;
|
||||
|
||||
if (msg.type === "ready") {
|
||||
pendingReadyCount -= 1;
|
||||
if (pendingReadyCount <= 0) {
|
||||
readyResolve();
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -122,6 +173,10 @@ export function createKernel(config: NerveConfig, nerveRoot: string): Kernel {
|
||||
// Respawn on unexpected exit (fix #4)
|
||||
if (!stopped && code !== 0) {
|
||||
process.stderr.write(`[kernel] respawning worker for group "${group}" in 1s\n`);
|
||||
// Clear any stuck in-flight state for all senses in this group
|
||||
for (const senseName of sensesForGroup(group)) {
|
||||
scheduler.onComputeComplete(senseName);
|
||||
}
|
||||
setTimeout(() => {
|
||||
if (!stopped) {
|
||||
startWorker(group);
|
||||
@@ -147,7 +202,11 @@ export function createKernel(config: NerveConfig, nerveRoot: string): Kernel {
|
||||
sendCompute(entry.process, senseName);
|
||||
}
|
||||
|
||||
const scheduler: ReflexScheduler = createReflexScheduler(config, bus, triggerFn);
|
||||
scheduler = createReflexScheduler(config, bus, triggerFn);
|
||||
|
||||
if (groups.size === 0) {
|
||||
readyResolve();
|
||||
}
|
||||
|
||||
for (const group of groups) {
|
||||
startWorker(group);
|
||||
@@ -178,5 +237,11 @@ export function createKernel(config: NerveConfig, nerveRoot: string): Kernel {
|
||||
await Promise.all(exitPromises);
|
||||
}
|
||||
|
||||
return { stop };
|
||||
/** PID of the worker currently registered for `group`, or null when there is no entry or no PID. */
function getWorkerPid(group: string): number | null {
  const entry = workers.get(group);
  if (entry === undefined) {
    return null;
  }
  return entry.process.pid ?? null;
}
|
||||
|
||||
const senseCount = Object.keys(config.senses).length;
|
||||
|
||||
return { stop, groups, senseCount, bus, ready, getWorkerPid, triggerCompute: triggerFn };
|
||||
}
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import { defineConfig } from "tsup";
|
||||
|
||||
export default defineConfig({
|
||||
entry: ["src/index.ts"],
|
||||
entry: ["src/index.ts", "src/sense-worker.ts"],
|
||||
format: ["esm"],
|
||||
dts: true,
|
||||
clean: true,
|
||||
|
||||
Generated
+8
-1
@@ -18,7 +18,14 @@ importers:
|
||||
specifier: ^5.5.0
|
||||
version: 5.9.3
|
||||
|
||||
packages/cli: {}
|
||||
packages/cli:
|
||||
dependencies:
|
||||
'@uncaged/nerve-core':
|
||||
specifier: workspace:*
|
||||
version: link:../core
|
||||
'@uncaged/nerve-daemon':
|
||||
specifier: workspace:*
|
||||
version: link:../daemon
|
||||
|
||||
packages/core:
|
||||
dependencies:
|
||||
|
||||
Reference in New Issue
Block a user