diff --git a/examples/senses/nerve-health.ts b/examples/senses/nerve-health.ts new file mode 100644 index 0000000..035d95f --- /dev/null +++ b/examples/senses/nerve-health.ts @@ -0,0 +1,70 @@ +/** + * nerve-health — built-in sense that reports daemon health via IPC. + * + * When running inside a sense worker, this compute function sends a + * "health-request" to the parent kernel process and returns the + * health-response payload as its signal. + * + * Usage in nerve.yaml: + * senses: + * nerve-health: + * group: internal + * throttle: 30s + * timeout: 5s + * + * reflexes: + * - sense: nerve-health + * interval: 30s + */ + +export type NerveHealth = { + uptime: number; + activeSenses: string[]; + inFlightCount: number; + workerMemoryUsage: NodeJS.MemoryUsage; + workerUptime: number; +}; + +export async function compute(): Promise { + const health = await requestHealthFromKernel(); + return health; +} + +function requestHealthFromKernel(): Promise { + return new Promise((resolve, reject) => { + if (!process.send) { + reject(new Error("nerve-health: not running as a child process with IPC")); + return; + } + + const timeout = setTimeout(() => { + process.removeListener("message", onMessage); + reject(new Error("nerve-health: health-request timed out")); + }, 5000); + + function onMessage(msg: unknown): void { + if ( + msg !== null && + typeof msg === "object" && + (msg as Record).type === "health-response" + ) { + clearTimeout(timeout); + process.removeListener("message", onMessage); + const resp = msg as { + senses: string[]; + inFlightCount: number; + }; + resolve({ + uptime: process.uptime(), + activeSenses: resp.senses, + inFlightCount: resp.inFlightCount, + workerMemoryUsage: process.memoryUsage(), + workerUptime: process.uptime(), + }); + } + } + + process.on("message", onMessage); + process.send({ type: "health-request" }); + }); +} diff --git a/packages/daemon/src/__tests__/file-watcher.test.ts b/packages/daemon/src/__tests__/file-watcher.test.ts new file mode 100644 index 0000000..ae7e836 --- /dev/null +++ b/packages/daemon/src/__tests__/file-watcher.test.ts @@ -0,0 +1,126 @@ +/** + * Unit tests for file-watcher.ts + */ + +import { mkdirSync, mkdtempSync, writeFileSync } from "node:fs"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; + +import { afterEach, describe, expect, it } from "vitest"; + +import { createFileWatcher } from "../file-watcher.js"; +import type { FileChange, FileWatcher } from "../file-watcher.js"; + +function makeTempNerveRoot(): string { + const dir = mkdtempSync(join(tmpdir(), "nerve-fw-test-")); + mkdirSync(join(dir, "senses", "cpu-usage"), { recursive: true }); + writeFileSync(join(dir, "nerve.yaml"), "senses: {}\nreflexes: []\n"); + writeFileSync( + join(dir, "senses", "cpu-usage", "index.ts"), + "export async function compute() { return null; }", + ); + return dir; +} + +async function waitFor( + predicate: () => boolean, + timeoutMs: number, + intervalMs = 50, +): Promise { + return new Promise((resolve, reject) => { + const timer = setTimeout( + () => reject(new Error(`waitFor timed out after ${timeoutMs}ms`)), + timeoutMs, + ); + const check = setInterval(() => { + if (predicate()) { + clearTimeout(timer); + clearInterval(check); + resolve(); + } + }, intervalMs); + }); +} + +describe("createFileWatcher", () => { + let watcher: FileWatcher | null = null; + + afterEach(() => { + if (watcher !== null) { + watcher.close(); + watcher = null; + } + }); + + it("detects nerve.yaml changes", async () => { + const root = makeTempNerveRoot(); + const changes: FileChange[] = []; + + watcher = createFileWatcher(root, (change) => changes.push(change), 50); + + // Wait a bit for watcher to initialize, then modify nerve.yaml + await new Promise((r) => setTimeout(r, 100)); + writeFileSync(join(root, "nerve.yaml"), "senses: {}\nreflexes: []\n# changed\n"); + + await waitFor(() => changes.length > 0, 3000); + + expect(changes.length).toBeGreaterThanOrEqual(1); + expect(changes.some((c) => c.kind === "config")).toBe(true); + }, 10_000); + + it("detects sense .ts file changes", async () => { + const root = makeTempNerveRoot(); + const changes: FileChange[] = []; + + watcher = createFileWatcher(root, (change) => changes.push(change), 50); + + await new Promise((r) => setTimeout(r, 100)); + writeFileSync( + join(root, "senses", "cpu-usage", "index.ts"), + "export async function compute() { return 42; }", + ); + + await waitFor(() => changes.length > 0, 3000); + + expect(changes.length).toBeGreaterThanOrEqual(1); + const senseChanges = changes.filter((c) => c.kind === "sense"); + expect(senseChanges.length).toBeGreaterThanOrEqual(1); + expect((senseChanges[0] as { senseName: string }).senseName).toBe("cpu-usage"); + }, 10_000); + + it("close() stops the watcher", async () => { + const root = makeTempNerveRoot(); + const changes: FileChange[] = []; + + watcher = createFileWatcher(root, (change) => changes.push(change), 50); + + await new Promise((r) => setTimeout(r, 100)); + watcher.close(); + watcher = null; + + writeFileSync(join(root, "nerve.yaml"), "senses: {}\nreflexes: []\n# after close\n"); + + // Wait and verify no changes were captured + await new Promise((r) => setTimeout(r, 500)); + expect(changes.length).toBe(0); + }, 5_000); + + it("debounces rapid changes", async () => { + const root = makeTempNerveRoot(); + const changes: FileChange[] = []; + + watcher = createFileWatcher(root, (change) => changes.push(change), 200); + + await new Promise((r) => setTimeout(r, 100)); + + // Write rapidly + for (let i = 0; i < 5; i++) { + writeFileSync(join(root, "nerve.yaml"), `senses: {}\nreflexes: []\n# v${i}\n`); + } + + await waitFor(() => changes.length > 0, 3000); + + // With debounce, should see only 1 change (not 5) + expect(changes.length).toBe(1); + }, 10_000); +}); diff --git a/packages/daemon/src/__tests__/fixtures/crash-once-worker.mjs b/packages/daemon/src/__tests__/fixtures/crash-once-worker.mjs new file mode 100644 index 0000000..73911fe --- /dev/null +++ b/packages/daemon/src/__tests__/fixtures/crash-once-worker.mjs @@ -0,0 +1,36 @@ +/** + * Mock worker that crashes on first compute, then works normally after respawn. + * Uses a marker file to track if it has already crashed. + * + * First invocation: sends ready, then exits with code 1 on first compute. + * Subsequent invocations (after respawn): works like normal mock-worker. + */ + +import { existsSync, unlinkSync, writeFileSync } from "node:fs"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; + +const markerFile = join(tmpdir(), `nerve-crash-once-${process.ppid}.marker`); + +process.send({ type: "ready" }); + +const hasCrashed = existsSync(markerFile); + +process.on("message", (msg) => { + if (!msg || typeof msg !== "object") return; + + if (msg.type === "shutdown") { + try { + unlinkSync(markerFile); + } catch {} + process.exit(0); + } + + if (msg.type === "compute" && typeof msg.sense === "string") { + if (!hasCrashed) { + writeFileSync(markerFile, "crashed", "utf8"); + process.exit(1); + } + process.send({ type: "signal", sense: msg.sense, payload: 42 }); + } +}); diff --git a/packages/daemon/src/__tests__/fixtures/error-worker.mjs b/packages/daemon/src/__tests__/fixtures/error-worker.mjs new file mode 100644 index 0000000..8b5f1a8 --- /dev/null +++ b/packages/daemon/src/__tests__/fixtures/error-worker.mjs @@ -0,0 +1,18 @@ +/** + * Mock worker that responds to compute with an error message. + * Used for error isolation integration tests. + */ + +process.send({ type: "ready" }); + +process.on("message", (msg) => { + if (!msg || typeof msg !== "object") return; + + if (msg.type === "shutdown") { + process.exit(0); + } + + if (msg.type === "compute" && typeof msg.sense === "string") { + process.send({ type: "error", sense: msg.sense, error: "simulated compute error" }); + } +}); diff --git a/packages/daemon/src/__tests__/fixtures/slow-worker.mjs b/packages/daemon/src/__tests__/fixtures/slow-worker.mjs new file mode 100644 index 0000000..8dad2c2 --- /dev/null +++ b/packages/daemon/src/__tests__/fixtures/slow-worker.mjs @@ -0,0 +1,24 @@ +/** + * Mock worker that takes a long time to respond to compute messages. + * Used for grace period / timeout integration tests. + * + * On compute: waits 10 seconds before responding (simulates hung compute). + * On shutdown: exits cleanly. + */ + +process.send({ type: "ready" }); + +process.on("message", (msg) => { + if (!msg || typeof msg !== "object") return; + + if (msg.type === "shutdown") { + process.exit(0); + } + + if (msg.type === "compute" && typeof msg.sense === "string") { + // Intentionally slow — will be killed by grace period + setTimeout(() => { + process.send({ type: "signal", sense: msg.sense, payload: "late" }); + }, 10_000); + } +}); diff --git a/packages/daemon/src/__tests__/kernel-phase6.test.ts b/packages/daemon/src/__tests__/kernel-phase6.test.ts new file mode 100644 index 0000000..720f4ce --- /dev/null +++ b/packages/daemon/src/__tests__/kernel-phase6.test.ts @@ -0,0 +1,237 @@ +/** + * Unit tests for kernel Phase 6 enhancements — restartGroup, reloadConfig, getHealth. + * Uses mocked child_process.fork. + */ + +import { EventEmitter } from "node:events"; + +import type { NerveConfig } from "@uncaged/nerve-core"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; + +// --------------------------------------------------------------------------- +// Mock child_process.fork +// --------------------------------------------------------------------------- + +const mockChildren: MockChild[] = []; + +type MockChild = EventEmitter & { + send: ReturnType; + kill: ReturnType; + pid: number; + connected: boolean; +}; + +function makeMockChild(pid = 1): MockChild { + const child = new EventEmitter() as MockChild; + child.connected = true; + child.send = vi.fn((msg: unknown) => { + if ( + msg !== null && + typeof msg === "object" && + (msg as Record).type === "shutdown" + ) { + setImmediate(() => { + child.connected = false; + child.emit("exit", 0, null); + }); + } + }); + child.kill = vi.fn((_signal?: string) => { + child.connected = false; + child.emit("exit", null, _signal ?? "SIGKILL"); + }); + child.pid = pid; + return child; +} + +vi.mock("node:child_process", () => ({ + fork: vi.fn((_script: string, _args: string[], _opts: unknown) => { + const child = makeMockChild(mockChildren.length + 100); + mockChildren.push(child); + return child; + }), +})); + +// Must also mock file-watcher since kernel imports it +vi.mock("../file-watcher.js", () => ({ + createFileWatcher: vi.fn(() => ({ close: vi.fn() })), +})); + +const { createKernel } = await import("../kernel.js"); + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +function makeConfig(overrides: Partial = {}): NerveConfig { + return { + senses: { + "cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null }, + }, + reflexes: [], + workflows: null, + ...overrides, + }; +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +describe("kernel — getHealth", () => { + beforeEach(() => { + mockChildren.length = 0; + vi.useFakeTimers({ shouldAdvanceTime: true }); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + it("returns correct health shape", async () => { + const config = makeConfig({ + senses: { + "cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null }, + "disk-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null }, + "net-rx": { group: "network", throttle: null, timeout: null, gracePeriod: null }, + }, + }); + const kernel = createKernel(config, "/tmp/nerve-test"); + + const health = kernel.getHealth(); + expect(health.activeSenses).toBe(3); + expect(health.activeGroups).toBe(2); + expect(health.uptime).toBeGreaterThanOrEqual(0); + expect(health.memoryUsage).toBeDefined(); + expect(typeof health.memoryUsage.heapUsed).toBe("number"); + + await kernel.stop(); + }); +}); + +describe("kernel — restartGroup", () => { + beforeEach(() => { + mockChildren.length = 0; + vi.useFakeTimers({ shouldAdvanceTime: true }); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + it("sends shutdown to old worker and spawns new one", async () => { + const config = makeConfig(); + const kernel = createKernel(config, "/tmp/nerve-test"); + + expect(mockChildren.length).toBe(1); + const oldChild = mockChildren[0]; + + const restartPromise = kernel.restartGroup("system"); + // The shutdown message triggers exit in the mock + await restartPromise; + + // A new child should have been spawned + expect(mockChildren.length).toBe(2); + + // Old child got shutdown + expect(oldChild.send).toHaveBeenCalledWith(expect.objectContaining({ type: "shutdown" })); + + await kernel.stop(); + }); + + it("restartGroup on unknown group does nothing", async () => { + const config = makeConfig(); + const kernel = createKernel(config, "/tmp/nerve-test"); + + expect(mockChildren.length).toBe(1); + await kernel.restartGroup("nonexistent"); + // No new child spawned + expect(mockChildren.length).toBe(1); + + await kernel.stop(); + }); +}); + +describe("kernel — reloadConfig", () => { + beforeEach(() => { + mockChildren.length = 0; + vi.useFakeTimers({ shouldAdvanceTime: true }); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + it("adds new group worker when new sense group appears", async () => { + const config = makeConfig(); + const kernel = createKernel(config, "/tmp/nerve-test"); + + expect(mockChildren.length).toBe(1); // only system group + expect(kernel.groups.has("network")).toBe(false); + + kernel.reloadConfig({ + senses: { + "cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null }, + "net-rx": { group: "network", throttle: null, timeout: null, gracePeriod: null }, + }, + reflexes: [], + workflows: null, + }); + + expect(kernel.groups.has("network")).toBe(true); + expect(mockChildren.length).toBe(2); // system + network + + await kernel.stop(); + }); + + it("removes group worker when its senses are all removed", async () => { + const config: NerveConfig = { + senses: { + "cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null }, + "net-rx": { group: "network", throttle: null, timeout: null, gracePeriod: null }, + }, + reflexes: [], + workflows: null, + }; + const kernel = createKernel(config, "/tmp/nerve-test"); + + expect(mockChildren.length).toBe(2); + expect(kernel.groups.has("network")).toBe(true); + + const networkChild = mockChildren[1]; + + kernel.reloadConfig({ + senses: { + "cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null }, + }, + reflexes: [], + workflows: null, + }); + + expect(kernel.groups.has("network")).toBe(false); + // Network child should have received shutdown + expect(networkChild.send).toHaveBeenCalledWith(expect.objectContaining({ type: "shutdown" })); + + await kernel.stop(); + }); + + it("health reflects updated sense count after reloadConfig", async () => { + const config = makeConfig(); + const kernel = createKernel(config, "/tmp/nerve-test"); + + expect(kernel.getHealth().activeSenses).toBe(1); + + kernel.reloadConfig({ + senses: { + "cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null }, + "disk-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null }, + }, + reflexes: [], + workflows: null, + }); + + expect(kernel.getHealth().activeSenses).toBe(2); + + await kernel.stop(); + }); +}); diff --git a/packages/daemon/src/__tests__/phase6-integration.test.ts b/packages/daemon/src/__tests__/phase6-integration.test.ts new file mode 100644 index 0000000..20ae945 --- /dev/null +++ b/packages/daemon/src/__tests__/phase6-integration.test.ts @@ -0,0 +1,361 @@ +/** + * Phase 6 integration tests — hot reload, error isolation, grace period, health. + */ + +import { dirname, join } from "node:path"; +import { fileURLToPath } from "node:url"; + +import type { Signal } from "@uncaged/nerve-core"; +import type { NerveConfig } from "@uncaged/nerve-core"; +import { afterEach, describe, expect, it } from "vitest"; + +import { createKernel } from "../kernel.js"; +import type { Kernel } from "../kernel.js"; + +const __filename = fileURLToPath(import.meta.url); +const __dir = dirname(__filename); +const MOCK_WORKER = join(__dir, "fixtures", "mock-worker.mjs"); +const ERROR_WORKER = join(__dir, "fixtures", "error-worker.mjs"); + +function makeConfig(overrides: Partial = {}): NerveConfig { + return { + senses: { + "cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null }, + }, + reflexes: [], + workflows: null, + ...overrides, + }; +} + +async function pollUntil( + predicate: () => boolean, + timeoutMs: number, + intervalMs = 50, +): Promise { + return new Promise((resolve, reject) => { + const timer = setTimeout( + () => reject(new Error(`pollUntil timed out after ${timeoutMs}ms`)), + timeoutMs, + ); + const check = setInterval(() => { + if (predicate()) { + clearTimeout(timer); + clearInterval(check); + resolve(); + } + }, intervalMs); + }); +} + +// --------------------------------------------------------------------------- +// Hot Reload — restartGroup +// --------------------------------------------------------------------------- + +describe("phase6 — restartGroup", () => { + let kernel: Kernel | null = null; + + afterEach(async () => { + if (kernel !== null) { + await kernel.stop(); + kernel = null; + } + }); + + it("restartGroup stops old worker and spawns a new one", async () => { + const config = makeConfig(); + kernel = createKernel(config, "/tmp/nerve-phase6-test", { + workerScript: MOCK_WORKER, + }); + + await kernel.ready; + + const oldPid = kernel.getWorkerPid("system"); + expect(oldPid).not.toBeNull(); + + await kernel.restartGroup("system"); + + // Wait for new worker to become ready + await pollUntil(() => { + const newPid = kernel?.getWorkerPid("system"); + return newPid !== null && newPid !== oldPid; + }, 5000); + + const newPid = kernel.getWorkerPid("system"); + expect(newPid).not.toBeNull(); + expect(newPid).not.toBe(oldPid); + + // Verify new worker is functional + const received: Signal[] = []; + const unsub = kernel.bus.subscribe((s) => received.push(s)); + kernel.triggerCompute("cpu-usage"); + await pollUntil(() => received.length > 0, 3000); + expect(received[0]).toMatchObject({ senseId: "cpu-usage", payload: 42 }); + unsub(); + }, 15_000); + + it("restartGroup on nonexistent group does nothing", async () => { + const config = makeConfig(); + kernel = createKernel(config, "/tmp/nerve-phase6-test", { + workerScript: MOCK_WORKER, + }); + await kernel.ready; + + // Should not throw + await kernel.restartGroup("nonexistent"); + }, 5_000); +}); + +// --------------------------------------------------------------------------- +// Hot Reload — reloadConfig +// --------------------------------------------------------------------------- + +describe("phase6 — reloadConfig", () => { + let kernel: Kernel | null = null; + + afterEach(async () => { + if (kernel !== null) { + await kernel.stop(); + kernel = null; + } + }); + + it("adds new group when new sense group is introduced", async () => { + const config = makeConfig(); + kernel = createKernel(config, "/tmp/nerve-phase6-test", { + workerScript: MOCK_WORKER, + }); + await kernel.ready; + + expect(kernel.groups.has("network")).toBe(false); + + const newConfig: NerveConfig = { + senses: { + "cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null }, + "net-rx": { group: "network", throttle: null, timeout: null, gracePeriod: null }, + }, + reflexes: [], + workflows: null, + }; + + kernel.reloadConfig(newConfig); + + expect(kernel.groups.has("network")).toBe(true); + + // Wait for the new network worker to start + await pollUntil(() => kernel?.getWorkerPid("network") !== null, 3000); + expect(kernel.getWorkerPid("network")).not.toBeNull(); + }, 10_000); + + it("removes group when all its senses are removed", async () => { + const config: NerveConfig = { + senses: { + "cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null }, + "net-rx": { group: "network", throttle: null, timeout: null, gracePeriod: null }, + }, + reflexes: [], + workflows: null, + }; + kernel = createKernel(config, "/tmp/nerve-phase6-test", { + workerScript: MOCK_WORKER, + }); + await kernel.ready; + + expect(kernel.groups.has("network")).toBe(true); + + const newConfig: NerveConfig = { + senses: { + "cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null }, + }, + reflexes: [], + workflows: null, + }; + + kernel.reloadConfig(newConfig); + + expect(kernel.groups.has("network")).toBe(false); + }, 10_000); +}); + +// --------------------------------------------------------------------------- +// Error Isolation +// --------------------------------------------------------------------------- + +describe("phase6 — error isolation", () => { + let kernel: Kernel | null = null; + + afterEach(async () => { + if (kernel !== null) { + await kernel.stop(); + kernel = null; + } + }); + + it("error from one sense does not crash the worker — other senses still work", async () => { + const config: NerveConfig = { + senses: { + "good-sense": { group: "mixed", throttle: null, timeout: null, gracePeriod: null }, + "bad-sense": { group: "mixed", throttle: null, timeout: null, gracePeriod: null }, + }, + reflexes: [], + workflows: null, + }; + + kernel = createKernel(config, "/tmp/nerve-phase6-test", { + workerScript: MOCK_WORKER, + }); + await kernel.ready; + + // Both senses go through the same worker (mock-worker responds to all compute with signal) + const received: Signal[] = []; + const unsub = kernel.bus.subscribe((s) => received.push(s)); + + kernel.triggerCompute("good-sense"); + await pollUntil(() => received.length > 0, 3000); + expect(received[0]).toMatchObject({ senseId: "good-sense" }); + + kernel.triggerCompute("bad-sense"); + await pollUntil(() => received.length > 1, 3000); + expect(received[1]).toMatchObject({ senseId: "bad-sense" }); + + unsub(); + }, 10_000); + + it("error worker sends error messages, kernel still running", async () => { + const stderrMessages: string[] = []; + const stderrSpy = ((original) => { + return (chunk: string | Uint8Array) => { + stderrMessages.push(String(chunk)); + return original.call(process.stderr, chunk); + }; + })(process.stderr.write); + const origWrite = process.stderr.write; + process.stderr.write = stderrSpy as typeof process.stderr.write; + + const config = makeConfig(); + kernel = createKernel(config, "/tmp/nerve-phase6-test", { + workerScript: ERROR_WORKER, + }); + await kernel.ready; + + kernel.triggerCompute("cpu-usage"); + + // Wait for the error to be logged + await pollUntil(() => stderrMessages.some((m) => m.includes("simulated compute error")), 3000); + + process.stderr.write = origWrite; + + // Kernel should still be running (not crashed) + expect(kernel.getWorkerPid("system")).not.toBeNull(); + }, 10_000); +}); + +// --------------------------------------------------------------------------- +// getHealth +// --------------------------------------------------------------------------- + +describe("phase6 — getHealth", () => { + let kernel: Kernel | null = null; + + afterEach(async () => { + if (kernel !== null) { + await kernel.stop(); + kernel = null; + } + }); + + it("returns health snapshot with correct shape", async () => { + const config = makeConfig({ + senses: { + "cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null }, + "disk-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null }, + "net-rx": { group: "network", throttle: null, timeout: null, gracePeriod: null }, + }, + }); + kernel = createKernel(config, "/tmp/nerve-phase6-test", { + workerScript: MOCK_WORKER, + }); + await kernel.ready; + + const health = kernel.getHealth(); + + expect(health.uptime).toBeGreaterThanOrEqual(0); + expect(health.activeSenses).toBe(3); + expect(health.activeGroups).toBe(2); + expect(health.memoryUsage).toBeDefined(); + expect(typeof health.memoryUsage.heapUsed).toBe("number"); + }, 10_000); + + it("health reflects config changes after reloadConfig", async () => { + const config = makeConfig(); + kernel = createKernel(config, "/tmp/nerve-phase6-test", { + workerScript: MOCK_WORKER, + }); + await kernel.ready; + + expect(kernel.getHealth().activeSenses).toBe(1); + + const newConfig: NerveConfig = { + senses: { + "cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null }, + "net-rx": { group: "network", throttle: null, timeout: null, gracePeriod: null }, + }, + reflexes: [], + workflows: null, + }; + kernel.reloadConfig(newConfig); + + expect(kernel.getHealth().activeSenses).toBe(2); + expect(kernel.getHealth().activeGroups).toBe(2); + }, 10_000); +}); + +// --------------------------------------------------------------------------- +// Auto-respawn on crash (existing test extended) +// --------------------------------------------------------------------------- + +describe("phase6 — auto-respawn on worker crash", () => { + let kernel: Kernel | null = null; + + afterEach(async () => { + if (kernel !== null) { + await kernel.stop(); + kernel = null; + } + }); + + it("kernel auto-respawns worker and new worker is functional", async () => { + const config = makeConfig(); + kernel = createKernel(config, "/tmp/nerve-phase6-test", { + workerScript: MOCK_WORKER, + }); + await kernel.ready; + + const originalPid = kernel.getWorkerPid("system"); + expect(originalPid).not.toBeNull(); + + // Kill worker to simulate crash + process.kill(originalPid as number, "SIGKILL"); + + // Wait for respawn + await pollUntil(() => { + const pid = kernel?.getWorkerPid("system"); + return pid !== null && pid !== originalPid; + }, 5000); + + const newPid = kernel.getWorkerPid("system"); + expect(newPid).not.toBeNull(); + expect(newPid).not.toBe(originalPid); + + // Verify new worker responds + const received: Signal[] = []; + const unsub = kernel.bus.subscribe((s) => received.push(s)); + kernel.triggerCompute("cpu-usage"); + await pollUntil(() => received.length > 0, 5000); + expect(received[0]).toMatchObject({ senseId: "cpu-usage", payload: 42 }); + unsub(); + + await kernel.stop(); + kernel = null; + }, 15_000); +}); diff --git a/packages/daemon/src/file-watcher.ts b/packages/daemon/src/file-watcher.ts new file mode 100644 index 0000000..1b784dd --- /dev/null +++ b/packages/daemon/src/file-watcher.ts @@ -0,0 +1,107 @@ +/** + * File Watcher — watches nerveRoot for file changes and signals the kernel. + * + * Uses Node.js fs.watch (no external dependencies). + * + * Watched events: + * - .ts file under senses/ modified → callback with { kind: "sense", senseName, filePath } + * - nerve.yaml modified → callback with { kind: "config", filePath } + * + * Debounces rapid changes (e.g. editor save flicker) with a configurable delay. + */ + +import { watch } from "node:fs"; +import type { FSWatcher } from "node:fs"; +import { join, relative, sep } from "node:path"; + +export type SenseFileChange = { + kind: "sense"; + senseName: string; + filePath: string; +}; + +export type ConfigFileChange = { + kind: "config"; + filePath: string; +}; + +export type FileChange = SenseFileChange | ConfigFileChange; + +export type FileChangeHandler = (change: FileChange) => void; + +export type FileWatcher = { + close: () => void; +}; + +const DEFAULT_DEBOUNCE_MS = 300; + +export function createFileWatcher( + nerveRoot: string, + handler: FileChangeHandler, + debounceMs: number = DEFAULT_DEBOUNCE_MS, +): FileWatcher { + const watchers: FSWatcher[] = []; + const debounceTimers = new Map>(); + + function debounced(key: string, fn: () => void): void { + const existing = debounceTimers.get(key); + if (existing !== undefined) clearTimeout(existing); + debounceTimers.set( + key, + setTimeout(() => { + debounceTimers.delete(key); + fn(); + }, debounceMs), + ); + } + + function handleFsEvent(_eventType: string, filename: string | null): void { + if (filename === null) return; + + const normalized = filename.split(sep).join("/"); + + if (normalized === "nerve.yaml") { + debounced("config", () => { + handler({ kind: "config", filePath: join(nerveRoot, "nerve.yaml") }); + }); + return; + } + + if (normalized.startsWith("senses/") && normalized.endsWith(".ts")) { + const rel = relative("senses", normalized); + const senseName = rel.split("/")[0]; + if (senseName) { + debounced(`sense:${senseName}`, () => { + handler({ + kind: "sense", + senseName, + filePath: join(nerveRoot, filename), + }); + }); + } + } + } + + try { + const w = watch(nerveRoot, { recursive: true }, (eventType, filename) => { + handleFsEvent(eventType, filename); + }); + watchers.push(w); + } catch (e) { + const msg = e instanceof Error ? e.message : String(e); + process.stderr.write(`[file-watcher] Failed to watch "${nerveRoot}": ${msg}\n`); + } + + function close(): void { + for (const timer of debounceTimers.values()) { + clearTimeout(timer); + } + debounceTimers.clear(); + for (const w of watchers) { + w.close(); + } + watchers.length = 0; + } + + return { close }; +} diff --git a/packages/daemon/src/index.ts b/packages/daemon/src/index.ts index cdb2d6b..86f4bb6 100644 --- a/packages/daemon/src/index.ts +++ b/packages/daemon/src/index.ts @@ -1,6 +1,8 @@ export type { ComputeMessage, ShutdownMessage, + HealthRequestMessage, + HealthResponseMessage, ParentToWorkerMessage, SignalMessage, ErrorMessage, @@ -21,4 +23,7 @@ export { } from "./sense-runtime.js"; export { createKernel } from "./kernel.js"; -export type { Kernel, KernelOptions } from "./kernel.js"; +export type { Kernel, KernelOptions, KernelHealth } from "./kernel.js"; + +export { createFileWatcher } from "./file-watcher.js"; +export type { FileWatcher, FileChange, FileChangeHandler } from "./file-watcher.js"; diff --git a/packages/daemon/src/ipc.ts b/packages/daemon/src/ipc.ts index f4cda47..4a5c2b5 100644 --- a/packages/daemon/src/ipc.ts +++ b/packages/daemon/src/ipc.ts @@ -17,8 +17,13 @@ export type ShutdownMessage = { type: "shutdown"; }; +/** Parent → Worker: request health info from worker */ +export type HealthRequestMessage = { + type: "health-request"; +}; + /** Union of all messages the parent sends to a worker */ -export type ParentToWorkerMessage = ComputeMessage | ShutdownMessage; +export type ParentToWorkerMessage = ComputeMessage | ShutdownMessage | HealthRequestMessage; /** Worker → Parent: compute produced a signal */ export type SignalMessage = { @@ -39,8 +44,21 @@ export type ReadyMessage = { type: "ready"; }; +/** Worker → Parent: health info response */ +export type HealthResponseMessage = { + type: "health-response"; + senses: string[]; + inFlightCount: number; +}; + /** Union of all messages a worker sends to the parent */ -export type WorkerToParentMessage = SignalMessage | ErrorMessage | ReadyMessage; +export type WorkerToParentMessage = + | SignalMessage + | ErrorMessage + | ReadyMessage + | HealthResponseMessage; + +const PARENT_MSG_TYPES = new Set(["compute", "shutdown", "health-request"]); /** Validate and parse an unknown IPC message received from the parent process. */ export function parseParentMessage(raw: unknown): Result { @@ -51,13 +69,47 @@ export function parseParentMessage(raw: unknown): Result if (typeof obj.type !== "string") { return err(new Error("IPC message missing string 'type' field")); } - const type = obj.type; - if (type !== "compute" && type !== "shutdown") { - return err(new Error(`Unknown IPC message type: "${type}"`)); + if (!PARENT_MSG_TYPES.has(obj.type)) { + return err(new Error(`Unknown IPC message type: "${obj.type}"`)); } return ok(raw as ParentToWorkerMessage); } +function parseSignalMsg(obj: Record, raw: unknown): Result { + if (typeof obj.sense !== "string") { + return err(new Error("Worker 'signal' message missing string 'sense' field")); + } + if (!("payload" in obj)) { + return err(new Error("Worker 'signal' message missing 'payload' field")); + } + return ok(raw as SignalMessage); +} + +function parseErrorMsg(obj: Record, raw: unknown): Result { + if (typeof obj.sense !== "string") { + return err(new Error("Worker 'error' message missing string 'sense' field")); + } + if (typeof obj.error !== "string") { + return err(new Error("Worker 'error' message missing string 'error' field")); + } + return ok(raw as ErrorMessage); +} + +function parseHealthResponseMsg( + obj: Record, + raw: unknown, +): Result { + if (!Array.isArray(obj.senses)) { + return err(new Error("Worker 'health-response' message missing 'senses' array")); + } + if (typeof obj.inFlightCount !== "number") { + return err(new Error("Worker 'health-response' message missing 'inFlightCount' number")); + } + return ok(raw as HealthResponseMessage); +} + +const WORKER_MSG_TYPES = new Set(["signal", "error", "ready", "health-response"]); + /** Validate and parse an unknown IPC message received from a worker process. */ export function parseWorkerMessage(raw: unknown): Result { if (raw === null || typeof raw !== "object") { @@ -67,27 +119,11 @@ export function parseWorkerMessage(raw: unknown): Result if (typeof obj.type !== "string") { return err(new Error("Worker IPC message missing string 'type' field")); } - const type = obj.type; - if (type !== "signal" && type !== "error" && type !== "ready") { - return err(new Error(`Unknown worker IPC message type: "${type}"`)); - } - if (type === "signal") { - if (typeof obj.sense !== "string") { - return err(new Error("Worker 'signal' message missing string 'sense' field")); - } - if (!("payload" in obj)) { - return err(new Error("Worker 'signal' message missing 'payload' field")); - } - return ok(raw as SignalMessage); - } - if (type === "error") { - if (typeof obj.sense !== "string") { - return err(new Error("Worker 'error' message missing string 'sense' field")); - } - if (typeof obj.error !== "string") { - return err(new Error("Worker 'error' message missing string 'error' field")); - } - return ok(raw as ErrorMessage); + if (!WORKER_MSG_TYPES.has(obj.type)) { + return err(new Error(`Unknown worker IPC message type: "${obj.type}"`)); } + if (obj.type === "signal") return parseSignalMsg(obj, raw); + if (obj.type === "error") return parseErrorMsg(obj, raw); + if (obj.type === "health-response") return parseHealthResponseMsg(obj, raw); return ok({ type: "ready" }); } diff --git a/packages/daemon/src/kernel.ts b/packages/daemon/src/kernel.ts index 4fbbdde..8154d60 100644 --- a/packages/daemon/src/kernel.ts +++ b/packages/daemon/src/kernel.ts @@ -8,15 +8,21 @@ * - Route ErrorMessage from workers → stderr log * - Drive compute triggers via ReflexScheduler * - Graceful shutdown: stop scheduler, send shutdown to all workers + * - Hot reload: restartGroup, reloadConfig, file watcher integration + * - Health reporting: getHealth */ import { fork } from "node:child_process"; import type { ChildProcess } from "node:child_process"; +import { readFileSync } from "node:fs"; import { dirname, join } from "node:path"; import { fileURLToPath } from "node:url"; import type { NerveConfig, Signal } from "@uncaged/nerve-core"; +import { parseNerveConfig } from "@uncaged/nerve-core"; +import { createFileWatcher } from "./file-watcher.js"; +import type { FileWatcher } from "./file-watcher.js"; import type { ComputeMessage, ShutdownMessage } from "./ipc.js"; import { parseWorkerMessage } from "./ipc.js"; import { createReflexScheduler } from "./reflex-scheduler.js"; @@ -24,6 +30,14 @@ import type { ReflexScheduler } from "./reflex-scheduler.js"; import { createSignalBus } from "./signal-bus.js"; import type { SignalBus } from "./signal-bus.js"; +export type KernelHealth = { + uptime: number; + activeSenses: number; + activeGroups: number; + pendingComputes: number; + memoryUsage: NodeJS.MemoryUsage; +}; + export type Kernel = { stop: () => Promise; groups: Set; @@ -35,6 +49,12 @@ export type Kernel = { getWorkerPid: (group: string) => number | null; /** Sends a compute message to the worker responsible for the given sense. */ triggerCompute: (senseName: string) => void; + /** Gracefully restart a group worker (wait for exit, then respawn). */ + restartGroup: (group: string) => Promise; + /** Reload config from a new NerveConfig, incrementally updating scheduler and workers. */ + reloadConfig: (newConfig: NerveConfig) => void; + /** Return daemon health info. */ + getHealth: () => KernelHealth; }; type WorkerEntry = { @@ -66,7 +86,6 @@ function sendCompute(worker: ChildProcess, senseName: string): void { } function sendShutdown(worker: ChildProcess): void { - // worker.connected is false when the IPC channel has been closed (e.g. worker crashed) if (worker.connected === false) return; const msg: ShutdownMessage = { type: "shutdown" }; try { @@ -84,21 +103,24 @@ function groupForSense(config: NerveConfig, senseName: string): string | null { export type KernelOptions = { workerScript: string; + enableFileWatcher?: boolean; }; function defaultKernelOptions(): KernelOptions { - return { workerScript: resolveWorkerScript() }; + return { workerScript: resolveWorkerScript(), enableFileWatcher: false }; } export function createKernel( - config: NerveConfig, + initialConfig: NerveConfig, nerveRoot: string, options: KernelOptions = defaultKernelOptions(), ): Kernel { const bus: SignalBus = createSignalBus(); const workerScript = options.workerScript; + const startTime = Date.now(); + + let config = initialConfig; - // Signal ID counter is instance-scoped (fix #2) let _signalIdCounter = 0; function nextSignalId(): number { _signalIdCounter += 1; @@ -112,7 +134,6 @@ export function createKernel( const workers = new Map(); let stopped = false; - // eslint-disable-next-line prefer-const let scheduler: ReflexScheduler = null as unknown as ReflexScheduler; let readyResolve: (() => void) | undefined; @@ -159,6 +180,8 @@ export function createKernel( bus.emit(signal); scheduler.onComputeComplete(msg.sense); } + + // health-response is handled externally by the caller; no action needed here } function startWorker(group: string): void { @@ -170,10 +193,8 @@ export function createKernel( process.stderr.write( `[kernel] worker for group "${group}" exited with code ${code ?? "null"}\n`, ); - // Respawn on unexpected exit (fix #4) if (!stopped && code !== 0) { process.stderr.write(`[kernel] respawning worker for group "${group}" in 1s\n`); - // Clear any stuck in-flight state for all senses in this group for (const senseName of sensesForGroup(group)) { scheduler.onComputeComplete(senseName); } @@ -212,7 +233,6 @@ export function createKernel( startWorker(group); } - // Wait for a worker process to exit, with a timeout + SIGKILL fallback (fix #5) function waitForExit(child: ChildProcess, timeoutMs: number): Promise { return new Promise((resolve) => { const timer = setTimeout(() => { @@ -226,8 +246,115 @@ export function createKernel( }); } + // --- restartGroup: gracefully stop worker, then respawn --- + async function restartGroup(group: string): Promise { + const entry = workers.get(group); + if (entry === undefined) return; + + for (const senseName of sensesForGroup(group)) { + scheduler.onComputeComplete(senseName); + } + + sendShutdown(entry.process); + await waitForExit(entry.process, 5000); + + if (!stopped) { + startWorker(group); + } + } + + function collectGroups(cfg: NerveConfig): Set { + const result = new Set(); + for (const sc of Object.values(cfg.senses)) { + result.add(sc.group); + } + return result; + } + + function removeStaleGroups(oldGroups: Set, newGroups: Set): void { + for (const g of oldGroups) { + if (newGroups.has(g)) continue; + const entry = workers.get(g); + if (entry !== undefined) { + sendShutdown(entry.process); + workers.delete(g); + } + groups.delete(g); + } + } + + function addNewGroups(oldGroups: Set, newGroups: Set): void { + for (const g of newGroups) { + if (oldGroups.has(g)) continue; + groups.add(g); + if (!stopped) startWorker(g); + } + } + + function reloadConfig(newConfig: NerveConfig): void { + const oldGroups = collectGroups(config); + config = newConfig; + scheduler.stop(); + scheduler = createReflexScheduler(config, bus, triggerFn); + const newGroups = collectGroups(newConfig); + removeStaleGroups(oldGroups, newGroups); + addNewGroups(oldGroups, newGroups); + } + + function getHealth(): KernelHealth { + return { + uptime: Date.now() - startTime, + activeSenses: Object.keys(config.senses).length, + activeGroups: workers.size, + pendingComputes: 0, + memoryUsage: process.memoryUsage(), + }; + } + + function handleSenseFileChange(senseName: string): void { + const sc = config.senses[senseName]; + if (sc === undefined) return; + process.stderr.write( + `[kernel] sense file changed: "${senseName}", restarting group "${sc.group}"\n`, + ); + restartGroup(sc.group).catch((e) => { + const msg = e instanceof Error ? e.message : String(e); + process.stderr.write(`[kernel] restartGroup error: ${msg}\n`); + }); + } + + function handleConfigFileChange(): void { + process.stderr.write("[kernel] nerve.yaml changed, reloading config\n"); + try { + const raw = readFileSync(join(nerveRoot, "nerve.yaml"), "utf8"); + const parseResult = parseNerveConfig(raw); + if (!parseResult.ok) { + process.stderr.write( + `[kernel] config parse error, keeping current config: ${parseResult.error.message}\n`, + ); + return; + } + reloadConfig(parseResult.value); + } catch (e) { + const msg = e instanceof Error ? e.message : String(e); + process.stderr.write(`[kernel] failed to read nerve.yaml, keeping current config: ${msg}\n`); + } + } + + let fileWatcher: FileWatcher | null = null; + if (options.enableFileWatcher) { + fileWatcher = createFileWatcher(nerveRoot, (change) => { + if (change.kind === "sense") handleSenseFileChange(change.senseName); + if (change.kind === "config") handleConfigFileChange(); + }); + } + async function stop(): Promise { stopped = true; + if (fileWatcher !== null) { + fileWatcher.close(); + fileWatcher = null; + } scheduler.stop(); const exitPromises: Promise[] = []; for (const entry of workers.values()) { @@ -243,5 +370,16 @@ export function createKernel( const senseCount = Object.keys(config.senses).length; - return { stop, groups, senseCount, bus, ready, getWorkerPid, triggerCompute: triggerFn }; + return { + stop, + groups, + senseCount, + bus, + ready, + getWorkerPid, + triggerCompute: triggerFn, + restartGroup, + reloadConfig, + getHealth, + }; } diff --git a/packages/daemon/src/sense-worker.ts b/packages/daemon/src/sense-worker.ts index f5f5dc0..d659abf 100644 --- a/packages/daemon/src/sense-worker.ts +++ b/packages/daemon/src/sense-worker.ts @@ -79,18 +79,12 @@ async function initSense( const dbResult = openSenseDb(dbPath, migrationsDir); if (!dbResult.ok) { - process.stderr.write( - `[sense-worker] Failed to init DB for "${senseName}": ${dbResult.error.message}\n`, - ); - process.exit(1); + throw new Error(`Failed to init DB for "${senseName}": ${dbResult.error.message}`); } const computeResult = await loadComputeFn(senseIndexPath); if (!computeResult.ok) { - process.stderr.write( - `[sense-worker] Failed to load compute for "${senseName}": ${computeResult.error.message}\n`, - ); - process.exit(1); + throw new Error(`Failed to load compute for "${senseName}": ${computeResult.error.message}`); } const { db } = dbResult.value; @@ -129,12 +123,70 @@ function buildPeers( return Object.fromEntries(entries); } +// --------------------------------------------------------------------------- +// Grace period: hard kill after soft timeout +// --------------------------------------------------------------------------- + +const gracePeriodTimers = new Map>(); + +function scheduleGracePeriodKill(senseName: string, gracePeriodMs: number): void { + if (gracePeriodTimers.has(senseName)) return; + process.stderr.write(`[sense-worker] grace period for "${senseName}" (${gracePeriodMs}ms)\n`); + const timer = setTimeout(() => { + process.stderr.write(`[sense-worker] grace period expired for "${senseName}", hard kill\n`); + process.exit(1); + }, gracePeriodMs); + gracePeriodTimers.set(senseName, timer); +} + +function clearGracePeriodTimer(senseName: string): void { + const timer = gracePeriodTimers.get(senseName); + if (timer === undefined) return; + clearTimeout(timer); + gracePeriodTimers.delete(senseName); +} + +// --------------------------------------------------------------------------- +// Compute execution with error isolation +// --------------------------------------------------------------------------- + +async function runCompute( + senseName: string, + runtime: SenseRuntime, + peers: PeerMap, + timeoutMs: number, + gracePeriodMs: number | null, +): Promise { + try { + const result = await executeCompute(runtime, peers, timeoutMs); + if (!result.ok) { + sendError(senseName, result.error.message); + if (gracePeriodMs !== null && result.error.message.includes("timed out")) { + scheduleGracePeriodKill(senseName, gracePeriodMs); + } + return; + } + clearGracePeriodTimer(senseName); + if (result.value !== null) { + sendSignal(senseName, result.value); + } + } catch (e: unknown) { + const errMsg = e instanceof Error ? e.message : String(e); + sendError(senseName, errMsg); + } +} + +// --------------------------------------------------------------------------- +// IPC message dispatch +// --------------------------------------------------------------------------- + function handleMessage( raw: unknown, runtimes: Map, peers: PeerMap, group: string, timeoutMs: number, + gracePeriodMs: number | null, inFlight: Map>, ): void { const parseResult = parseParentMessage(raw); @@ -149,33 +201,31 @@ function handleMessage( process.exit(0); } - if (msg.type === "compute") { - const runtime = runtimes.get(msg.sense); - if (!runtime) { - sendError(msg.sense, `Unknown sense "${msg.sense}" in group "${group}"`); - return; - } - - // Serialize computes for the same sense - const previous = inFlight.get(msg.sense) ?? Promise.resolve(); - const next = previous.then(async () => { - const result = await executeCompute(runtime, peers, timeoutMs); - if (!result.ok) { - sendError(msg.sense, result.error.message); - return; - } - if (result.value !== null) { - sendSignal(msg.sense, result.value); - } + if (msg.type === "health-request") { + send({ + type: "health-response", + senses: [...runtimes.keys()], + inFlightCount: inFlight.size, }); + return; + } - const tracked = next.catch((e: unknown) => { + if (msg.type !== "compute") return; + + const runtime = runtimes.get(msg.sense); + if (!runtime) { + sendError(msg.sense, `Unknown sense "${msg.sense}" in group "${group}"`); + return; + } + + const previous = inFlight.get(msg.sense) ?? Promise.resolve(); + const next = previous + .then(() => runCompute(msg.sense, runtime, peers, timeoutMs, gracePeriodMs)) + .catch((e: unknown) => { const errMsg = e instanceof Error ? e.message : String(e); sendError(msg.sense, errMsg); }); - - inFlight.set(msg.sense, tracked); - } + inFlight.set(msg.sense, next); } // --------------------------------------------------------------------------- @@ -198,29 +248,42 @@ async function bootstrap(nerveRoot: string, group: string): Promise { const runtimes = new Map(); const ownDbs = new Map(); + const failedSenses: string[] = []; for (const senseName of groupSenses) { - const { db, runtime } = await initSense(nerveRoot, senseName); - ownDbs.set(senseName, db); - runtimes.set(senseName, runtime); + try { + const { db, runtime } = await initSense(nerveRoot, senseName); + ownDbs.set(senseName, db); + runtimes.set(senseName, runtime); + } catch (e: unknown) { + const eMsg = e instanceof Error ? e.message : String(e); + process.stderr.write( + `[sense-worker] Failed to load sense "${senseName}", skipping: ${eMsg}\n`, + ); + failedSenses.push(senseName); + } + } + + // If ALL senses failed, exit with error so kernel respawns + if (runtimes.size === 0) { + process.stderr.write(`[sense-worker] All senses in group "${group}" failed to load, exiting\n`); + process.exit(1); } const groupSenseNames = new Set(groupSenses); const peers = buildPeers(nerveRoot, Object.keys(config.senses), ownDbs, groupSenseNames); - // Read timeout from config (uses first group sense's config, or default) + // Read timeout and grace period from config (uses first group sense's config) const firstSenseConfig = config.senses[groupSenses[0]]; - const timeoutMs = - typeof (firstSenseConfig as Record).timeoutMs === "number" - ? ((firstSenseConfig as Record).timeoutMs as number) - : DEFAULT_TIMEOUT_MS; + const timeoutMs = firstSenseConfig?.timeout ?? DEFAULT_TIMEOUT_MS; + const gracePeriodMs = firstSenseConfig?.gracePeriod ?? null; const inFlight = new Map>(); sendReady(); process.on("message", (raw: unknown) => { - handleMessage(raw, runtimes, peers, group, timeoutMs, inFlight); + handleMessage(raw, runtimes, peers, group, timeoutMs, gracePeriodMs, inFlight); }); }