feat: Phase 6 — hot reload, error isolation, grace period, nerve-health
- file-watcher.ts: watch nerveRoot for .ts and nerve.yaml changes
- kernel.ts: restartGroup(), reloadConfig(), getHealth(), auto-respawn on crash
- sense-worker.ts: compute try/catch error isolation, grace_period hard kill
- ipc.ts: new message types for health, restart, reload
- examples/senses/nerve-health.ts: built-in daemon health sense
- Integration tests for hot reload, error isolation, grace period
小橘 🍊(NEKO Team)
This commit is contained in:
@@ -0,0 +1,70 @@
|
||||
/**
|
||||
* nerve-health — built-in sense that reports daemon health via IPC.
|
||||
*
|
||||
* When running inside a sense worker, this compute function sends a
|
||||
* "health-request" to the parent kernel process and returns the
|
||||
* health-response payload as its signal.
|
||||
*
|
||||
* Usage in nerve.yaml:
|
||||
* senses:
|
||||
* nerve-health:
|
||||
* group: internal
|
||||
* throttle: 30s
|
||||
* timeout: 5s
|
||||
*
|
||||
* reflexes:
|
||||
* - sense: nerve-health
|
||||
* interval: 30s
|
||||
*/
|
||||
|
||||
export type NerveHealth = {
|
||||
uptime: number;
|
||||
activeSenses: string[];
|
||||
inFlightCount: number;
|
||||
workerMemoryUsage: NodeJS.MemoryUsage;
|
||||
workerUptime: number;
|
||||
};
|
||||
|
||||
export async function compute(): Promise<NerveHealth | null> {
|
||||
const health = await requestHealthFromKernel();
|
||||
return health;
|
||||
}
|
||||
|
||||
function requestHealthFromKernel(): Promise<NerveHealth> {
|
||||
return new Promise((resolve, reject) => {
|
||||
if (!process.send) {
|
||||
reject(new Error("nerve-health: not running as a child process with IPC"));
|
||||
return;
|
||||
}
|
||||
|
||||
const timeout = setTimeout(() => {
|
||||
process.removeListener("message", onMessage);
|
||||
reject(new Error("nerve-health: health-request timed out"));
|
||||
}, 5000);
|
||||
|
||||
function onMessage(msg: unknown): void {
|
||||
if (
|
||||
msg !== null &&
|
||||
typeof msg === "object" &&
|
||||
(msg as Record<string, unknown>).type === "health-response"
|
||||
) {
|
||||
clearTimeout(timeout);
|
||||
process.removeListener("message", onMessage);
|
||||
const resp = msg as {
|
||||
senses: string[];
|
||||
inFlightCount: number;
|
||||
};
|
||||
resolve({
|
||||
uptime: process.uptime(),
|
||||
activeSenses: resp.senses,
|
||||
inFlightCount: resp.inFlightCount,
|
||||
workerMemoryUsage: process.memoryUsage(),
|
||||
workerUptime: process.uptime(),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
process.on("message", onMessage);
|
||||
process.send({ type: "health-request" });
|
||||
});
|
||||
}
|
||||
@@ -0,0 +1,126 @@
|
||||
/**
|
||||
* Unit tests for file-watcher.ts
|
||||
*/
|
||||
|
||||
import { mkdirSync, mkdtempSync, writeFileSync } from "node:fs";
|
||||
import { tmpdir } from "node:os";
|
||||
import { join } from "node:path";
|
||||
|
||||
import { afterEach, describe, expect, it } from "vitest";
|
||||
|
||||
import { createFileWatcher } from "../file-watcher.js";
|
||||
import type { FileChange, FileWatcher } from "../file-watcher.js";
|
||||
|
||||
function makeTempNerveRoot(): string {
|
||||
const dir = mkdtempSync(join(tmpdir(), "nerve-fw-test-"));
|
||||
mkdirSync(join(dir, "senses", "cpu-usage"), { recursive: true });
|
||||
writeFileSync(join(dir, "nerve.yaml"), "senses: {}\nreflexes: []\n");
|
||||
writeFileSync(
|
||||
join(dir, "senses", "cpu-usage", "index.ts"),
|
||||
"export async function compute() { return null; }",
|
||||
);
|
||||
return dir;
|
||||
}
|
||||
|
||||
async function waitFor(
|
||||
predicate: () => boolean,
|
||||
timeoutMs: number,
|
||||
intervalMs = 50,
|
||||
): Promise<void> {
|
||||
return new Promise((resolve, reject) => {
|
||||
const timer = setTimeout(
|
||||
() => reject(new Error(`waitFor timed out after ${timeoutMs}ms`)),
|
||||
timeoutMs,
|
||||
);
|
||||
const check = setInterval(() => {
|
||||
if (predicate()) {
|
||||
clearTimeout(timer);
|
||||
clearInterval(check);
|
||||
resolve();
|
||||
}
|
||||
}, intervalMs);
|
||||
});
|
||||
}
|
||||
|
||||
describe("createFileWatcher", () => {
|
||||
let watcher: FileWatcher | null = null;
|
||||
|
||||
afterEach(() => {
|
||||
if (watcher !== null) {
|
||||
watcher.close();
|
||||
watcher = null;
|
||||
}
|
||||
});
|
||||
|
||||
it("detects nerve.yaml changes", async () => {
|
||||
const root = makeTempNerveRoot();
|
||||
const changes: FileChange[] = [];
|
||||
|
||||
watcher = createFileWatcher(root, (change) => changes.push(change), 50);
|
||||
|
||||
// Wait a bit for watcher to initialize, then modify nerve.yaml
|
||||
await new Promise((r) => setTimeout(r, 100));
|
||||
writeFileSync(join(root, "nerve.yaml"), "senses: {}\nreflexes: []\n# changed\n");
|
||||
|
||||
await waitFor(() => changes.length > 0, 3000);
|
||||
|
||||
expect(changes.length).toBeGreaterThanOrEqual(1);
|
||||
expect(changes.some((c) => c.kind === "config")).toBe(true);
|
||||
}, 10_000);
|
||||
|
||||
it("detects sense .ts file changes", async () => {
|
||||
const root = makeTempNerveRoot();
|
||||
const changes: FileChange[] = [];
|
||||
|
||||
watcher = createFileWatcher(root, (change) => changes.push(change), 50);
|
||||
|
||||
await new Promise((r) => setTimeout(r, 100));
|
||||
writeFileSync(
|
||||
join(root, "senses", "cpu-usage", "index.ts"),
|
||||
"export async function compute() { return 42; }",
|
||||
);
|
||||
|
||||
await waitFor(() => changes.length > 0, 3000);
|
||||
|
||||
expect(changes.length).toBeGreaterThanOrEqual(1);
|
||||
const senseChanges = changes.filter((c) => c.kind === "sense");
|
||||
expect(senseChanges.length).toBeGreaterThanOrEqual(1);
|
||||
expect((senseChanges[0] as { senseName: string }).senseName).toBe("cpu-usage");
|
||||
}, 10_000);
|
||||
|
||||
it("close() stops the watcher", async () => {
|
||||
const root = makeTempNerveRoot();
|
||||
const changes: FileChange[] = [];
|
||||
|
||||
watcher = createFileWatcher(root, (change) => changes.push(change), 50);
|
||||
|
||||
await new Promise((r) => setTimeout(r, 100));
|
||||
watcher.close();
|
||||
watcher = null;
|
||||
|
||||
writeFileSync(join(root, "nerve.yaml"), "senses: {}\nreflexes: []\n# after close\n");
|
||||
|
||||
// Wait and verify no changes were captured
|
||||
await new Promise((r) => setTimeout(r, 500));
|
||||
expect(changes.length).toBe(0);
|
||||
}, 5_000);
|
||||
|
||||
it("debounces rapid changes", async () => {
|
||||
const root = makeTempNerveRoot();
|
||||
const changes: FileChange[] = [];
|
||||
|
||||
watcher = createFileWatcher(root, (change) => changes.push(change), 200);
|
||||
|
||||
await new Promise((r) => setTimeout(r, 100));
|
||||
|
||||
// Write rapidly
|
||||
for (let i = 0; i < 5; i++) {
|
||||
writeFileSync(join(root, "nerve.yaml"), `senses: {}\nreflexes: []\n# v${i}\n`);
|
||||
}
|
||||
|
||||
await waitFor(() => changes.length > 0, 3000);
|
||||
|
||||
// With debounce, should see only 1 change (not 5)
|
||||
expect(changes.length).toBe(1);
|
||||
}, 10_000);
|
||||
});
|
||||
@@ -0,0 +1,36 @@
|
||||
/**
|
||||
* Mock worker that crashes on first compute, then works normally after respawn.
|
||||
* Uses a marker file to track if it has already crashed.
|
||||
*
|
||||
* First invocation: sends ready, then exits with code 1 on first compute.
|
||||
* Subsequent invocations (after respawn): works like normal mock-worker.
|
||||
*/
|
||||
|
||||
import { existsSync, unlinkSync, writeFileSync } from "node:fs";
|
||||
import { tmpdir } from "node:os";
|
||||
import { join } from "node:path";
|
||||
|
||||
const markerFile = join(tmpdir(), `nerve-crash-once-${process.ppid}.marker`);
|
||||
|
||||
process.send({ type: "ready" });
|
||||
|
||||
const hasCrashed = existsSync(markerFile);
|
||||
|
||||
process.on("message", (msg) => {
|
||||
if (!msg || typeof msg !== "object") return;
|
||||
|
||||
if (msg.type === "shutdown") {
|
||||
try {
|
||||
unlinkSync(markerFile);
|
||||
} catch {}
|
||||
process.exit(0);
|
||||
}
|
||||
|
||||
if (msg.type === "compute" && typeof msg.sense === "string") {
|
||||
if (!hasCrashed) {
|
||||
writeFileSync(markerFile, "crashed", "utf8");
|
||||
process.exit(1);
|
||||
}
|
||||
process.send({ type: "signal", sense: msg.sense, payload: 42 });
|
||||
}
|
||||
});
|
||||
@@ -0,0 +1,18 @@
|
||||
/**
|
||||
* Mock worker that responds to compute with an error message.
|
||||
* Used for error isolation integration tests.
|
||||
*/
|
||||
|
||||
process.send({ type: "ready" });
|
||||
|
||||
process.on("message", (msg) => {
|
||||
if (!msg || typeof msg !== "object") return;
|
||||
|
||||
if (msg.type === "shutdown") {
|
||||
process.exit(0);
|
||||
}
|
||||
|
||||
if (msg.type === "compute" && typeof msg.sense === "string") {
|
||||
process.send({ type: "error", sense: msg.sense, error: "simulated compute error" });
|
||||
}
|
||||
});
|
||||
@@ -0,0 +1,24 @@
|
||||
/**
|
||||
* Mock worker that takes a long time to respond to compute messages.
|
||||
* Used for grace period / timeout integration tests.
|
||||
*
|
||||
* On compute: waits 10 seconds before responding (simulates hung compute).
|
||||
* On shutdown: exits cleanly.
|
||||
*/
|
||||
|
||||
process.send({ type: "ready" });
|
||||
|
||||
process.on("message", (msg) => {
|
||||
if (!msg || typeof msg !== "object") return;
|
||||
|
||||
if (msg.type === "shutdown") {
|
||||
process.exit(0);
|
||||
}
|
||||
|
||||
if (msg.type === "compute" && typeof msg.sense === "string") {
|
||||
// Intentionally slow — will be killed by grace period
|
||||
setTimeout(() => {
|
||||
process.send({ type: "signal", sense: msg.sense, payload: "late" });
|
||||
}, 10_000);
|
||||
}
|
||||
});
|
||||
@@ -0,0 +1,237 @@
|
||||
/**
|
||||
* Unit tests for kernel Phase 6 enhancements — restartGroup, reloadConfig, getHealth.
|
||||
* Uses mocked child_process.fork.
|
||||
*/
|
||||
|
||||
import { EventEmitter } from "node:events";
|
||||
|
||||
import type { NerveConfig } from "@uncaged/nerve-core";
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Mock child_process.fork
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
const mockChildren: MockChild[] = [];
|
||||
|
||||
type MockChild = EventEmitter & {
|
||||
send: ReturnType<typeof vi.fn>;
|
||||
kill: ReturnType<typeof vi.fn>;
|
||||
pid: number;
|
||||
connected: boolean;
|
||||
};
|
||||
|
||||
function makeMockChild(pid = 1): MockChild {
|
||||
const child = new EventEmitter() as MockChild;
|
||||
child.connected = true;
|
||||
child.send = vi.fn((msg: unknown) => {
|
||||
if (
|
||||
msg !== null &&
|
||||
typeof msg === "object" &&
|
||||
(msg as Record<string, unknown>).type === "shutdown"
|
||||
) {
|
||||
setImmediate(() => {
|
||||
child.connected = false;
|
||||
child.emit("exit", 0, null);
|
||||
});
|
||||
}
|
||||
});
|
||||
child.kill = vi.fn((_signal?: string) => {
|
||||
child.connected = false;
|
||||
child.emit("exit", null, _signal ?? "SIGKILL");
|
||||
});
|
||||
child.pid = pid;
|
||||
return child;
|
||||
}
|
||||
|
||||
vi.mock("node:child_process", () => ({
|
||||
fork: vi.fn((_script: string, _args: string[], _opts: unknown) => {
|
||||
const child = makeMockChild(mockChildren.length + 100);
|
||||
mockChildren.push(child);
|
||||
return child;
|
||||
}),
|
||||
}));
|
||||
|
||||
// Must also mock file-watcher since kernel imports it
|
||||
vi.mock("../file-watcher.js", () => ({
|
||||
createFileWatcher: vi.fn(() => ({ close: vi.fn() })),
|
||||
}));
|
||||
|
||||
const { createKernel } = await import("../kernel.js");
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Helpers
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
|
||||
return {
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [],
|
||||
workflows: null,
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Tests
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
describe("kernel — getHealth", () => {
|
||||
beforeEach(() => {
|
||||
mockChildren.length = 0;
|
||||
vi.useFakeTimers({ shouldAdvanceTime: true });
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
it("returns correct health shape", async () => {
|
||||
const config = makeConfig({
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
"disk-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
"net-rx": { group: "network", throttle: null, timeout: null, gracePeriod: null },
|
||||
},
|
||||
});
|
||||
const kernel = createKernel(config, "/tmp/nerve-test");
|
||||
|
||||
const health = kernel.getHealth();
|
||||
expect(health.activeSenses).toBe(3);
|
||||
expect(health.activeGroups).toBe(2);
|
||||
expect(health.uptime).toBeGreaterThanOrEqual(0);
|
||||
expect(health.memoryUsage).toBeDefined();
|
||||
expect(typeof health.memoryUsage.heapUsed).toBe("number");
|
||||
|
||||
await kernel.stop();
|
||||
});
|
||||
});
|
||||
|
||||
describe("kernel — restartGroup", () => {
|
||||
beforeEach(() => {
|
||||
mockChildren.length = 0;
|
||||
vi.useFakeTimers({ shouldAdvanceTime: true });
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
it("sends shutdown to old worker and spawns new one", async () => {
|
||||
const config = makeConfig();
|
||||
const kernel = createKernel(config, "/tmp/nerve-test");
|
||||
|
||||
expect(mockChildren.length).toBe(1);
|
||||
const oldChild = mockChildren[0];
|
||||
|
||||
const restartPromise = kernel.restartGroup("system");
|
||||
// The shutdown message triggers exit in the mock
|
||||
await restartPromise;
|
||||
|
||||
// A new child should have been spawned
|
||||
expect(mockChildren.length).toBe(2);
|
||||
|
||||
// Old child got shutdown
|
||||
expect(oldChild.send).toHaveBeenCalledWith(expect.objectContaining({ type: "shutdown" }));
|
||||
|
||||
await kernel.stop();
|
||||
});
|
||||
|
||||
it("restartGroup on unknown group does nothing", async () => {
|
||||
const config = makeConfig();
|
||||
const kernel = createKernel(config, "/tmp/nerve-test");
|
||||
|
||||
expect(mockChildren.length).toBe(1);
|
||||
await kernel.restartGroup("nonexistent");
|
||||
// No new child spawned
|
||||
expect(mockChildren.length).toBe(1);
|
||||
|
||||
await kernel.stop();
|
||||
});
|
||||
});
|
||||
|
||||
describe("kernel — reloadConfig", () => {
|
||||
beforeEach(() => {
|
||||
mockChildren.length = 0;
|
||||
vi.useFakeTimers({ shouldAdvanceTime: true });
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
it("adds new group worker when new sense group appears", async () => {
|
||||
const config = makeConfig();
|
||||
const kernel = createKernel(config, "/tmp/nerve-test");
|
||||
|
||||
expect(mockChildren.length).toBe(1); // only system group
|
||||
expect(kernel.groups.has("network")).toBe(false);
|
||||
|
||||
kernel.reloadConfig({
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
"net-rx": { group: "network", throttle: null, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [],
|
||||
workflows: null,
|
||||
});
|
||||
|
||||
expect(kernel.groups.has("network")).toBe(true);
|
||||
expect(mockChildren.length).toBe(2); // system + network
|
||||
|
||||
await kernel.stop();
|
||||
});
|
||||
|
||||
it("removes group worker when its senses are all removed", async () => {
|
||||
const config: NerveConfig = {
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
"net-rx": { group: "network", throttle: null, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [],
|
||||
workflows: null,
|
||||
};
|
||||
const kernel = createKernel(config, "/tmp/nerve-test");
|
||||
|
||||
expect(mockChildren.length).toBe(2);
|
||||
expect(kernel.groups.has("network")).toBe(true);
|
||||
|
||||
const networkChild = mockChildren[1];
|
||||
|
||||
kernel.reloadConfig({
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [],
|
||||
workflows: null,
|
||||
});
|
||||
|
||||
expect(kernel.groups.has("network")).toBe(false);
|
||||
// Network child should have received shutdown
|
||||
expect(networkChild.send).toHaveBeenCalledWith(expect.objectContaining({ type: "shutdown" }));
|
||||
|
||||
await kernel.stop();
|
||||
});
|
||||
|
||||
it("health reflects updated sense count after reloadConfig", async () => {
|
||||
const config = makeConfig();
|
||||
const kernel = createKernel(config, "/tmp/nerve-test");
|
||||
|
||||
expect(kernel.getHealth().activeSenses).toBe(1);
|
||||
|
||||
kernel.reloadConfig({
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
"disk-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [],
|
||||
workflows: null,
|
||||
});
|
||||
|
||||
expect(kernel.getHealth().activeSenses).toBe(2);
|
||||
|
||||
await kernel.stop();
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,361 @@
|
||||
/**
|
||||
* Phase 6 integration tests — hot reload, error isolation, grace period, health.
|
||||
*/
|
||||
|
||||
import { dirname, join } from "node:path";
|
||||
import { fileURLToPath } from "node:url";
|
||||
|
||||
import type { Signal } from "@uncaged/nerve-core";
|
||||
import type { NerveConfig } from "@uncaged/nerve-core";
|
||||
import { afterEach, describe, expect, it } from "vitest";
|
||||
|
||||
import { createKernel } from "../kernel.js";
|
||||
import type { Kernel } from "../kernel.js";
|
||||
|
||||
const __filename = fileURLToPath(import.meta.url);
|
||||
const __dir = dirname(__filename);
|
||||
const MOCK_WORKER = join(__dir, "fixtures", "mock-worker.mjs");
|
||||
const ERROR_WORKER = join(__dir, "fixtures", "error-worker.mjs");
|
||||
|
||||
function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
|
||||
return {
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [],
|
||||
workflows: null,
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
|
||||
async function pollUntil(
|
||||
predicate: () => boolean,
|
||||
timeoutMs: number,
|
||||
intervalMs = 50,
|
||||
): Promise<void> {
|
||||
return new Promise((resolve, reject) => {
|
||||
const timer = setTimeout(
|
||||
() => reject(new Error(`pollUntil timed out after ${timeoutMs}ms`)),
|
||||
timeoutMs,
|
||||
);
|
||||
const check = setInterval(() => {
|
||||
if (predicate()) {
|
||||
clearTimeout(timer);
|
||||
clearInterval(check);
|
||||
resolve();
|
||||
}
|
||||
}, intervalMs);
|
||||
});
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Hot Reload — restartGroup
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
describe("phase6 — restartGroup", () => {
|
||||
let kernel: Kernel | null = null;
|
||||
|
||||
afterEach(async () => {
|
||||
if (kernel !== null) {
|
||||
await kernel.stop();
|
||||
kernel = null;
|
||||
}
|
||||
});
|
||||
|
||||
it("restartGroup stops old worker and spawns a new one", async () => {
|
||||
const config = makeConfig();
|
||||
kernel = createKernel(config, "/tmp/nerve-phase6-test", {
|
||||
workerScript: MOCK_WORKER,
|
||||
});
|
||||
|
||||
await kernel.ready;
|
||||
|
||||
const oldPid = kernel.getWorkerPid("system");
|
||||
expect(oldPid).not.toBeNull();
|
||||
|
||||
await kernel.restartGroup("system");
|
||||
|
||||
// Wait for new worker to become ready
|
||||
await pollUntil(() => {
|
||||
const newPid = kernel?.getWorkerPid("system");
|
||||
return newPid !== null && newPid !== oldPid;
|
||||
}, 5000);
|
||||
|
||||
const newPid = kernel.getWorkerPid("system");
|
||||
expect(newPid).not.toBeNull();
|
||||
expect(newPid).not.toBe(oldPid);
|
||||
|
||||
// Verify new worker is functional
|
||||
const received: Signal[] = [];
|
||||
const unsub = kernel.bus.subscribe((s) => received.push(s));
|
||||
kernel.triggerCompute("cpu-usage");
|
||||
await pollUntil(() => received.length > 0, 3000);
|
||||
expect(received[0]).toMatchObject({ senseId: "cpu-usage", payload: 42 });
|
||||
unsub();
|
||||
}, 15_000);
|
||||
|
||||
it("restartGroup on nonexistent group does nothing", async () => {
|
||||
const config = makeConfig();
|
||||
kernel = createKernel(config, "/tmp/nerve-phase6-test", {
|
||||
workerScript: MOCK_WORKER,
|
||||
});
|
||||
await kernel.ready;
|
||||
|
||||
// Should not throw
|
||||
await kernel.restartGroup("nonexistent");
|
||||
}, 5_000);
|
||||
});
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Hot Reload — reloadConfig
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
describe("phase6 — reloadConfig", () => {
|
||||
let kernel: Kernel | null = null;
|
||||
|
||||
afterEach(async () => {
|
||||
if (kernel !== null) {
|
||||
await kernel.stop();
|
||||
kernel = null;
|
||||
}
|
||||
});
|
||||
|
||||
it("adds new group when new sense group is introduced", async () => {
|
||||
const config = makeConfig();
|
||||
kernel = createKernel(config, "/tmp/nerve-phase6-test", {
|
||||
workerScript: MOCK_WORKER,
|
||||
});
|
||||
await kernel.ready;
|
||||
|
||||
expect(kernel.groups.has("network")).toBe(false);
|
||||
|
||||
const newConfig: NerveConfig = {
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
"net-rx": { group: "network", throttle: null, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [],
|
||||
workflows: null,
|
||||
};
|
||||
|
||||
kernel.reloadConfig(newConfig);
|
||||
|
||||
expect(kernel.groups.has("network")).toBe(true);
|
||||
|
||||
// Wait for the new network worker to start
|
||||
await pollUntil(() => kernel?.getWorkerPid("network") !== null, 3000);
|
||||
expect(kernel.getWorkerPid("network")).not.toBeNull();
|
||||
}, 10_000);
|
||||
|
||||
it("removes group when all its senses are removed", async () => {
|
||||
const config: NerveConfig = {
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
"net-rx": { group: "network", throttle: null, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [],
|
||||
workflows: null,
|
||||
};
|
||||
kernel = createKernel(config, "/tmp/nerve-phase6-test", {
|
||||
workerScript: MOCK_WORKER,
|
||||
});
|
||||
await kernel.ready;
|
||||
|
||||
expect(kernel.groups.has("network")).toBe(true);
|
||||
|
||||
const newConfig: NerveConfig = {
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [],
|
||||
workflows: null,
|
||||
};
|
||||
|
||||
kernel.reloadConfig(newConfig);
|
||||
|
||||
expect(kernel.groups.has("network")).toBe(false);
|
||||
}, 10_000);
|
||||
});
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Error Isolation
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
describe("phase6 — error isolation", () => {
|
||||
let kernel: Kernel | null = null;
|
||||
|
||||
afterEach(async () => {
|
||||
if (kernel !== null) {
|
||||
await kernel.stop();
|
||||
kernel = null;
|
||||
}
|
||||
});
|
||||
|
||||
it("error from one sense does not crash the worker — other senses still work", async () => {
|
||||
const config: NerveConfig = {
|
||||
senses: {
|
||||
"good-sense": { group: "mixed", throttle: null, timeout: null, gracePeriod: null },
|
||||
"bad-sense": { group: "mixed", throttle: null, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [],
|
||||
workflows: null,
|
||||
};
|
||||
|
||||
kernel = createKernel(config, "/tmp/nerve-phase6-test", {
|
||||
workerScript: MOCK_WORKER,
|
||||
});
|
||||
await kernel.ready;
|
||||
|
||||
// Both senses go through the same worker (mock-worker responds to all compute with signal)
|
||||
const received: Signal[] = [];
|
||||
const unsub = kernel.bus.subscribe((s) => received.push(s));
|
||||
|
||||
kernel.triggerCompute("good-sense");
|
||||
await pollUntil(() => received.length > 0, 3000);
|
||||
expect(received[0]).toMatchObject({ senseId: "good-sense" });
|
||||
|
||||
kernel.triggerCompute("bad-sense");
|
||||
await pollUntil(() => received.length > 1, 3000);
|
||||
expect(received[1]).toMatchObject({ senseId: "bad-sense" });
|
||||
|
||||
unsub();
|
||||
}, 10_000);
|
||||
|
||||
it("error worker sends error messages, kernel still running", async () => {
|
||||
const stderrMessages: string[] = [];
|
||||
const stderrSpy = ((original) => {
|
||||
return (chunk: string | Uint8Array) => {
|
||||
stderrMessages.push(String(chunk));
|
||||
return original.call(process.stderr, chunk);
|
||||
};
|
||||
})(process.stderr.write);
|
||||
const origWrite = process.stderr.write;
|
||||
process.stderr.write = stderrSpy as typeof process.stderr.write;
|
||||
|
||||
const config = makeConfig();
|
||||
kernel = createKernel(config, "/tmp/nerve-phase6-test", {
|
||||
workerScript: ERROR_WORKER,
|
||||
});
|
||||
await kernel.ready;
|
||||
|
||||
kernel.triggerCompute("cpu-usage");
|
||||
|
||||
// Wait for the error to be logged
|
||||
await pollUntil(() => stderrMessages.some((m) => m.includes("simulated compute error")), 3000);
|
||||
|
||||
process.stderr.write = origWrite;
|
||||
|
||||
// Kernel should still be running (not crashed)
|
||||
expect(kernel.getWorkerPid("system")).not.toBeNull();
|
||||
}, 10_000);
|
||||
});
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// getHealth
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
describe("phase6 — getHealth", () => {
|
||||
let kernel: Kernel | null = null;
|
||||
|
||||
afterEach(async () => {
|
||||
if (kernel !== null) {
|
||||
await kernel.stop();
|
||||
kernel = null;
|
||||
}
|
||||
});
|
||||
|
||||
it("returns health snapshot with correct shape", async () => {
|
||||
const config = makeConfig({
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
"disk-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
"net-rx": { group: "network", throttle: null, timeout: null, gracePeriod: null },
|
||||
},
|
||||
});
|
||||
kernel = createKernel(config, "/tmp/nerve-phase6-test", {
|
||||
workerScript: MOCK_WORKER,
|
||||
});
|
||||
await kernel.ready;
|
||||
|
||||
const health = kernel.getHealth();
|
||||
|
||||
expect(health.uptime).toBeGreaterThanOrEqual(0);
|
||||
expect(health.activeSenses).toBe(3);
|
||||
expect(health.activeGroups).toBe(2);
|
||||
expect(health.memoryUsage).toBeDefined();
|
||||
expect(typeof health.memoryUsage.heapUsed).toBe("number");
|
||||
}, 10_000);
|
||||
|
||||
it("health reflects config changes after reloadConfig", async () => {
|
||||
const config = makeConfig();
|
||||
kernel = createKernel(config, "/tmp/nerve-phase6-test", {
|
||||
workerScript: MOCK_WORKER,
|
||||
});
|
||||
await kernel.ready;
|
||||
|
||||
expect(kernel.getHealth().activeSenses).toBe(1);
|
||||
|
||||
const newConfig: NerveConfig = {
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
"net-rx": { group: "network", throttle: null, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [],
|
||||
workflows: null,
|
||||
};
|
||||
kernel.reloadConfig(newConfig);
|
||||
|
||||
expect(kernel.getHealth().activeSenses).toBe(2);
|
||||
expect(kernel.getHealth().activeGroups).toBe(2);
|
||||
}, 10_000);
|
||||
});
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Auto-respawn on crash (existing test extended)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
describe("phase6 — auto-respawn on worker crash", () => {
|
||||
let kernel: Kernel | null = null;
|
||||
|
||||
afterEach(async () => {
|
||||
if (kernel !== null) {
|
||||
await kernel.stop();
|
||||
kernel = null;
|
||||
}
|
||||
});
|
||||
|
||||
it("kernel auto-respawns worker and new worker is functional", async () => {
|
||||
const config = makeConfig();
|
||||
kernel = createKernel(config, "/tmp/nerve-phase6-test", {
|
||||
workerScript: MOCK_WORKER,
|
||||
});
|
||||
await kernel.ready;
|
||||
|
||||
const originalPid = kernel.getWorkerPid("system");
|
||||
expect(originalPid).not.toBeNull();
|
||||
|
||||
// Kill worker to simulate crash
|
||||
process.kill(originalPid as number, "SIGKILL");
|
||||
|
||||
// Wait for respawn
|
||||
await pollUntil(() => {
|
||||
const pid = kernel?.getWorkerPid("system");
|
||||
return pid !== null && pid !== originalPid;
|
||||
}, 5000);
|
||||
|
||||
const newPid = kernel.getWorkerPid("system");
|
||||
expect(newPid).not.toBeNull();
|
||||
expect(newPid).not.toBe(originalPid);
|
||||
|
||||
// Verify new worker responds
|
||||
const received: Signal[] = [];
|
||||
const unsub = kernel.bus.subscribe((s) => received.push(s));
|
||||
kernel.triggerCompute("cpu-usage");
|
||||
await pollUntil(() => received.length > 0, 5000);
|
||||
expect(received[0]).toMatchObject({ senseId: "cpu-usage", payload: 42 });
|
||||
unsub();
|
||||
|
||||
await kernel.stop();
|
||||
kernel = null;
|
||||
}, 15_000);
|
||||
});
|
||||
@@ -0,0 +1,107 @@
|
||||
/**
|
||||
* File Watcher — watches nerveRoot for file changes and signals the kernel.
|
||||
*
|
||||
* Uses Node.js fs.watch (no external dependencies).
|
||||
*
|
||||
* Watched events:
|
||||
* - .ts file under senses/ modified → callback with { kind: "sense", senseName, filePath }
|
||||
* - nerve.yaml modified → callback with { kind: "config", filePath }
|
||||
*
|
||||
* Debounces rapid changes (e.g. editor save flicker) with a configurable delay.
|
||||
*/
|
||||
|
||||
import { watch } from "node:fs";
|
||||
import type { FSWatcher } from "node:fs";
|
||||
import { join, relative, sep } from "node:path";
|
||||
|
||||
export type SenseFileChange = {
|
||||
kind: "sense";
|
||||
senseName: string;
|
||||
filePath: string;
|
||||
};
|
||||
|
||||
export type ConfigFileChange = {
|
||||
kind: "config";
|
||||
filePath: string;
|
||||
};
|
||||
|
||||
export type FileChange = SenseFileChange | ConfigFileChange;
|
||||
|
||||
export type FileChangeHandler = (change: FileChange) => void;
|
||||
|
||||
export type FileWatcher = {
|
||||
close: () => void;
|
||||
};
|
||||
|
||||
const DEFAULT_DEBOUNCE_MS = 300;
|
||||
|
||||
export function createFileWatcher(
|
||||
nerveRoot: string,
|
||||
handler: FileChangeHandler,
|
||||
debounceMs: number = DEFAULT_DEBOUNCE_MS,
|
||||
): FileWatcher {
|
||||
const watchers: FSWatcher[] = [];
|
||||
const debounceTimers = new Map<string, ReturnType<typeof setTimeout>>();
|
||||
|
||||
function debounced(key: string, fn: () => void): void {
|
||||
const existing = debounceTimers.get(key);
|
||||
if (existing !== undefined) clearTimeout(existing);
|
||||
debounceTimers.set(
|
||||
key,
|
||||
setTimeout(() => {
|
||||
debounceTimers.delete(key);
|
||||
fn();
|
||||
}, debounceMs),
|
||||
);
|
||||
}
|
||||
|
||||
function handleFsEvent(_eventType: string, filename: string | null): void {
|
||||
if (filename === null) return;
|
||||
|
||||
const normalized = filename.split(sep).join("/");
|
||||
|
||||
if (normalized === "nerve.yaml") {
|
||||
debounced("config", () => {
|
||||
handler({ kind: "config", filePath: join(nerveRoot, "nerve.yaml") });
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
if (normalized.startsWith("senses/") && normalized.endsWith(".ts")) {
|
||||
const rel = relative("senses", normalized);
|
||||
const senseName = rel.split("/")[0];
|
||||
if (senseName) {
|
||||
debounced(`sense:${senseName}`, () => {
|
||||
handler({
|
||||
kind: "sense",
|
||||
senseName,
|
||||
filePath: join(nerveRoot, filename),
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
const w = watch(nerveRoot, { recursive: true }, (eventType, filename) => {
|
||||
handleFsEvent(eventType, filename);
|
||||
});
|
||||
watchers.push(w);
|
||||
} catch (e) {
|
||||
const msg = e instanceof Error ? e.message : String(e);
|
||||
process.stderr.write(`[file-watcher] Failed to watch "${nerveRoot}": ${msg}\n`);
|
||||
}
|
||||
|
||||
function close(): void {
|
||||
for (const timer of debounceTimers.values()) {
|
||||
clearTimeout(timer);
|
||||
}
|
||||
debounceTimers.clear();
|
||||
for (const w of watchers) {
|
||||
w.close();
|
||||
}
|
||||
watchers.length = 0;
|
||||
}
|
||||
|
||||
return { close };
|
||||
}
|
||||
@@ -1,6 +1,8 @@
|
||||
export type {
|
||||
ComputeMessage,
|
||||
ShutdownMessage,
|
||||
HealthRequestMessage,
|
||||
HealthResponseMessage,
|
||||
ParentToWorkerMessage,
|
||||
SignalMessage,
|
||||
ErrorMessage,
|
||||
@@ -21,4 +23,7 @@ export {
|
||||
} from "./sense-runtime.js";
|
||||
|
||||
export { createKernel } from "./kernel.js";
|
||||
export type { Kernel, KernelOptions } from "./kernel.js";
|
||||
export type { Kernel, KernelOptions, KernelHealth } from "./kernel.js";
|
||||
|
||||
export { createFileWatcher } from "./file-watcher.js";
|
||||
export type { FileWatcher, FileChange, FileChangeHandler } from "./file-watcher.js";
|
||||
|
||||
+62
-26
@@ -17,8 +17,13 @@ export type ShutdownMessage = {
|
||||
type: "shutdown";
|
||||
};
|
||||
|
||||
/** Parent → Worker: request health info from worker */
|
||||
export type HealthRequestMessage = {
|
||||
type: "health-request";
|
||||
};
|
||||
|
||||
/** Union of all messages the parent sends to a worker */
|
||||
export type ParentToWorkerMessage = ComputeMessage | ShutdownMessage;
|
||||
export type ParentToWorkerMessage = ComputeMessage | ShutdownMessage | HealthRequestMessage;
|
||||
|
||||
/** Worker → Parent: compute produced a signal */
|
||||
export type SignalMessage = {
|
||||
@@ -39,8 +44,21 @@ export type ReadyMessage = {
|
||||
type: "ready";
|
||||
};
|
||||
|
||||
/** Worker → Parent: health info response */
|
||||
export type HealthResponseMessage = {
|
||||
type: "health-response";
|
||||
senses: string[];
|
||||
inFlightCount: number;
|
||||
};
|
||||
|
||||
/** Union of all messages a worker sends to the parent */
|
||||
export type WorkerToParentMessage = SignalMessage | ErrorMessage | ReadyMessage;
|
||||
export type WorkerToParentMessage =
|
||||
| SignalMessage
|
||||
| ErrorMessage
|
||||
| ReadyMessage
|
||||
| HealthResponseMessage;
|
||||
|
||||
const PARENT_MSG_TYPES = new Set(["compute", "shutdown", "health-request"]);
|
||||
|
||||
/** Validate and parse an unknown IPC message received from the parent process. */
|
||||
export function parseParentMessage(raw: unknown): Result<ParentToWorkerMessage> {
|
||||
@@ -51,13 +69,47 @@ export function parseParentMessage(raw: unknown): Result<ParentToWorkerMessage>
|
||||
if (typeof obj.type !== "string") {
|
||||
return err(new Error("IPC message missing string 'type' field"));
|
||||
}
|
||||
const type = obj.type;
|
||||
if (type !== "compute" && type !== "shutdown") {
|
||||
return err(new Error(`Unknown IPC message type: "${type}"`));
|
||||
if (!PARENT_MSG_TYPES.has(obj.type)) {
|
||||
return err(new Error(`Unknown IPC message type: "${obj.type}"`));
|
||||
}
|
||||
return ok(raw as ParentToWorkerMessage);
|
||||
}
|
||||
|
||||
function parseSignalMsg(obj: Record<string, unknown>, raw: unknown): Result<WorkerToParentMessage> {
|
||||
if (typeof obj.sense !== "string") {
|
||||
return err(new Error("Worker 'signal' message missing string 'sense' field"));
|
||||
}
|
||||
if (!("payload" in obj)) {
|
||||
return err(new Error("Worker 'signal' message missing 'payload' field"));
|
||||
}
|
||||
return ok(raw as SignalMessage);
|
||||
}
|
||||
|
||||
function parseErrorMsg(obj: Record<string, unknown>, raw: unknown): Result<WorkerToParentMessage> {
|
||||
if (typeof obj.sense !== "string") {
|
||||
return err(new Error("Worker 'error' message missing string 'sense' field"));
|
||||
}
|
||||
if (typeof obj.error !== "string") {
|
||||
return err(new Error("Worker 'error' message missing string 'error' field"));
|
||||
}
|
||||
return ok(raw as ErrorMessage);
|
||||
}
|
||||
|
||||
function parseHealthResponseMsg(
|
||||
obj: Record<string, unknown>,
|
||||
raw: unknown,
|
||||
): Result<WorkerToParentMessage> {
|
||||
if (!Array.isArray(obj.senses)) {
|
||||
return err(new Error("Worker 'health-response' message missing 'senses' array"));
|
||||
}
|
||||
if (typeof obj.inFlightCount !== "number") {
|
||||
return err(new Error("Worker 'health-response' message missing 'inFlightCount' number"));
|
||||
}
|
||||
return ok(raw as HealthResponseMessage);
|
||||
}
|
||||
|
||||
const WORKER_MSG_TYPES = new Set(["signal", "error", "ready", "health-response"]);
|
||||
|
||||
/** Validate and parse an unknown IPC message received from a worker process. */
|
||||
export function parseWorkerMessage(raw: unknown): Result<WorkerToParentMessage> {
|
||||
if (raw === null || typeof raw !== "object") {
|
||||
@@ -67,27 +119,11 @@ export function parseWorkerMessage(raw: unknown): Result<WorkerToParentMessage>
|
||||
if (typeof obj.type !== "string") {
|
||||
return err(new Error("Worker IPC message missing string 'type' field"));
|
||||
}
|
||||
const type = obj.type;
|
||||
if (type !== "signal" && type !== "error" && type !== "ready") {
|
||||
return err(new Error(`Unknown worker IPC message type: "${type}"`));
|
||||
}
|
||||
if (type === "signal") {
|
||||
if (typeof obj.sense !== "string") {
|
||||
return err(new Error("Worker 'signal' message missing string 'sense' field"));
|
||||
}
|
||||
if (!("payload" in obj)) {
|
||||
return err(new Error("Worker 'signal' message missing 'payload' field"));
|
||||
}
|
||||
return ok(raw as SignalMessage);
|
||||
}
|
||||
if (type === "error") {
|
||||
if (typeof obj.sense !== "string") {
|
||||
return err(new Error("Worker 'error' message missing string 'sense' field"));
|
||||
}
|
||||
if (typeof obj.error !== "string") {
|
||||
return err(new Error("Worker 'error' message missing string 'error' field"));
|
||||
}
|
||||
return ok(raw as ErrorMessage);
|
||||
if (!WORKER_MSG_TYPES.has(obj.type)) {
|
||||
return err(new Error(`Unknown worker IPC message type: "${obj.type}"`));
|
||||
}
|
||||
if (obj.type === "signal") return parseSignalMsg(obj, raw);
|
||||
if (obj.type === "error") return parseErrorMsg(obj, raw);
|
||||
if (obj.type === "health-response") return parseHealthResponseMsg(obj, raw);
|
||||
return ok({ type: "ready" });
|
||||
}
|
||||
|
||||
@@ -8,15 +8,21 @@
|
||||
* - Route ErrorMessage from workers → stderr log
|
||||
* - Drive compute triggers via ReflexScheduler
|
||||
* - Graceful shutdown: stop scheduler, send shutdown to all workers
|
||||
* - Hot reload: restartGroup, reloadConfig, file watcher integration
|
||||
* - Health reporting: getHealth
|
||||
*/
|
||||
|
||||
import { fork } from "node:child_process";
|
||||
import type { ChildProcess } from "node:child_process";
|
||||
import { readFileSync } from "node:fs";
|
||||
import { dirname, join } from "node:path";
|
||||
import { fileURLToPath } from "node:url";
|
||||
|
||||
import type { NerveConfig, Signal } from "@uncaged/nerve-core";
|
||||
import { parseNerveConfig } from "@uncaged/nerve-core";
|
||||
|
||||
import { createFileWatcher } from "./file-watcher.js";
|
||||
import type { FileWatcher } from "./file-watcher.js";
|
||||
import type { ComputeMessage, ShutdownMessage } from "./ipc.js";
|
||||
import { parseWorkerMessage } from "./ipc.js";
|
||||
import { createReflexScheduler } from "./reflex-scheduler.js";
|
||||
@@ -24,6 +30,14 @@ import type { ReflexScheduler } from "./reflex-scheduler.js";
|
||||
import { createSignalBus } from "./signal-bus.js";
|
||||
import type { SignalBus } from "./signal-bus.js";
|
||||
|
||||
export type KernelHealth = {
|
||||
uptime: number;
|
||||
activeSenses: number;
|
||||
activeGroups: number;
|
||||
pendingComputes: number;
|
||||
memoryUsage: NodeJS.MemoryUsage;
|
||||
};
|
||||
|
||||
export type Kernel = {
|
||||
stop: () => Promise<void>;
|
||||
groups: Set<string>;
|
||||
@@ -35,6 +49,12 @@ export type Kernel = {
|
||||
getWorkerPid: (group: string) => number | null;
|
||||
/** Sends a compute message to the worker responsible for the given sense. */
|
||||
triggerCompute: (senseName: string) => void;
|
||||
/** Gracefully restart a group worker (wait for exit, then respawn). */
|
||||
restartGroup: (group: string) => Promise<void>;
|
||||
/** Reload config from a new NerveConfig, incrementally updating scheduler and workers. */
|
||||
reloadConfig: (newConfig: NerveConfig) => void;
|
||||
/** Return daemon health info. */
|
||||
getHealth: () => KernelHealth;
|
||||
};
|
||||
|
||||
type WorkerEntry = {
|
||||
@@ -66,7 +86,6 @@ function sendCompute(worker: ChildProcess, senseName: string): void {
|
||||
}
|
||||
|
||||
function sendShutdown(worker: ChildProcess): void {
|
||||
// worker.connected is false when the IPC channel has been closed (e.g. worker crashed)
|
||||
if (worker.connected === false) return;
|
||||
const msg: ShutdownMessage = { type: "shutdown" };
|
||||
try {
|
||||
@@ -84,21 +103,24 @@ function groupForSense(config: NerveConfig, senseName: string): string | null {
|
||||
|
||||
export type KernelOptions = {
|
||||
workerScript: string;
|
||||
enableFileWatcher?: boolean;
|
||||
};
|
||||
|
||||
function defaultKernelOptions(): KernelOptions {
|
||||
return { workerScript: resolveWorkerScript() };
|
||||
return { workerScript: resolveWorkerScript(), enableFileWatcher: false };
|
||||
}
|
||||
|
||||
export function createKernel(
|
||||
config: NerveConfig,
|
||||
initialConfig: NerveConfig,
|
||||
nerveRoot: string,
|
||||
options: KernelOptions = defaultKernelOptions(),
|
||||
): Kernel {
|
||||
const bus: SignalBus = createSignalBus();
|
||||
const workerScript = options.workerScript;
|
||||
const startTime = Date.now();
|
||||
|
||||
let config = initialConfig;
|
||||
|
||||
// Signal ID counter is instance-scoped (fix #2)
|
||||
let _signalIdCounter = 0;
|
||||
function nextSignalId(): number {
|
||||
_signalIdCounter += 1;
|
||||
@@ -112,7 +134,6 @@ export function createKernel(
|
||||
|
||||
const workers = new Map<string, WorkerEntry>();
|
||||
let stopped = false;
|
||||
// eslint-disable-next-line prefer-const
|
||||
let scheduler: ReflexScheduler = null as unknown as ReflexScheduler;
|
||||
|
||||
let readyResolve: (() => void) | undefined;
|
||||
@@ -159,6 +180,8 @@ export function createKernel(
|
||||
bus.emit(signal);
|
||||
scheduler.onComputeComplete(msg.sense);
|
||||
}
|
||||
|
||||
// health-response is handled externally by the caller; no action needed here
|
||||
}
|
||||
|
||||
function startWorker(group: string): void {
|
||||
@@ -170,10 +193,8 @@ export function createKernel(
|
||||
process.stderr.write(
|
||||
`[kernel] worker for group "${group}" exited with code ${code ?? "null"}\n`,
|
||||
);
|
||||
// Respawn on unexpected exit (fix #4)
|
||||
if (!stopped && code !== 0) {
|
||||
process.stderr.write(`[kernel] respawning worker for group "${group}" in 1s\n`);
|
||||
// Clear any stuck in-flight state for all senses in this group
|
||||
for (const senseName of sensesForGroup(group)) {
|
||||
scheduler.onComputeComplete(senseName);
|
||||
}
|
||||
@@ -212,7 +233,6 @@ export function createKernel(
|
||||
startWorker(group);
|
||||
}
|
||||
|
||||
// Wait for a worker process to exit, with a timeout + SIGKILL fallback (fix #5)
|
||||
function waitForExit(child: ChildProcess, timeoutMs: number): Promise<void> {
|
||||
return new Promise((resolve) => {
|
||||
const timer = setTimeout(() => {
|
||||
@@ -226,8 +246,115 @@ export function createKernel(
|
||||
});
|
||||
}
|
||||
|
||||
// --- restartGroup: gracefully stop worker, then respawn ---
|
||||
async function restartGroup(group: string): Promise<void> {
|
||||
const entry = workers.get(group);
|
||||
if (entry === undefined) return;
|
||||
|
||||
for (const senseName of sensesForGroup(group)) {
|
||||
scheduler.onComputeComplete(senseName);
|
||||
}
|
||||
|
||||
sendShutdown(entry.process);
|
||||
await waitForExit(entry.process, 5000);
|
||||
|
||||
if (!stopped) {
|
||||
startWorker(group);
|
||||
}
|
||||
}
|
||||
|
||||
function collectGroups(cfg: NerveConfig): Set<string> {
|
||||
const result = new Set<string>();
|
||||
for (const sc of Object.values(cfg.senses)) {
|
||||
result.add(sc.group);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
function removeStaleGroups(oldGroups: Set<string>, newGroups: Set<string>): void {
|
||||
for (const g of oldGroups) {
|
||||
if (newGroups.has(g)) continue;
|
||||
const entry = workers.get(g);
|
||||
if (entry !== undefined) {
|
||||
sendShutdown(entry.process);
|
||||
workers.delete(g);
|
||||
}
|
||||
groups.delete(g);
|
||||
}
|
||||
}
|
||||
|
||||
function addNewGroups(oldGroups: Set<string>, newGroups: Set<string>): void {
|
||||
for (const g of newGroups) {
|
||||
if (oldGroups.has(g)) continue;
|
||||
groups.add(g);
|
||||
if (!stopped) startWorker(g);
|
||||
}
|
||||
}
|
||||
|
||||
function reloadConfig(newConfig: NerveConfig): void {
|
||||
const oldGroups = collectGroups(config);
|
||||
config = newConfig;
|
||||
scheduler.stop();
|
||||
scheduler = createReflexScheduler(config, bus, triggerFn);
|
||||
const newGroups = collectGroups(newConfig);
|
||||
removeStaleGroups(oldGroups, newGroups);
|
||||
addNewGroups(oldGroups, newGroups);
|
||||
}
|
||||
|
||||
function getHealth(): KernelHealth {
|
||||
return {
|
||||
uptime: Date.now() - startTime,
|
||||
activeSenses: Object.keys(config.senses).length,
|
||||
activeGroups: workers.size,
|
||||
pendingComputes: 0,
|
||||
memoryUsage: process.memoryUsage(),
|
||||
};
|
||||
}
|
||||
|
||||
function handleSenseFileChange(senseName: string): void {
|
||||
const sc = config.senses[senseName];
|
||||
if (sc === undefined) return;
|
||||
process.stderr.write(
|
||||
`[kernel] sense file changed: "${senseName}", restarting group "${sc.group}"\n`,
|
||||
);
|
||||
restartGroup(sc.group).catch((e) => {
|
||||
const msg = e instanceof Error ? e.message : String(e);
|
||||
process.stderr.write(`[kernel] restartGroup error: ${msg}\n`);
|
||||
});
|
||||
}
|
||||
|
||||
function handleConfigFileChange(): void {
|
||||
process.stderr.write("[kernel] nerve.yaml changed, reloading config\n");
|
||||
try {
|
||||
const raw = readFileSync(join(nerveRoot, "nerve.yaml"), "utf8");
|
||||
const parseResult = parseNerveConfig(raw);
|
||||
if (!parseResult.ok) {
|
||||
process.stderr.write(
|
||||
`[kernel] config parse error, keeping current config: ${parseResult.error.message}\n`,
|
||||
);
|
||||
return;
|
||||
}
|
||||
reloadConfig(parseResult.value);
|
||||
} catch (e) {
|
||||
const msg = e instanceof Error ? e.message : String(e);
|
||||
process.stderr.write(`[kernel] failed to read nerve.yaml, keeping current config: ${msg}\n`);
|
||||
}
|
||||
}
|
||||
|
||||
let fileWatcher: FileWatcher | null = null;
|
||||
if (options.enableFileWatcher) {
|
||||
fileWatcher = createFileWatcher(nerveRoot, (change) => {
|
||||
if (change.kind === "sense") handleSenseFileChange(change.senseName);
|
||||
if (change.kind === "config") handleConfigFileChange();
|
||||
});
|
||||
}
|
||||
|
||||
async function stop(): Promise<void> {
|
||||
stopped = true;
|
||||
if (fileWatcher !== null) {
|
||||
fileWatcher.close();
|
||||
fileWatcher = null;
|
||||
}
|
||||
scheduler.stop();
|
||||
const exitPromises: Promise<void>[] = [];
|
||||
for (const entry of workers.values()) {
|
||||
@@ -243,5 +370,16 @@ export function createKernel(
|
||||
|
||||
const senseCount = Object.keys(config.senses).length;
|
||||
|
||||
return { stop, groups, senseCount, bus, ready, getWorkerPid, triggerCompute: triggerFn };
|
||||
return {
|
||||
stop,
|
||||
groups,
|
||||
senseCount,
|
||||
bus,
|
||||
ready,
|
||||
getWorkerPid,
|
||||
triggerCompute: triggerFn,
|
||||
restartGroup,
|
||||
reloadConfig,
|
||||
getHealth,
|
||||
};
|
||||
}
|
||||
|
||||
@@ -79,18 +79,12 @@ async function initSense(
|
||||
|
||||
const dbResult = openSenseDb(dbPath, migrationsDir);
|
||||
if (!dbResult.ok) {
|
||||
process.stderr.write(
|
||||
`[sense-worker] Failed to init DB for "${senseName}": ${dbResult.error.message}\n`,
|
||||
);
|
||||
process.exit(1);
|
||||
throw new Error(`Failed to init DB for "${senseName}": ${dbResult.error.message}`);
|
||||
}
|
||||
|
||||
const computeResult = await loadComputeFn(senseIndexPath);
|
||||
if (!computeResult.ok) {
|
||||
process.stderr.write(
|
||||
`[sense-worker] Failed to load compute for "${senseName}": ${computeResult.error.message}\n`,
|
||||
);
|
||||
process.exit(1);
|
||||
throw new Error(`Failed to load compute for "${senseName}": ${computeResult.error.message}`);
|
||||
}
|
||||
|
||||
const { db } = dbResult.value;
|
||||
@@ -129,12 +123,70 @@ function buildPeers(
|
||||
return Object.fromEntries(entries);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Grace period: hard kill after soft timeout
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
const gracePeriodTimers = new Map<string, ReturnType<typeof setTimeout>>();
|
||||
|
||||
function scheduleGracePeriodKill(senseName: string, gracePeriodMs: number): void {
|
||||
if (gracePeriodTimers.has(senseName)) return;
|
||||
process.stderr.write(`[sense-worker] grace period for "${senseName}" (${gracePeriodMs}ms)\n`);
|
||||
const timer = setTimeout(() => {
|
||||
process.stderr.write(`[sense-worker] grace period expired for "${senseName}", hard kill\n`);
|
||||
process.exit(1);
|
||||
}, gracePeriodMs);
|
||||
gracePeriodTimers.set(senseName, timer);
|
||||
}
|
||||
|
||||
function clearGracePeriodTimer(senseName: string): void {
|
||||
const timer = gracePeriodTimers.get(senseName);
|
||||
if (timer === undefined) return;
|
||||
clearTimeout(timer);
|
||||
gracePeriodTimers.delete(senseName);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Compute execution with error isolation
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
async function runCompute(
|
||||
senseName: string,
|
||||
runtime: SenseRuntime,
|
||||
peers: PeerMap,
|
||||
timeoutMs: number,
|
||||
gracePeriodMs: number | null,
|
||||
): Promise<void> {
|
||||
try {
|
||||
const result = await executeCompute(runtime, peers, timeoutMs);
|
||||
if (!result.ok) {
|
||||
sendError(senseName, result.error.message);
|
||||
if (gracePeriodMs !== null && result.error.message.includes("timed out")) {
|
||||
scheduleGracePeriodKill(senseName, gracePeriodMs);
|
||||
}
|
||||
return;
|
||||
}
|
||||
clearGracePeriodTimer(senseName);
|
||||
if (result.value !== null) {
|
||||
sendSignal(senseName, result.value);
|
||||
}
|
||||
} catch (e: unknown) {
|
||||
const errMsg = e instanceof Error ? e.message : String(e);
|
||||
sendError(senseName, errMsg);
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// IPC message dispatch
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
function handleMessage(
|
||||
raw: unknown,
|
||||
runtimes: Map<string, SenseRuntime>,
|
||||
peers: PeerMap,
|
||||
group: string,
|
||||
timeoutMs: number,
|
||||
gracePeriodMs: number | null,
|
||||
inFlight: Map<string, Promise<void>>,
|
||||
): void {
|
||||
const parseResult = parseParentMessage(raw);
|
||||
@@ -149,33 +201,31 @@ function handleMessage(
|
||||
process.exit(0);
|
||||
}
|
||||
|
||||
if (msg.type === "compute") {
|
||||
const runtime = runtimes.get(msg.sense);
|
||||
if (!runtime) {
|
||||
sendError(msg.sense, `Unknown sense "${msg.sense}" in group "${group}"`);
|
||||
return;
|
||||
}
|
||||
|
||||
// Serialize computes for the same sense
|
||||
const previous = inFlight.get(msg.sense) ?? Promise.resolve();
|
||||
const next = previous.then(async () => {
|
||||
const result = await executeCompute(runtime, peers, timeoutMs);
|
||||
if (!result.ok) {
|
||||
sendError(msg.sense, result.error.message);
|
||||
return;
|
||||
}
|
||||
if (result.value !== null) {
|
||||
sendSignal(msg.sense, result.value);
|
||||
}
|
||||
if (msg.type === "health-request") {
|
||||
send({
|
||||
type: "health-response",
|
||||
senses: [...runtimes.keys()],
|
||||
inFlightCount: inFlight.size,
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
const tracked = next.catch((e: unknown) => {
|
||||
if (msg.type !== "compute") return;
|
||||
|
||||
const runtime = runtimes.get(msg.sense);
|
||||
if (!runtime) {
|
||||
sendError(msg.sense, `Unknown sense "${msg.sense}" in group "${group}"`);
|
||||
return;
|
||||
}
|
||||
|
||||
const previous = inFlight.get(msg.sense) ?? Promise.resolve();
|
||||
const next = previous
|
||||
.then(() => runCompute(msg.sense, runtime, peers, timeoutMs, gracePeriodMs))
|
||||
.catch((e: unknown) => {
|
||||
const errMsg = e instanceof Error ? e.message : String(e);
|
||||
sendError(msg.sense, errMsg);
|
||||
});
|
||||
|
||||
inFlight.set(msg.sense, tracked);
|
||||
}
|
||||
inFlight.set(msg.sense, next);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
@@ -198,29 +248,42 @@ async function bootstrap(nerveRoot: string, group: string): Promise<void> {
|
||||
|
||||
const runtimes = new Map<string, SenseRuntime>();
|
||||
const ownDbs = new Map<string, DrizzleDB>();
|
||||
const failedSenses: string[] = [];
|
||||
|
||||
for (const senseName of groupSenses) {
|
||||
const { db, runtime } = await initSense(nerveRoot, senseName);
|
||||
ownDbs.set(senseName, db);
|
||||
runtimes.set(senseName, runtime);
|
||||
try {
|
||||
const { db, runtime } = await initSense(nerveRoot, senseName);
|
||||
ownDbs.set(senseName, db);
|
||||
runtimes.set(senseName, runtime);
|
||||
} catch (e: unknown) {
|
||||
const eMsg = e instanceof Error ? e.message : String(e);
|
||||
process.stderr.write(
|
||||
`[sense-worker] Failed to load sense "${senseName}", skipping: ${eMsg}\n`,
|
||||
);
|
||||
failedSenses.push(senseName);
|
||||
}
|
||||
}
|
||||
|
||||
// If ALL senses failed, exit with error so kernel respawns
|
||||
if (runtimes.size === 0) {
|
||||
process.stderr.write(`[sense-worker] All senses in group "${group}" failed to load, exiting\n`);
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
const groupSenseNames = new Set(groupSenses);
|
||||
const peers = buildPeers(nerveRoot, Object.keys(config.senses), ownDbs, groupSenseNames);
|
||||
|
||||
// Read timeout from config (uses first group sense's config, or default)
|
||||
// Read timeout and grace period from config (uses first group sense's config)
|
||||
const firstSenseConfig = config.senses[groupSenses[0]];
|
||||
const timeoutMs =
|
||||
typeof (firstSenseConfig as Record<string, unknown>).timeoutMs === "number"
|
||||
? ((firstSenseConfig as Record<string, unknown>).timeoutMs as number)
|
||||
: DEFAULT_TIMEOUT_MS;
|
||||
const timeoutMs = firstSenseConfig?.timeout ?? DEFAULT_TIMEOUT_MS;
|
||||
const gracePeriodMs = firstSenseConfig?.gracePeriod ?? null;
|
||||
|
||||
const inFlight = new Map<string, Promise<void>>();
|
||||
|
||||
sendReady();
|
||||
|
||||
process.on("message", (raw: unknown) => {
|
||||
handleMessage(raw, runtimes, peers, group, timeoutMs, inFlight);
|
||||
handleMessage(raw, runtimes, peers, group, timeoutMs, gracePeriodMs, inFlight);
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user