Compare commits

..

6 Commits

Author SHA1 Message Date
xiaoju b4ef669607 fix: address all 9 PR #11 review issues
Critical:
- cli.ts: add shuttingDown guard to prevent double shutdown race
- kernel.ts: check child.connected before IPC send

Warning:
- Rewrite IPC round-trip test to verify actual signal flow
- Rewrite crash recovery test to kill kernel-managed worker
- parseArgs: explicitly handle 'start' subcommand
- Respawn: reset scheduler in-flight state for crashed group

Suggestions:
- Re-export KernelOptions from index.ts
- Add comment explaining mock-worker.mjs format
- Replace setTimeout with pollUntil helper

小橘 🍊(NEKO Team)
2026-04-22 09:57:06 +00:00
xiaoju 22b3690327 feat(cli,daemon): Phase 4 — Process Manager & Isolation
- CLI entry point: `nerve start [--root <path>]` with SIGINT/SIGTERM handling
- Kernel exports groups/senseCount for startup logging
- daemon tsup builds sense-worker.ts as separate entry point
- Integration tests with mock worker (IPC round-trip, crash recovery, graceful shutdown)
- CLI re-exports createKernel/Kernel from daemon

59 tests (was 54), all green. biome check passes.

Closes #5

小橘 🍊(NEKO Team)
2026-04-22 09:45:19 +00:00
xiaomo 7bb5df301d Merge pull request 'feat(daemon): Signal Bus, Reflex Scheduler & Kernel (Phase 3)' (#10) from feat/signal-bus-reflex into main 2026-04-22 09:36:56 +00:00
xiaoju 9443406703 fix(daemon): address all 12 PR #10 review items
🔴 Critical:
1. parseWorkerMessage() in ipc.ts — validates worker→parent IPC messages
2. signalIdCounter moved inside createKernel closure
3. throttle deferred trigger — pending trigger fires after window ends

⚠️ Warnings:
4. worker respawn on crash with backoff
5. stop() awaits worker exit with SIGKILL fallback
6. signal-bus handler errors caught, no re-throw
7. removed unnecessary as SenseReflexConfig cast
8. config validates sense reflex has at least one trigger

💡 Suggestions:
9. signal ID documented as process-scoped (solved by #2)
10. +3 throttle-pending tests
11. +6 kernel unit tests (mock fork, message routing)
12. example imports verified correct

54 tests (was 45), all green. biome check passes.

小橘 🍊(NEKO Team)
2026-04-22 09:34:13 +00:00
xiaoju d9355a6299 feat(daemon): Signal Bus, Reflex Scheduler & Kernel (Phase 3)
- signal-bus: in-memory pub/sub for Signal dispatch, sync broadcast,
  subscriber error isolation
- reflex-scheduler: interval + event-driven triggers, throttle enforcement,
  merge/coalesce (pending flag, no unbounded queue), workflow reflexes skipped
- kernel: orchestrator tying workers, signal bus, and scheduler together,
  graceful shutdown
- examples/nerve.yaml: cpu-usage (10s), disk-usage (30s), system-health
  (on: [cpu-usage, disk-usage])
- 20 new tests (45 total): signal bus (8) + reflex scheduler (12)

Closes #4

小橘 🍊(NEKO Team)
2026-04-22 08:56:38 +00:00
xiaomo bf60047186 Merge pull request 'feat(daemon): Sense Runtime — Worker, IPC, Migrations, Peer Isolation' (#9) from feat/sense-runtime into main 2026-04-22 08:48:30 +00:00
19 changed files with 1648 additions and 4 deletions
+40
View File
@@ -0,0 +1,40 @@
# Example nerve.yaml demonstrating Signal Bus & Reflex Scheduler (Phase 3)
#
# Layout:
# - cpu-usage: periodic every 10s, throttled to 5s minimum between computes
# - disk-usage: periodic every 30s
# - system-health: derived sense, triggered whenever cpu-usage OR disk-usage emits
senses:
cpu-usage:
group: system
throttle: 5s
timeout: 8s
grace_period: null
disk-usage:
group: system
throttle: null
timeout: 15s
grace_period: null
system-health:
group: derived
throttle: 2s
timeout: 10s
grace_period: null
reflexes:
# cpu-usage runs on a 10-second interval
- sense: cpu-usage
interval: 10s
# disk-usage runs on a 30-second interval
- sense: disk-usage
interval: 30s
# system-health is event-driven: fires whenever cpu-usage or disk-usage emits a signal
- sense: system-health
on:
- cpu-usage
- disk-usage
+4
View File
@@ -10,5 +10,9 @@
"types": "dist/index.d.ts",
"scripts": {
"build": "tsup"
},
"dependencies": {
"@uncaged/nerve-core": "workspace:*",
"@uncaged/nerve-daemon": "workspace:*"
}
}
+104
View File
@@ -0,0 +1,104 @@
#!/usr/bin/env node
import { readFileSync } from "node:fs";
import { homedir } from "node:os";
import { join } from "node:path";
import { parseNerveConfig } from "@uncaged/nerve-core";
import { createKernel } from "@uncaged/nerve-daemon";
const DEFAULT_NERVE_ROOT = join(homedir(), ".uncaged-nerve");
function parseArgs(argv: string[]): { root: string } {
// Skip argv[0] (node), argv[1] (script), argv[2] ('start' subcommand)
const args = argv.slice(3);
let root = DEFAULT_NERVE_ROOT;
for (let i = 0; i < args.length; i++) {
if (args[i] === "--root" && i + 1 < args.length) {
root = args[i + 1];
i++;
} else if (!args[i].startsWith("-")) {
process.stderr.write("Usage: nerve start [--root <path>]\n");
process.exit(1);
}
}
return { root };
}
function readConfig(nerveRoot: string): ReturnType<typeof parseNerveConfig> {
const configPath = join(nerveRoot, "nerve.yaml");
let raw: string;
try {
raw = readFileSync(configPath, "utf8");
} catch (e) {
const msg = e instanceof Error ? e.message : String(e);
return { ok: false, error: new Error(`Cannot read ${configPath}: ${msg}`) };
}
return parseNerveConfig(raw);
}
async function main(): Promise<void> {
const subcommand = process.argv[2];
if (subcommand !== "start") {
process.stderr.write("Usage: nerve start [--root <path>]\n");
process.exit(1);
}
const { root } = parseArgs(process.argv);
const configResult = readConfig(root);
if (!configResult.ok) {
process.stderr.write(`[nerve] Config error: ${configResult.error.message}\n`);
process.exit(1);
}
const config = configResult.value;
const kernel = createKernel(config, root);
process.stderr.write(
`[nerve] Starting — ${kernel.groups.size} group(s), ${kernel.senseCount} sense(s)\n`,
);
for (const group of kernel.groups) {
const groupSenses = Object.entries(config.senses)
.filter(([, sc]) => sc.group === group)
.map(([name]) => name);
process.stderr.write(`[nerve] group "${group}": ${groupSenses.join(", ")}\n`);
}
let shuttingDown = false;
async function shutdown(): Promise<void> {
if (shuttingDown) return;
shuttingDown = true;
process.stderr.write("\n[nerve] Shutting down…\n");
await kernel.stop();
process.exit(0);
}
process.on("SIGINT", () => {
shutdown().catch((e: unknown) => {
const msg = e instanceof Error ? e.message : String(e);
process.stderr.write(`[nerve] Shutdown error: ${msg}\n`);
process.exit(1);
});
});
process.on("SIGTERM", () => {
shutdown().catch((e: unknown) => {
const msg = e instanceof Error ? e.message : String(e);
process.stderr.write(`[nerve] Shutdown error: ${msg}\n`);
process.exit(1);
});
});
}
main().catch((e: unknown) => {
const msg = e instanceof Error ? e.message : String(e);
process.stderr.write(`[nerve] Fatal error: ${msg}\n`);
process.exit(1);
});
+2 -1
View File
@@ -1 +1,2 @@
// TODO: implement
export { createKernel } from "@uncaged/nerve-daemon";
export type { Kernel } from "@uncaged/nerve-daemon";
+1 -1
View File
@@ -1,7 +1,7 @@
import { defineConfig } from "tsup";
export default defineConfig({
entry: ["src/index.ts"],
entry: ["src/index.ts", "src/cli.ts"],
format: ["esm"],
dts: true,
clean: true,
+6
View File
@@ -98,6 +98,12 @@ function parseSenseReflex(
const intervalResult = parseDurationField(obj.interval, `reflexes[${index}].interval`);
if (!intervalResult.ok) return intervalResult;
if (intervalResult.value === null && on !== null && on.length === 0) {
return err(
new Error(`reflexes[${index}]: sense reflex must have at least one of "interval" or "on"`),
);
}
return ok({
kind: "sense" as const,
sense: obj.sense,
@@ -0,0 +1,28 @@
/**
* Mock sense worker that implements the IPC protocol for integration testing.
*
* This file intentionally uses the .mjs extension (rather than .ts) because it
* is spawned as a child process via fork() at runtime and must execute without
* any TypeScript compilation step. Node.js can run .mjs files directly as
* ESModules, whereas .ts files would require ts-node/tsx to be available in the
* child process environment.
*
* Behaviour:
* - Sends { type: "ready" } on startup
* - On { type: "compute", sense } → sends back { type: "signal", sense, payload: 42 }
* - On { type: "shutdown" } → exits cleanly with code 0
*/
process.send({ type: "ready" });
process.on("message", (msg) => {
if (!msg || typeof msg !== "object") return;
if (msg.type === "shutdown") {
process.exit(0);
}
if (msg.type === "compute" && typeof msg.sense === "string") {
process.send({ type: "signal", sense: msg.sense, payload: 42 });
}
});
@@ -0,0 +1,207 @@
/**
* Integration tests for the Kernel / Process Manager.
*
* These tests use REAL child processes via a mock worker fixture that
* implements the IPC protocol (fixtures/mock-worker.mjs). No build
* artifacts are required.
*/
import { dirname, join } from "node:path";
import { fileURLToPath } from "node:url";
import type { Signal } from "@uncaged/nerve-core";
import type { NerveConfig } from "@uncaged/nerve-core";
import { afterEach, describe, expect, it } from "vitest";
import { createKernel } from "../kernel.js";
import type { Kernel } from "../kernel.js";
const __filename = fileURLToPath(import.meta.url);
const __dir = dirname(__filename);
const MOCK_WORKER = join(__dir, "fixtures", "mock-worker.mjs");
function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
return {
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [],
workflows: null,
...overrides,
};
}
/** Poll until predicate returns true, with a timeout. */
async function pollUntil(
predicate: () => boolean,
timeoutMs: number,
intervalMs = 50,
): Promise<void> {
return new Promise((resolve, reject) => {
const timer = setTimeout(
() => reject(new Error(`pollUntil timed out after ${timeoutMs}ms`)),
timeoutMs,
);
const check = setInterval(() => {
if (predicate()) {
clearTimeout(timer);
clearInterval(check);
resolve();
}
}, intervalMs);
});
}
describe("kernel integration — real child processes", () => {
let kernel: Kernel | null = null;
afterEach(async () => {
if (kernel !== null) {
await kernel.stop();
kernel = null;
}
});
it("returns correct groups and senseCount", () => {
const config = makeConfig({
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
"disk-io": { group: "system", throttle: null, timeout: null, gracePeriod: null },
"net-rx": { group: "network", throttle: null, timeout: null, gracePeriod: null },
},
});
kernel = createKernel(config, "/tmp/nerve-integration-test", {
workerScript: MOCK_WORKER,
});
expect(kernel.groups.size).toBe(2);
expect(kernel.groups.has("system")).toBe(true);
expect(kernel.groups.has("network")).toBe(true);
expect(kernel.senseCount).toBe(3);
});
it("workers start and respond to compute messages with signals", async () => {
const config = makeConfig();
kernel = createKernel(config, "/tmp/nerve-integration-test", {
workerScript: MOCK_WORKER,
});
// Wait for all workers to be ready (event-based, not fixed delay)
await kernel.ready;
// Subscribe to the bus before triggering compute
const received: Signal[] = [];
const unsub = kernel.bus.subscribe((signal) => {
received.push(signal);
});
// Trigger a compute for "cpu-usage" through the kernel's triggerCompute
kernel.triggerCompute("cpu-usage");
// Poll until a signal arrives on the bus (event-driven, no fixed delay)
await pollUntil(() => received.length > 0, 3000);
expect(received).toHaveLength(1);
expect(received[0]).toMatchObject({ senseId: "cpu-usage", payload: 42 });
unsub();
}, 10_000);
it("graceful shutdown: stop() resolves after all workers exit", async () => {
const config = makeConfig({
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
"net-rx": { group: "network", throttle: null, timeout: null, gracePeriod: null },
},
});
kernel = createKernel(config, "/tmp/nerve-integration-test", {
workerScript: MOCK_WORKER,
});
// Wait for all workers to be ready (event-based)
await kernel.ready;
const stopPromise = kernel.stop();
kernel = null;
// stop() should resolve within 5s (our shutdown timeout)
await expect(stopPromise).resolves.toBeUndefined();
}, 10_000);
it("compute round-trip: worker receives compute and sends signal back through bus", async () => {
const config = makeConfig();
kernel = createKernel(config, "/tmp/nerve-integration-test", {
workerScript: MOCK_WORKER,
});
// Wait for all workers to be ready (event-based, not fixed delay)
await kernel.ready;
const received: Signal[] = [];
const unsub = kernel.bus.subscribe((signal) => {
received.push(signal);
});
// Trigger compute via the kernel — the kernel sends IPC to the worker,
// the mock worker responds with a signal message, and the kernel routes it to the bus.
kernel.triggerCompute("cpu-usage");
// Poll for the signal on the bus (no fixed delay)
await pollUntil(() => received.length > 0, 3000);
expect(received).toHaveLength(1);
expect(received[0]).toMatchObject({ senseId: "cpu-usage", payload: 42 });
unsub();
}, 10_000);
it("crash recovery: kernel respawns worker after unexpected exit and new worker is functional", async () => {
const config = makeConfig();
kernel = createKernel(config, "/tmp/nerve-integration-test", {
workerScript: MOCK_WORKER,
});
// Wait for initial worker to be ready (event-based)
await kernel.ready;
const originalPid = kernel.getWorkerPid("system");
expect(originalPid).not.toBeNull();
expect(originalPid).toBeGreaterThan(0);
// Kill the kernel's own worker to simulate a crash (SIGKILL, code != 0)
process.kill(originalPid as number, "SIGKILL");
// Poll until the kernel respawns and registers a new worker with a different PID
// (respawn delay is 1s, then fork(), then workers.set())
await pollUntil(() => {
const pid = kernel?.getWorkerPid("system");
return pid !== null && pid !== originalPid;
}, 5000);
const newPid = kernel.getWorkerPid("system");
expect(newPid).not.toBeNull();
expect(newPid).not.toBe(originalPid);
// Wait a bit for the new worker to send its "ready" message and be fully up.
// Poll until the new worker responds to a compute message on the bus.
const postRespawnSignals: Signal[] = [];
const unsub = kernel.bus.subscribe((signal) => {
postRespawnSignals.push(signal);
});
// Trigger compute through the kernel to the new worker
kernel.triggerCompute("cpu-usage");
// Poll for the signal — verifies the new worker is fully functional
await pollUntil(() => postRespawnSignals.length > 0, 5000);
expect(postRespawnSignals).toHaveLength(1);
expect(postRespawnSignals[0]).toMatchObject({ senseId: "cpu-usage", payload: 42 });
unsub();
// Kernel should still stop gracefully after respawn
await kernel.stop();
kernel = null;
}, 15_000);
});
@@ -0,0 +1,200 @@
import { EventEmitter } from "node:events";
import type { NerveConfig } from "@uncaged/nerve-core";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
// ---------------------------------------------------------------------------
// Mock child_process.fork before importing kernel
// ---------------------------------------------------------------------------
const mockChildren: MockChild[] = [];
type MockChild = EventEmitter & {
send: ReturnType<typeof vi.fn>;
kill: ReturnType<typeof vi.fn>;
pid: number;
};
function makeMockChild(pid = 1): MockChild {
const child = new EventEmitter() as MockChild;
child.send = vi.fn((msg: unknown) => {
// Auto-exit on shutdown so stop() resolves
if (
msg !== null &&
typeof msg === "object" &&
(msg as Record<string, unknown>).type === "shutdown"
) {
setImmediate(() => child.emit("exit", 0, null));
}
});
child.kill = vi.fn((_signal?: string) => {
child.emit("exit", null, _signal ?? "SIGKILL");
});
child.pid = pid;
return child;
}
vi.mock("node:child_process", () => ({
fork: vi.fn((_script: string, _args: string[], _opts: unknown) => {
const child = makeMockChild(mockChildren.length + 1);
mockChildren.push(child);
return child;
}),
}));
// Import after mock is set up
const { createKernel } = await import("../kernel.js");
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
return {
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [],
workflows: null,
...overrides,
};
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
describe("kernel — message routing", () => {
beforeEach(() => {
mockChildren.length = 0;
vi.useFakeTimers({ shouldAdvanceTime: true });
});
afterEach(() => {
vi.useRealTimers();
});
it("routes signal message to bus without throwing", async () => {
const config = makeConfig({
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [],
});
const kernel = createKernel(config, "/tmp/nerve-test");
expect(mockChildren.length).toBe(1);
const child = mockChildren[0];
expect(() => {
child.emit("message", { type: "signal", sense: "cpu-usage", payload: 42 });
}).not.toThrow();
await kernel.stop();
});
it("routes error message to stderr", async () => {
const stderrSpy = vi.spyOn(process.stderr, "write").mockImplementation(() => true);
const config = makeConfig({
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [],
});
const kernel = createKernel(config, "/tmp/nerve-test");
const child = mockChildren[0];
child.emit("message", { type: "error", sense: "cpu-usage", error: "compute failed" });
expect(stderrSpy).toHaveBeenCalledWith(
expect.stringContaining('sense "cpu-usage" error: compute failed'),
);
stderrSpy.mockRestore();
await kernel.stop();
});
it("ignores ready messages without logging", async () => {
const stderrSpy = vi.spyOn(process.stderr, "write").mockImplementation(() => true);
const config = makeConfig({
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [],
});
const kernel = createKernel(config, "/tmp/nerve-test");
const child = mockChildren[0];
const callsBefore = stderrSpy.mock.calls.length;
child.emit("message", { type: "ready" });
expect(stderrSpy.mock.calls.length).toBe(callsBefore);
stderrSpy.mockRestore();
await kernel.stop();
});
it("logs invalid worker messages without throwing", async () => {
const stderrSpy = vi.spyOn(process.stderr, "write").mockImplementation(() => true);
const config = makeConfig({
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [],
});
const kernel = createKernel(config, "/tmp/nerve-test");
const child = mockChildren[0];
expect(() => child.emit("message", { type: "unknown-type" })).not.toThrow();
expect(stderrSpy).toHaveBeenCalledWith(expect.stringContaining("invalid worker message"));
stderrSpy.mockRestore();
await kernel.stop();
});
});
describe("kernel — groupForSense mapping", () => {
beforeEach(() => {
mockChildren.length = 0;
vi.useFakeTimers({ shouldAdvanceTime: true });
});
afterEach(() => {
vi.useRealTimers();
});
it("spawns one worker per unique group", async () => {
const config: NerveConfig = {
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
"disk-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
"net-usage": { group: "network", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [],
workflows: null,
};
const kernel = createKernel(config, "/tmp/nerve-test");
// system and network = 2 unique groups
expect(mockChildren.length).toBe(2);
await kernel.stop();
});
it("sends compute to the correct worker on interval trigger", async () => {
const config = makeConfig({
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: 500, on: null }],
});
createKernel(config, "/tmp/nerve-test");
const child = mockChildren[0];
vi.advanceTimersByTime(500);
expect(child.send).toHaveBeenCalledWith(
expect.objectContaining({ type: "compute", sense: "cpu-usage" }),
);
});
});
@@ -0,0 +1,120 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import type { NerveConfig, Signal } from "@uncaged/nerve-core";
import { createReflexScheduler } from "../reflex-scheduler.js";
import { createSignalBus } from "../signal-bus.js";
function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
return {
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [],
workflows: null,
...overrides,
};
}
function makeSignal(senseId: string, payload: unknown = 1): Signal {
return { id: 1, senseId, payload, ts: Date.now() };
}
describe("ReflexScheduler — throttle + pending deferred trigger", () => {
beforeEach(() => {
vi.useFakeTimers();
});
afterEach(() => {
vi.useRealTimers();
});
it("trigger during throttle window fires deferred trigger after window ends", () => {
const triggered: string[] = [];
const config = makeConfig({
senses: {
"cpu-usage": { group: "system", throttle: 2000, timeout: null, gracePeriod: null },
},
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: null, on: ["cpu-usage"] }],
});
const bus = createSignalBus();
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name));
// First trigger fires immediately (outside throttle window)
bus.emit(makeSignal("cpu-usage"));
scheduler.onComputeComplete("cpu-usage");
expect(triggered.length).toBe(1);
// Second trigger arrives 500ms later — still within throttle window (2000ms)
vi.advanceTimersByTime(500);
bus.emit(makeSignal("cpu-usage"));
// Should not fire yet (throttled)
expect(triggered.length).toBe(1);
// Advance past the throttle window end; deferred trigger should now fire
vi.advanceTimersByTime(1600);
expect(triggered.length).toBe(2);
scheduler.stop();
});
it("multiple triggers during throttle window only produce one deferred trigger", () => {
const triggered: string[] = [];
const config = makeConfig({
senses: {
"cpu-usage": { group: "system", throttle: 2000, timeout: null, gracePeriod: null },
},
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: null, on: ["cpu-usage"] }],
});
const bus = createSignalBus();
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name));
// First trigger fires
bus.emit(makeSignal("cpu-usage"));
scheduler.onComputeComplete("cpu-usage");
expect(triggered.length).toBe(1);
// Multiple triggers within throttle window — should not stack
vi.advanceTimersByTime(300);
bus.emit(makeSignal("cpu-usage"));
vi.advanceTimersByTime(300);
bus.emit(makeSignal("cpu-usage"));
vi.advanceTimersByTime(300);
bus.emit(makeSignal("cpu-usage"));
expect(triggered.length).toBe(1);
// Advance past window — exactly one deferred trigger fires
vi.advanceTimersByTime(1200);
scheduler.onComputeComplete("cpu-usage");
expect(triggered.length).toBe(2);
scheduler.stop();
});
it("deferred trigger does not fire after stop()", () => {
const triggered: string[] = [];
const config = makeConfig({
senses: {
"cpu-usage": { group: "system", throttle: 2000, timeout: null, gracePeriod: null },
},
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: null, on: ["cpu-usage"] }],
});
const bus = createSignalBus();
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name));
bus.emit(makeSignal("cpu-usage"));
scheduler.onComputeComplete("cpu-usage");
expect(triggered.length).toBe(1);
// Trigger during throttle window — schedules deferred
vi.advanceTimersByTime(500);
bus.emit(makeSignal("cpu-usage"));
expect(triggered.length).toBe(1);
// Stop before window ends
scheduler.stop();
// Advance past window — deferred timer should have been cleared
vi.advanceTimersByTime(2000);
expect(triggered.length).toBe(1);
});
});
@@ -0,0 +1,310 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import type { NerveConfig, Signal } from "@uncaged/nerve-core";
import { createReflexScheduler } from "../reflex-scheduler.js";
import { createSignalBus } from "../signal-bus.js";
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
return {
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
"disk-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
"system-health": { group: "derived", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [],
workflows: null,
...overrides,
};
}
function makeSignal(senseId: string, payload: unknown = 1): Signal {
return { id: 1, senseId, payload, ts: Date.now() };
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
describe("ReflexScheduler — interval reflex", () => {
beforeEach(() => {
vi.useFakeTimers();
});
afterEach(() => {
vi.useRealTimers();
});
it("fires triggerFn on schedule", () => {
const triggered: string[] = [];
const config = makeConfig({
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: 1000, on: null }],
});
const bus = createSignalBus();
// Use a ref so the triggerFn can call back into the scheduler
const ref: { scheduler: ReturnType<typeof createReflexScheduler> | null } = {
scheduler: null,
};
const scheduler = createReflexScheduler(config, bus, (name) => {
triggered.push(name);
// Immediately complete compute so the scheduler is not blocked by in-flight state
ref.scheduler?.onComputeComplete(name);
});
ref.scheduler = scheduler;
vi.advanceTimersByTime(3000);
expect(triggered.length).toBeGreaterThanOrEqual(3);
expect(triggered.every((n) => n === "cpu-usage")).toBe(true);
scheduler.stop();
});
it("stops firing after stop() is called", () => {
const triggered: string[] = [];
const config = makeConfig({
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: 500, on: null }],
});
const bus = createSignalBus();
const ref: { scheduler: ReturnType<typeof createReflexScheduler> | null } = {
scheduler: null,
};
const scheduler = createReflexScheduler(config, bus, (name) => {
triggered.push(name);
ref.scheduler?.onComputeComplete(name);
});
ref.scheduler = scheduler;
vi.advanceTimersByTime(1000);
const countBefore = triggered.length;
scheduler.stop();
vi.advanceTimersByTime(2000);
expect(triggered.length).toBe(countBefore);
});
it("starts from current time — does not compensate for past intervals", () => {
const triggered: string[] = [];
const config = makeConfig({
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: 1000, on: null }],
});
const bus = createSignalBus();
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name));
// Only advance 500ms — should be 0 triggers (not catching up)
vi.advanceTimersByTime(500);
expect(triggered.length).toBe(0);
scheduler.stop();
});
});
describe("ReflexScheduler — event (on) reflex", () => {
it("triggers target sense when watched sense emits a signal", () => {
const triggered: string[] = [];
const config = makeConfig({
reflexes: [
{ kind: "sense", sense: "system-health", interval: null, on: ["cpu-usage", "disk-usage"] },
],
});
const bus = createSignalBus();
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name));
bus.emit(makeSignal("cpu-usage"));
scheduler.onComputeComplete("system-health");
bus.emit(makeSignal("disk-usage"));
scheduler.onComputeComplete("system-health");
expect(triggered.length).toBe(2);
expect(triggered.every((n) => n === "system-health")).toBe(true);
scheduler.stop();
});
it("does not trigger for signals from non-watched senses", () => {
const triggered: string[] = [];
const config = makeConfig({
reflexes: [{ kind: "sense", sense: "system-health", interval: null, on: ["cpu-usage"] }],
});
const bus = createSignalBus();
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name));
bus.emit(makeSignal("disk-usage"));
expect(triggered.length).toBe(0);
scheduler.stop();
});
it("stops responding after stop()", () => {
const triggered: string[] = [];
const config = makeConfig({
reflexes: [{ kind: "sense", sense: "system-health", interval: null, on: ["cpu-usage"] }],
});
const bus = createSignalBus();
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name));
scheduler.stop();
bus.emit(makeSignal("cpu-usage"));
expect(triggered.length).toBe(0);
});
});
describe("ReflexScheduler — throttle", () => {
beforeEach(() => {
vi.useFakeTimers();
});
afterEach(() => {
vi.useRealTimers();
});
it("skips rapid triggers within throttle window", () => {
const triggered: string[] = [];
const config = makeConfig({
senses: {
"cpu-usage": { group: "system", throttle: 2000, timeout: null, gracePeriod: null },
},
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: null, on: ["cpu-usage"] }],
});
const bus = createSignalBus();
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name));
bus.emit(makeSignal("cpu-usage"));
scheduler.onComputeComplete("cpu-usage");
// Immediately trigger again — within throttle window
bus.emit(makeSignal("cpu-usage"));
expect(triggered.length).toBe(1);
scheduler.stop();
});
it("allows trigger after throttle window has passed", () => {
const triggered: string[] = [];
const config = makeConfig({
senses: {
"cpu-usage": { group: "system", throttle: 1000, timeout: null, gracePeriod: null },
},
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: null, on: ["cpu-usage"] }],
});
const bus = createSignalBus();
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name));
bus.emit(makeSignal("cpu-usage"));
scheduler.onComputeComplete("cpu-usage");
vi.advanceTimersByTime(1500);
bus.emit(makeSignal("cpu-usage"));
scheduler.onComputeComplete("cpu-usage");
expect(triggered.length).toBe(2);
scheduler.stop();
});
});
describe("ReflexScheduler — merge/coalesce", () => {
it("concurrent triggers collapse to at most one pending run", () => {
const triggered: string[] = [];
const config = makeConfig({
reflexes: [{ kind: "sense", sense: "system-health", interval: null, on: ["cpu-usage"] }],
});
const bus = createSignalBus();
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name));
// First trigger starts compute
bus.emit(makeSignal("cpu-usage"));
expect(triggered.length).toBe(1);
// Three more arrive while first is in-flight — all should coalesce to one pending
bus.emit(makeSignal("cpu-usage"));
bus.emit(makeSignal("cpu-usage"));
bus.emit(makeSignal("cpu-usage"));
expect(triggered.length).toBe(1);
// Complete first compute → pending drains as exactly one more run
scheduler.onComputeComplete("system-health");
expect(triggered.length).toBe(2);
// Complete second compute → no more pending
scheduler.onComputeComplete("system-health");
expect(triggered.length).toBe(2);
scheduler.stop();
});
it("no new trigger while in-flight without pending → no extra run after complete", () => {
const triggered: string[] = [];
const config = makeConfig({
reflexes: [{ kind: "sense", sense: "system-health", interval: null, on: ["cpu-usage"] }],
});
const bus = createSignalBus();
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name));
bus.emit(makeSignal("cpu-usage"));
expect(triggered.length).toBe(1);
// Complete with no pending
scheduler.onComputeComplete("system-health");
expect(triggered.length).toBe(1);
scheduler.stop();
});
});
describe("ReflexScheduler — interval + on combined", () => {
beforeEach(() => {
vi.useFakeTimers();
});
afterEach(() => {
vi.useRealTimers();
});
it("fires both via interval and event", () => {
const triggered: string[] = [];
const config = makeConfig({
reflexes: [{ kind: "sense", sense: "system-health", interval: 1000, on: ["cpu-usage"] }],
});
const bus = createSignalBus();
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name));
// Event trigger
bus.emit(makeSignal("cpu-usage"));
scheduler.onComputeComplete("system-health");
// Interval trigger
vi.advanceTimersByTime(1000);
scheduler.onComputeComplete("system-health");
expect(triggered.length).toBeGreaterThanOrEqual(2);
scheduler.stop();
});
});
describe("ReflexScheduler — workflow reflexes ignored", () => {
it("does not set up any scheduling for workflow kind reflexes", () => {
const triggered: string[] = [];
const config: NerveConfig = {
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [{ kind: "workflow", workflow: "my-workflow", on: ["cpu-usage"] }],
workflows: {
"my-workflow": { concurrency: 1, overflow: "drop" },
},
};
const bus = createSignalBus();
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name));
bus.emit(makeSignal("cpu-usage"));
expect(triggered.length).toBe(0);
scheduler.stop();
});
});
@@ -0,0 +1,99 @@
import { describe, expect, it, vi } from "vitest";
import type { Signal } from "@uncaged/nerve-core";
import { createSignalBus } from "../signal-bus.js";
function makeSignal(senseId: string, payload: unknown = 1): Signal {
return { id: 1, senseId, payload, ts: Date.now() };
}
describe("createSignalBus", () => {
it("delivers emitted signal to a subscriber", () => {
const bus = createSignalBus();
const received: Signal[] = [];
bus.subscribe((s) => received.push(s));
const sig = makeSignal("cpu-usage", 42);
bus.emit(sig);
expect(received).toHaveLength(1);
expect(received[0]).toBe(sig);
});
it("delivers to multiple subscribers", () => {
const bus = createSignalBus();
const a: Signal[] = [];
const b: Signal[] = [];
bus.subscribe((s) => a.push(s));
bus.subscribe((s) => b.push(s));
bus.emit(makeSignal("cpu-usage"));
expect(a).toHaveLength(1);
expect(b).toHaveLength(1);
});
it("unsubscribe stops delivery", () => {
const bus = createSignalBus();
const received: Signal[] = [];
const unsub = bus.subscribe((s) => received.push(s));
bus.emit(makeSignal("cpu-usage"));
unsub();
bus.emit(makeSignal("cpu-usage"));
expect(received).toHaveLength(1);
});
it("remaining subscribers still receive after one unsubscribes", () => {
const bus = createSignalBus();
const a: Signal[] = [];
const b: Signal[] = [];
const unsubA = bus.subscribe((s) => a.push(s));
bus.subscribe((s) => b.push(s));
unsubA();
bus.emit(makeSignal("cpu-usage"));
expect(a).toHaveLength(0);
expect(b).toHaveLength(1);
});
it("emit with no subscribers does nothing", () => {
const bus = createSignalBus();
expect(() => bus.emit(makeSignal("cpu-usage"))).not.toThrow();
});
it("dispatch is synchronous", () => {
const bus = createSignalBus();
const order: string[] = [];
bus.subscribe(() => order.push("handler"));
order.push("before");
bus.emit(makeSignal("cpu-usage"));
order.push("after");
expect(order).toEqual(["before", "handler", "after"]);
});
it("handler exceptions don't prevent other handlers from running", () => {
const bus = createSignalBus();
const received: Signal[] = [];
bus.subscribe(() => {
throw new Error("boom");
});
bus.subscribe((s) => received.push(s));
expect(() => bus.emit(makeSignal("cpu-usage"))).toThrow("boom");
expect(received).toHaveLength(1);
});
it("same handler can be subscribed once and fires once per emit", () => {
const bus = createSignalBus();
const handler = vi.fn();
bus.subscribe(handler);
bus.emit(makeSignal("cpu-usage"));
bus.emit(makeSignal("cpu-usage"));
expect(handler).toHaveBeenCalledTimes(2);
});
});
+5
View File
@@ -8,6 +8,8 @@ export type {
WorkerToParentMessage,
} from "./ipc.js";
export type { SignalBus, SignalHandler, Unsubscribe } from "./signal-bus.js";
export type { ComputeFn, DrizzleDB, PeerMap, SenseRuntime } from "./sense-runtime.js";
export {
@@ -17,3 +19,6 @@ export {
loadComputeFn,
executeCompute,
} from "./sense-runtime.js";
export { createKernel } from "./kernel.js";
export type { Kernel, KernelOptions } from "./kernel.js";
+34
View File
@@ -57,3 +57,37 @@ export function parseParentMessage(raw: unknown): Result<ParentToWorkerMessage>
}
return ok(raw as ParentToWorkerMessage);
}
/** Validate and parse an unknown IPC message received from a worker process. */
export function parseWorkerMessage(raw: unknown): Result<WorkerToParentMessage> {
if (raw === null || typeof raw !== "object") {
return err(new Error("Worker IPC message is not an object"));
}
const obj = raw as Record<string, unknown>;
if (typeof obj.type !== "string") {
return err(new Error("Worker IPC message missing string 'type' field"));
}
const type = obj.type;
if (type !== "signal" && type !== "error" && type !== "ready") {
return err(new Error(`Unknown worker IPC message type: "${type}"`));
}
if (type === "signal") {
if (typeof obj.sense !== "string") {
return err(new Error("Worker 'signal' message missing string 'sense' field"));
}
if (!("payload" in obj)) {
return err(new Error("Worker 'signal' message missing 'payload' field"));
}
return ok(raw as SignalMessage);
}
if (type === "error") {
if (typeof obj.sense !== "string") {
return err(new Error("Worker 'error' message missing string 'sense' field"));
}
if (typeof obj.error !== "string") {
return err(new Error("Worker 'error' message missing string 'error' field"));
}
return ok(raw as ErrorMessage);
}
return ok({ type: "ready" });
}
+247
View File
@@ -0,0 +1,247 @@
/**
* Kernel — the main orchestrator that ties sense workers, signal bus, and
* reflex scheduler together.
*
* Responsibilities:
* - Spawn one child process per sense group (via fork)
* - Route SignalMessage from workers → SignalBus
* - Route ErrorMessage from workers → stderr log
* - Drive compute triggers via ReflexScheduler
* - Graceful shutdown: stop scheduler, send shutdown to all workers
*/
import { fork } from "node:child_process";
import type { ChildProcess } from "node:child_process";
import { dirname, join } from "node:path";
import { fileURLToPath } from "node:url";
import type { NerveConfig, Signal } from "@uncaged/nerve-core";
import type { ComputeMessage, ShutdownMessage } from "./ipc.js";
import { parseWorkerMessage } from "./ipc.js";
import { createReflexScheduler } from "./reflex-scheduler.js";
import type { ReflexScheduler } from "./reflex-scheduler.js";
import { createSignalBus } from "./signal-bus.js";
import type { SignalBus } from "./signal-bus.js";
export type Kernel = {
stop: () => Promise<void>;
groups: Set<string>;
senseCount: number;
bus: SignalBus;
/** Resolves when all workers have sent their initial "ready" message. */
ready: Promise<void>;
/** Returns the PID of the worker process for a given group, or null if not found. */
getWorkerPid: (group: string) => number | null;
/** Sends a compute message to the worker responsible for the given sense. */
triggerCompute: (senseName: string) => void;
};
type WorkerEntry = {
group: string;
process: ChildProcess;
};
function resolveWorkerScript(): string {
const __filename = fileURLToPath(import.meta.url);
const __dir = dirname(__filename);
return join(__dir, "sense-worker.js");
}
function spawnWorker(nerveRoot: string, group: string, workerScript: string): ChildProcess {
return fork(workerScript, ["--group", group, "--root", nerveRoot], {
stdio: ["ignore", "inherit", "inherit", "ipc"],
});
}
function sendCompute(worker: ChildProcess, senseName: string): void {
// worker.connected is false when the IPC channel has been closed (e.g. worker crashed)
if (worker.connected === false) return;
const msg: ComputeMessage = { type: "compute", sense: senseName };
try {
worker.send(msg);
} catch {
// IPC channel closed between connected check and send
}
}
function sendShutdown(worker: ChildProcess): void {
// worker.connected is false when the IPC channel has been closed (e.g. worker crashed)
if (worker.connected === false) return;
const msg: ShutdownMessage = { type: "shutdown" };
try {
worker.send(msg);
} catch {
// IPC channel closed between connected check and send
}
}
function groupForSense(config: NerveConfig, senseName: string): string | null {
const senseConfig = config.senses[senseName];
if (senseConfig === undefined) return null;
return senseConfig.group;
}
export type KernelOptions = {
workerScript: string;
};
function defaultKernelOptions(): KernelOptions {
return { workerScript: resolveWorkerScript() };
}
export function createKernel(
config: NerveConfig,
nerveRoot: string,
options: KernelOptions = defaultKernelOptions(),
): Kernel {
const bus: SignalBus = createSignalBus();
const workerScript = options.workerScript;
// Signal ID counter is instance-scoped (fix #2)
let _signalIdCounter = 0;
function nextSignalId(): number {
_signalIdCounter += 1;
return _signalIdCounter;
}
const groups = new Set<string>();
for (const senseConfig of Object.values(config.senses)) {
groups.add(senseConfig.group);
}
const workers = new Map<string, WorkerEntry>();
let stopped = false;
// eslint-disable-next-line prefer-const
let scheduler: ReflexScheduler = null as unknown as ReflexScheduler;
let readyResolve: () => void;
const ready = new Promise<void>((resolve) => {
readyResolve = resolve;
});
let pendingReadyCount = groups.size > 0 ? groups.size : 0;
function sensesForGroup(group: string): string[] {
return Object.entries(config.senses)
.filter(([, sc]) => sc.group === group)
.map(([name]) => name);
}
function handleWorkerMessage(raw: unknown): void {
const result = parseWorkerMessage(raw);
if (!result.ok) {
process.stderr.write(`[kernel] invalid worker message: ${result.error.message}\n`);
return;
}
const msg = result.value;
if (msg.type === "ready") {
pendingReadyCount -= 1;
if (pendingReadyCount <= 0) {
readyResolve();
}
return;
}
if (msg.type === "error") {
process.stderr.write(`[kernel] sense "${msg.sense}" error: ${msg.error}\n`);
scheduler.onComputeComplete(msg.sense);
return;
}
if (msg.type === "signal") {
const signal: Signal = {
id: nextSignalId(),
senseId: msg.sense,
payload: msg.payload,
ts: Date.now(),
};
bus.emit(signal);
scheduler.onComputeComplete(msg.sense);
}
}
function startWorker(group: string): void {
const child = spawnWorker(nerveRoot, group, workerScript);
child.on("message", handleWorkerMessage);
child.on("exit", (code) => {
process.stderr.write(
`[kernel] worker for group "${group}" exited with code ${code ?? "null"}\n`,
);
// Respawn on unexpected exit (fix #4)
if (!stopped && code !== 0) {
process.stderr.write(`[kernel] respawning worker for group "${group}" in 1s\n`);
// Clear any stuck in-flight state for all senses in this group
for (const senseName of sensesForGroup(group)) {
scheduler.onComputeComplete(senseName);
}
setTimeout(() => {
if (!stopped) {
startWorker(group);
}
}, 1000);
}
});
workers.set(group, { group, process: child });
}
function triggerFn(senseName: string): void {
const group = groupForSense(config, senseName);
if (group === null) {
process.stderr.write(`[kernel] triggerFn: unknown sense "${senseName}"\n`);
return;
}
const entry = workers.get(group);
if (entry === undefined) {
process.stderr.write(`[kernel] triggerFn: no worker for group "${group}"\n`);
return;
}
sendCompute(entry.process, senseName);
}
scheduler = createReflexScheduler(config, bus, triggerFn);
if (groups.size === 0) {
readyResolve();
}
for (const group of groups) {
startWorker(group);
}
// Wait for a worker process to exit, with a timeout + SIGKILL fallback (fix #5)
function waitForExit(child: ChildProcess, timeoutMs: number): Promise<void> {
return new Promise((resolve) => {
const timer = setTimeout(() => {
child.kill("SIGKILL");
resolve();
}, timeoutMs);
child.once("exit", () => {
clearTimeout(timer);
resolve();
});
});
}
async function stop(): Promise<void> {
stopped = true;
scheduler.stop();
const exitPromises: Promise<void>[] = [];
for (const entry of workers.values()) {
sendShutdown(entry.process);
exitPromises.push(waitForExit(entry.process, 5000));
}
await Promise.all(exitPromises);
}
function getWorkerPid(group: string): number | null {
return workers.get(group)?.process.pid ?? null;
}
const senseCount = Object.keys(config.senses).length;
return { stop, groups, senseCount, bus, ready, getWorkerPid, triggerCompute: triggerFn };
}
+181
View File
@@ -0,0 +1,181 @@
/**
* Reflex Scheduler — drives sense compute cycles based on ReflexConfig.
*
* Supports:
* - interval: periodic setInterval-based triggering
* - on[]: event-driven triggering via the signal bus
* - throttle: skip triggers that arrive too soon after the last compute
* - merge/coalesce: if compute is in-flight, record one pending trigger;
* run it once after the current compute completes (no unbounded queue)
*/
import type { NerveConfig } from "@uncaged/nerve-core";
import type { SignalBus, Unsubscribe } from "./signal-bus.js";
/** Sends a compute message to the worker responsible for the given sense. */
export type TriggerFn = (senseName: string) => void;
/** Per-sense mutable state tracked by the scheduler. */
type SenseState = {
lastComputeAt: number;
inFlight: boolean;
pending: boolean;
deferredTimer: ReturnType<typeof setTimeout> | null;
};
/** Handle returned by createReflexScheduler — call stop() for graceful shutdown. */
export type ReflexScheduler = {
/** Notify scheduler that a compute cycle finished. Drains the pending flag. */
onComputeComplete: (senseName: string) => void;
stop: () => void;
};
function makeSenseState(): SenseState {
return { lastComputeAt: 0, inFlight: false, pending: false, deferredTimer: null };
}
/**
* Create and start a reflex scheduler.
*
* @param config Full NerveConfig (reads senses for throttle/timeout, reflexes for schedule).
* @param bus SignalBus to subscribe for event-driven reflexes.
* @param triggerFn Called with the sense name when a compute should be dispatched.
* @returns ReflexScheduler with stop() and onComputeComplete() methods.
*/
export function createReflexScheduler(
config: NerveConfig,
bus: SignalBus,
triggerFn: TriggerFn,
): ReflexScheduler {
const intervals: ReturnType<typeof setInterval>[] = [];
const unsubscribers: Unsubscribe[] = [];
const states = new Map<string, SenseState>();
function getState(senseName: string): SenseState {
let state = states.get(senseName);
if (state === undefined) {
state = makeSenseState();
states.set(senseName, state);
}
return state;
}
function dispatchCompute(senseName: string): void {
const state = getState(senseName);
state.inFlight = true;
state.pending = false;
state.lastComputeAt = Date.now();
triggerFn(senseName);
}
/**
* Attempt to trigger compute for a sense.
* Respects throttle window and merge/coalesce semantics.
* If within throttle window, schedules a single deferred trigger at window end (fix #3).
*/
function maybeTrigger(senseName: string): void {
const senseConfig = config.senses[senseName];
const throttleMs = senseConfig?.throttle ?? null;
const state = getState(senseName);
const now = Date.now();
if (throttleMs !== null) {
const elapsed = now - state.lastComputeAt;
if (elapsed < throttleMs) {
// Schedule one deferred trigger at the end of the throttle window (no stacking)
if (state.deferredTimer === null) {
const remaining = throttleMs - elapsed;
state.deferredTimer = setTimeout(() => {
state.deferredTimer = null;
maybeTrigger(senseName);
}, remaining);
}
return;
}
}
if (state.inFlight) {
state.pending = true;
return;
}
dispatchCompute(senseName);
}
/**
* Called by the kernel when a compute cycle completes (signal or error received).
* Drains the pending flag if set.
*/
function onComputeComplete(senseName: string): void {
const state = states.get(senseName);
if (state === undefined) return;
state.inFlight = false;
if (!state.pending) return;
const senseConfig = config.senses[senseName];
const throttleMs = senseConfig?.throttle ?? null;
const now = Date.now();
if (throttleMs !== null && now - state.lastComputeAt < throttleMs) {
// Schedule deferred trigger if not already pending (fix #3)
if (state.deferredTimer === null) {
const remaining = throttleMs - (now - state.lastComputeAt);
state.deferredTimer = setTimeout(() => {
state.deferredTimer = null;
const s = states.get(senseName);
if (s !== undefined && !s.inFlight) {
dispatchCompute(senseName);
}
}, remaining);
}
state.pending = false;
return;
}
dispatchCompute(senseName);
}
for (const reflex of config.reflexes) {
if (reflex.kind !== "sense") continue;
const senseReflex = reflex;
const senseName = senseReflex.sense;
if (senseReflex.interval !== null) {
const id = setInterval(() => {
maybeTrigger(senseName);
}, senseReflex.interval);
intervals.push(id);
}
if (senseReflex.on !== null && senseReflex.on.length > 0) {
const watchedSenses = new Set(senseReflex.on);
const unsub = bus.subscribe((signal) => {
if (watchedSenses.has(signal.senseId)) {
maybeTrigger(senseName);
}
});
unsubscribers.push(unsub);
}
}
function stop(): void {
for (const id of intervals) {
clearInterval(id);
}
for (const unsub of unsubscribers) {
unsub();
}
for (const state of states.values()) {
if (state.deferredTimer !== null) {
clearTimeout(state.deferredTimer);
state.deferredTimer = null;
}
}
intervals.length = 0;
unsubscribers.length = 0;
}
return { onComputeComplete, stop };
}
+51
View File
@@ -0,0 +1,51 @@
/**
* In-memory signal bus for routing signals between sense workers and reflex subscribers.
* Synchronous dispatch — no persistence, no async queuing.
*
* If a handler throws, the error is logged and remaining handlers still run.
* The first error is re-thrown after all handlers complete so callers can observe it.
*/
import type { Signal } from "@uncaged/nerve-core";
export type SignalHandler = (signal: Signal) => void;
export type Unsubscribe = () => void;
export type SignalBus = {
emit: (signal: Signal) => void;
subscribe: (handler: SignalHandler) => Unsubscribe;
};
export function createSignalBus(): SignalBus {
const handlers = new Set<SignalHandler>();
function emit(signal: Signal): void {
let firstError: unknown = null;
let hasError = false;
for (const handler of handlers) {
try {
handler(signal);
} catch (e) {
console.error("[signal-bus] handler error:", e);
if (!hasError) {
firstError = e;
hasError = true;
}
}
}
if (hasError) {
throw firstError;
}
}
function subscribe(handler: SignalHandler): Unsubscribe {
handlers.add(handler);
return () => {
handlers.delete(handler);
};
}
return { emit, subscribe };
}
+1 -1
View File
@@ -1,7 +1,7 @@
import { defineConfig } from "tsup";
export default defineConfig({
entry: ["src/index.ts"],
entry: ["src/index.ts", "src/sense-worker.ts"],
format: ["esm"],
dts: true,
clean: true,
+8 -1
View File
@@ -18,7 +18,14 @@ importers:
specifier: ^5.5.0
version: 5.9.3
packages/cli: {}
packages/cli:
dependencies:
'@uncaged/nerve-core':
specifier: workspace:*
version: link:../core
'@uncaged/nerve-daemon':
specifier: workspace:*
version: link:../daemon
packages/core:
dependencies: