Compare commits

...

14 Commits

Author SHA1 Message Date
xiaoju 1b995379f9 feat: Phase 7 — structured logging system (RFC §2.4/§5.4)
- log-store.ts: SQLite append-only log store with query API and meta table
- kernel: system start/stop logs, signal/error logging, file watcher events
- reflex-scheduler: run_start/run_complete logging per compute
- Exports: createLogStore, LogStore, LogEntry, LogQuery
- Tests: log-store CRUD, query filters, meta, integration with reflex

Closes #14
小橘 🍊(NEKO Team)
2026-04-22 11:20:29 +00:00
xiaomo ea6bc5610b Merge pull request 'Phase 6: Hot Reload & Error Handling' (#13) from feat/phase-6-hot-reload into main 2026-04-22 11:12:38 +00:00
xiaoju 49d078b144 fix: address PR #13 review — per-sense timeout, reload restart, await ready
- reloadConfig: restart existing groups when new senses added
- sense-worker: per-sense timeout/gracePeriod lookup at compute time (RFC §5.3)
- restartGroup: await worker ready promise before returning
- Comments: scheduler in-flight state loss, fs.watch Linux caveats, grace period blast radius

小橘 🍊(NEKO Team)
2026-04-22 11:07:33 +00:00
xiaoju 9ca8c8ecb8 feat: Phase 6 — hot reload, error isolation, grace period, nerve-health
- file-watcher.ts: watch nerveRoot for .ts and nerve.yaml changes
- kernel.ts: restartGroup(), reloadConfig(), getHealth(), auto-respawn on crash
- sense-worker.ts: compute try/catch error isolation, grace_period hard kill
- ipc.ts: new message types for health, restart, reload
- examples/senses/nerve-health.ts: built-in daemon health sense
- Integration tests for hot reload, error isolation, grace period

小橘 🍊(NEKO Team)
2026-04-22 10:57:00 +00:00
xiaomo 00c1932144 Merge pull request 'feat: Phase 5 — CLI & User Workspace' (#12) from feat/phase-5-cli-workspace into main 2026-04-22 10:33:53 +00:00
xiaoju 097a4790be fix: address PR #12 review — cross-platform, pkg manager detection, exports
- status.ts: wrap /proc in try/catch, fallback to PID file mtime (macOS safe)
- init.ts: detect pnpm > yarn > npm instead of hardcoding pnpm
- init.ts: use dirname() instead of join(.., '..')
- index.ts: restore createKernel/Kernel re-exports (non-breaking)
- start.ts: use fileURLToPath(import.meta.url) for daemon spawn
- start.ts: use logStream.fd instead of double type cast
- validate.ts: remove misleading error numbering

小橘 🍊(NEKO Team)
2026-04-22 10:32:06 +00:00
xiaoju ad2b40dd4f feat: implement Phase 5 CLI & User Workspace
- Restructure CLI with citty multi-command framework
- nerve init: create workspace skeleton at ~/.uncaged-nerve/
- nerve start: foreground + daemon (-d) modes with graceful shutdown
- nerve stop: SIGTERM → 10s wait → SIGKILL, PID file cleanup
- nerve status: show pid, uptime, senses, workers
- nerve validate: parse nerve.yaml with error reporting
- workspace.ts: shared utilities (PID file, paths, isRunning)
- Example cpu-usage sense with realistic os.cpus() compute

小橘 🍊(NEKO Team)
2026-04-22 10:16:41 +00:00
xiaomo 31d1eae44a Merge pull request 'feat(cli,daemon): Phase 4 — Process Manager & Isolation' (#11) from feat/phase-4-process-manager into main 2026-04-22 09:59:43 +00:00
xiaoju b4ef669607 fix: address all 9 PR #11 review issues
Critical:
- cli.ts: add shuttingDown guard to prevent double shutdown race
- kernel.ts: check child.connected before IPC send

Warning:
- Rewrite IPC round-trip test to verify actual signal flow
- Rewrite crash recovery test to kill kernel-managed worker
- parseArgs: explicitly handle 'start' subcommand
- Respawn: reset scheduler in-flight state for crashed group

Suggestions:
- Re-export KernelOptions from index.ts
- Add comment explaining mock-worker.mjs format
- Replace setTimeout with pollUntil helper

小橘 🍊(NEKO Team)
2026-04-22 09:57:06 +00:00
xiaoju 22b3690327 feat(cli,daemon): Phase 4 — Process Manager & Isolation
- CLI entry point: `nerve start [--root <path>]` with SIGINT/SIGTERM handling
- Kernel exports groups/senseCount for startup logging
- daemon tsup builds sense-worker.ts as separate entry point
- Integration tests with mock worker (IPC round-trip, crash recovery, graceful shutdown)
- CLI re-exports createKernel/Kernel from daemon

59 tests (was 54), all green. biome check passes.

Closes #5

小橘 🍊(NEKO Team)
2026-04-22 09:45:19 +00:00
xiaomo 7bb5df301d Merge pull request 'feat(daemon): Signal Bus, Reflex Scheduler & Kernel (Phase 3)' (#10) from feat/signal-bus-reflex into main 2026-04-22 09:36:56 +00:00
xiaoju 9443406703 fix(daemon): address all 12 PR #10 review items
🔴 Critical:
1. parseWorkerMessage() in ipc.ts — validates worker→parent IPC messages
2. signalIdCounter moved inside createKernel closure
3. throttle deferred trigger — pending trigger fires after window ends

⚠️ Warnings:
4. worker respawn on crash with backoff
5. stop() awaits worker exit with SIGKILL fallback
6. signal-bus handler errors caught, no re-throw
7. removed unnecessary as SenseReflexConfig cast
8. config validates sense reflex has at least one trigger

💡 Suggestions:
9. signal ID documented as process-scoped (solved by #2)
10. +3 throttle-pending tests
11. +6 kernel unit tests (mock fork, message routing)
12. example imports verified correct

54 tests (was 45), all green. biome check passes.

小橘 🍊(NEKO Team)
2026-04-22 09:34:13 +00:00
xiaoju d9355a6299 feat(daemon): Signal Bus, Reflex Scheduler & Kernel (Phase 3)
- signal-bus: in-memory pub/sub for Signal dispatch, sync broadcast,
  subscriber error isolation
- reflex-scheduler: interval + event-driven triggers, throttle enforcement,
  merge/coalesce (pending flag, no unbounded queue), workflow reflexes skipped
- kernel: orchestrator tying workers, signal bus, and scheduler together,
  graceful shutdown
- examples/nerve.yaml: cpu-usage (10s), disk-usage (30s), system-health
  (on: [cpu-usage, disk-usage])
- 20 new tests (45 total): signal bus (8) + reflex scheduler (12)

Closes #4

小橘 🍊(NEKO Team)
2026-04-22 08:56:38 +00:00
xiaomo bf60047186 Merge pull request 'feat(daemon): Sense Runtime — Worker, IPC, Migrations, Peer Isolation' (#9) from feat/sense-runtime into main 2026-04-22 08:48:30 +00:00
40 changed files with 4042 additions and 55 deletions
+40
View File
@@ -0,0 +1,40 @@
# Example nerve.yaml demonstrating Signal Bus & Reflex Scheduler (Phase 3)
#
# Layout:
# - cpu-usage: periodic every 10s, throttled to 5s minimum between computes
# - disk-usage: periodic every 30s
# - system-health: derived sense, triggered whenever cpu-usage OR disk-usage emits
senses:
cpu-usage:
group: system
throttle: 5s
timeout: 8s
grace_period: null
disk-usage:
group: system
throttle: null
timeout: 15s
grace_period: null
system-health:
group: derived
throttle: 2s
timeout: 10s
grace_period: null
reflexes:
# cpu-usage runs on a 10-second interval
- sense: cpu-usage
interval: 10s
# disk-usage runs on a 30-second interval
- sense: disk-usage
interval: 30s
# system-health is event-driven: fires whenever cpu-usage or disk-usage emits a signal
- sense: system-health
on:
- cpu-usage
- disk-usage
+70
View File
@@ -0,0 +1,70 @@
/**
* nerve-health — built-in sense that reports daemon health via IPC.
*
* When running inside a sense worker, this compute function sends a
* "health-request" to the parent kernel process and returns the
* health-response payload as its signal.
*
* Usage in nerve.yaml:
* senses:
* nerve-health:
* group: internal
* throttle: 30s
* timeout: 5s
*
* reflexes:
* - sense: nerve-health
* interval: 30s
*/
export type NerveHealth = {
uptime: number;
activeSenses: string[];
inFlightCount: number;
workerMemoryUsage: NodeJS.MemoryUsage;
workerUptime: number;
};
export async function compute(): Promise<NerveHealth | null> {
const health = await requestHealthFromKernel();
return health;
}
function requestHealthFromKernel(): Promise<NerveHealth> {
return new Promise((resolve, reject) => {
if (!process.send) {
reject(new Error("nerve-health: not running as a child process with IPC"));
return;
}
const timeout = setTimeout(() => {
process.removeListener("message", onMessage);
reject(new Error("nerve-health: health-request timed out"));
}, 5000);
function onMessage(msg: unknown): void {
if (
msg !== null &&
typeof msg === "object" &&
(msg as Record<string, unknown>).type === "health-response"
) {
clearTimeout(timeout);
process.removeListener("message", onMessage);
const resp = msg as {
senses: string[];
inFlightCount: number;
};
resolve({
uptime: process.uptime(),
activeSenses: resp.senses,
inFlightCount: resp.inFlightCount,
workerMemoryUsage: process.memoryUsage(),
workerUptime: process.uptime(),
});
}
}
process.on("message", onMessage);
process.send({ type: "health-request" });
});
}
+8
View File
@@ -10,5 +10,13 @@
"types": "dist/index.d.ts",
"scripts": {
"build": "tsup"
},
"dependencies": {
"@uncaged/nerve-core": "workspace:*",
"@uncaged/nerve-daemon": "workspace:*",
"citty": "^0.1.6"
},
"devDependencies": {
"@types/node": "^22.0.0"
}
}
+25
View File
@@ -0,0 +1,25 @@
#!/usr/bin/env node
import { defineCommand, runMain } from "citty";
import { initCommand } from "./commands/init.js";
import { startCommand } from "./commands/start.js";
import { statusCommand } from "./commands/status.js";
import { stopCommand } from "./commands/stop.js";
import { validateCommand } from "./commands/validate.js";
const main = defineCommand({
meta: {
name: "nerve",
description: "Nerve — an AI agent kernel",
},
subCommands: {
init: initCommand,
start: startCommand,
stop: stopCommand,
status: statusCommand,
validate: validateCommand,
},
});
runMain(main);
+170
View File
@@ -0,0 +1,170 @@
import { existsSync, mkdirSync, writeFileSync } from "node:fs";
import { dirname, join } from "node:path";
import { defineCommand } from "citty";
import { getNerveRoot } from "../workspace.js";
const NERVE_YAML = `# nerve.yaml — Nerve workspace configuration
senses:
cpu-usage:
group: system
throttle: 5s
timeout: 10s
grace_period: null
reflexes:
- kind: sense
sense: cpu-usage
interval: 10s
`;
const PACKAGE_JSON = `{
"name": "my-nerve-workspace",
"version": "0.0.1",
"private": true,
"type": "module",
"dependencies": {
"@uncaged/nerve-core": "latest",
"drizzle-orm": "latest"
},
"devDependencies": {
"drizzle-kit": "latest"
}
}
`;
const GITIGNORE = `data/
node_modules/
`;
const CPU_SCHEMA_TS = `import { integer, real, sqliteTable, text } from "drizzle-orm/sqlite-core";
export const cpuUsage = sqliteTable("cpu_usage", {
id: integer("id").primaryKey({ autoIncrement: true }),
ts: integer("ts").notNull(),
model: text("model").notNull(),
loadPercent: real("load_percent").notNull(),
});
`;
const CPU_INDEX_TS = `import { cpus } from "node:os";
export async function compute(): Promise<unknown> {
const cpuList = cpus();
let totalIdle = 0;
let totalTick = 0;
for (const cpu of cpuList) {
for (const [, time] of Object.entries(cpu.times)) {
totalTick += time;
}
totalIdle += cpu.times.idle;
}
const loadPercent = totalTick === 0 ? 0 : ((totalTick - totalIdle) / totalTick) * 100;
return {
model: cpuList[0]?.model ?? "unknown",
loadPercent: Math.round(loadPercent * 100) / 100,
ts: Date.now(),
};
}
`;
const CPU_MIGRATION_SQL = `CREATE TABLE IF NOT EXISTS cpu_usage (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ts INTEGER NOT NULL,
model TEXT NOT NULL,
load_percent REAL NOT NULL
);
`;
function writeFile(filePath: string, content: string): void {
mkdirSync(dirname(filePath), { recursive: true });
writeFileSync(filePath, content, "utf8");
}
async function runCommand(cmd: string, args: string[], cwd: string): Promise<void> {
const { spawn } = await import("node:child_process");
await new Promise<void>((resolve, reject) => {
const child = spawn(cmd, args, { cwd, stdio: "inherit" });
child.on("close", (code) => {
if (code === 0) resolve();
else reject(new Error(`${cmd} exited with code ${code}`));
});
child.on("error", reject);
});
}
async function detectPackageManager(): Promise<{ cmd: string; args: string[] }> {
const { execFile } = await import("node:child_process");
const { promisify } = await import("node:util");
const execFileAsync = promisify(execFile);
for (const pm of ["pnpm", "yarn", "npm"]) {
try {
await execFileAsync(pm, ["--version"]);
const args = pm === "pnpm" ? ["install", "--no-cache"] : ["install"];
return { cmd: pm, args };
} catch {
// not available, try next
}
}
return { cmd: "npm", args: ["install"] };
}
export const initCommand = defineCommand({
meta: {
name: "init",
description: "Initialize the ~/.uncaged-nerve/ workspace",
},
args: {
force: {
type: "boolean",
description: "Reinitialize even if workspace already exists (preserves data/)",
default: false,
},
},
async run({ args }) {
const nerveRoot = getNerveRoot();
if (existsSync(nerveRoot) && !args.force) {
process.stderr.write("⚠️ ~/.uncaged-nerve/ already exists. Use --force to reinitialize.\n");
process.exit(1);
}
mkdirSync(join(nerveRoot, "data"), { recursive: true });
mkdirSync(join(nerveRoot, "senses", "cpu-usage", "migrations"), { recursive: true });
writeFile(join(nerveRoot, "nerve.yaml"), NERVE_YAML);
writeFile(join(nerveRoot, "package.json"), PACKAGE_JSON);
writeFile(join(nerveRoot, ".gitignore"), GITIGNORE);
writeFile(join(nerveRoot, "senses", "cpu-usage", "schema.ts"), CPU_SCHEMA_TS);
writeFile(join(nerveRoot, "senses", "cpu-usage", "index.ts"), CPU_INDEX_TS);
writeFile(
join(nerveRoot, "senses", "cpu-usage", "migrations", "0001_init.sql"),
CPU_MIGRATION_SQL,
);
process.stdout.write("Installing dependencies…\n");
try {
const { cmd, args } = await detectPackageManager();
await runCommand(cmd, args, nerveRoot);
} catch {
process.stdout.write("⚠️ Install failed — you may need to install dependencies manually.\n");
}
if (!existsSync(join(nerveRoot, ".git"))) {
try {
await runCommand("git", ["init"], nerveRoot);
} catch {
process.stdout.write("⚠️ git init failed — skipping.\n");
}
}
process.stdout.write(
"✅ Workspace created at ~/.uncaged-nerve/\n 1 example sense: cpu-usage\n Run `nerve start` to launch the daemon.\n",
);
},
});
+145
View File
@@ -0,0 +1,145 @@
import { createWriteStream } from "node:fs";
import { readFileSync } from "node:fs";
import { mkdir } from "node:fs/promises";
import { join } from "node:path";
import { fileURLToPath } from "node:url";
import { parseNerveConfig } from "@uncaged/nerve-core";
import { createKernel } from "@uncaged/nerve-daemon";
import { defineCommand } from "citty";
import { getLogPath, getNerveRoot, isRunning, readPidFile, writePidFile } from "../workspace.js";
function readConfig(nerveRoot: string): ReturnType<typeof parseNerveConfig> {
const configPath = join(nerveRoot, "nerve.yaml");
let raw: string;
try {
raw = readFileSync(configPath, "utf8");
} catch (e) {
const msg = e instanceof Error ? e.message : String(e);
return { ok: false, error: new Error(`❌ Cannot read ${configPath}: ${msg}`) };
}
return parseNerveConfig(raw);
}
async function runForeground(nerveRoot: string): Promise<void> {
const configResult = readConfig(nerveRoot);
if (!configResult.ok) {
process.stderr.write(`${configResult.error.message}\n`);
process.exit(1);
}
const config = configResult.value;
const kernel = createKernel(config, nerveRoot);
const senseNames = Object.keys(config.senses);
const groups = [...kernel.groups];
process.stdout.write(
`✅ Nerve starting — ${senseNames.length} sense(s), ${groups.length} group(s)\n`,
);
for (const group of groups) {
const groupSenses = Object.entries(config.senses)
.filter(([, sc]) => sc.group === group)
.map(([name]) => name);
process.stdout.write(` group "${group}": ${groupSenses.join(", ")}\n`);
}
process.stdout.write(" Press Ctrl+C to stop.\n");
let shuttingDown = false;
async function shutdown(): Promise<void> {
if (shuttingDown) return;
shuttingDown = true;
process.stdout.write("\n[nerve] Shutting down…\n");
await kernel.stop();
process.exit(0);
}
process.on("SIGINT", () => {
shutdown().catch((e: unknown) => {
const msg = e instanceof Error ? e.message : String(e);
process.stderr.write(`[nerve] Shutdown error: ${msg}\n`);
process.exit(1);
});
});
process.on("SIGTERM", () => {
shutdown().catch((e: unknown) => {
const msg = e instanceof Error ? e.message : String(e);
process.stderr.write(`[nerve] Shutdown error: ${msg}\n`);
process.exit(1);
});
});
await kernel.ready;
}
async function runDaemon(nerveRoot: string): Promise<void> {
if (isRunning()) {
const pid = readPidFile();
process.stderr.write(`⚠️ Nerve daemon is already running (pid ${pid}).\n`);
process.exit(1);
}
const configResult = readConfig(nerveRoot);
if (!configResult.ok) {
process.stderr.write(`${configResult.error.message}\n`);
process.exit(1);
}
const logPath = getLogPath();
await mkdir(join(nerveRoot, "logs"), { recursive: true });
const { spawn } = await import("node:child_process");
const logStream = createWriteStream(logPath, { flags: "a" });
await new Promise<void>((resolve) => {
if (logStream.pending) logStream.once("open", () => resolve());
else resolve();
});
const selfPath = fileURLToPath(import.meta.url);
const child = spawn(process.execPath, [selfPath, "start"], {
detached: true,
stdio: ["ignore", logStream.fd, logStream.fd],
env: { ...process.env, NERVE_DAEMON_MODE: "1" },
});
child.unref();
const pid = child.pid;
if (!pid) {
process.stderr.write("❌ Failed to spawn daemon process.\n");
process.exit(1);
}
writePidFile(pid);
process.stdout.write(`✅ Nerve daemon started (pid ${pid}).\n`);
process.stdout.write(` Logs: ${logPath}\n`);
process.stdout.write(" Run `nerve stop` to stop.\n");
}
export const startCommand = defineCommand({
meta: {
name: "start",
description: "Start the nerve daemon",
},
args: {
daemon: {
type: "boolean",
alias: "d",
description: "Run as background daemon",
default: false,
},
},
async run({ args }) {
const nerveRoot = getNerveRoot();
if (args.daemon) {
await runDaemon(nerveRoot);
} else {
await runForeground(nerveRoot);
}
},
});
+79
View File
@@ -0,0 +1,79 @@
import { readFileSync, statSync } from "node:fs";
import { join } from "node:path";
import { parseNerveConfig } from "@uncaged/nerve-core";
import { defineCommand } from "citty";
import { getNerveRoot, getPidPath, isRunning, readPidFile } from "../workspace.js";
function formatUptime(ms: number): string {
const totalSeconds = Math.floor(ms / 1000);
const hours = Math.floor(totalSeconds / 3600);
const minutes = Math.floor((totalSeconds % 3600) / 60);
const seconds = totalSeconds % 60;
if (hours > 0) return `${hours}h ${minutes}m ${seconds}s`;
if (minutes > 0) return `${minutes}m ${seconds}s`;
return `${seconds}s`;
}
function getUptimeMs(pid: number): number | null {
try {
const pidStat = readFileSync(`/proc/${pid}/stat`, "utf8").split(" ");
const startJiffies = Number(pidStat[21]);
const clkTck = 100;
const uptimeRaw = readFileSync("/proc/uptime", "utf8").split(" ")[0];
const systemUptimeSec = Number.parseFloat(uptimeRaw);
const processStartSec = startJiffies / clkTck;
return (systemUptimeSec - processStartSec) * 1000;
} catch {
// /proc not available (non-Linux); fall back to PID file mtime
try {
const pidMtime = statSync(getPidPath()).mtimeMs;
return Date.now() - pidMtime;
} catch {
return null;
}
}
}
export const statusCommand = defineCommand({
meta: {
name: "status",
description: "Show nerve daemon status",
},
async run() {
if (!isRunning()) {
process.stdout.write("😴 Nerve daemon is not running.\n");
return;
}
const pid = readPidFile() as number;
const configPath = join(getNerveRoot(), "nerve.yaml");
let senseList: string[] = [];
let workerGroups: string[] = [];
try {
const raw = readFileSync(configPath, "utf8");
const result = parseNerveConfig(raw);
if (result.ok) {
senseList = Object.keys(result.value.senses);
workerGroups = [...new Set(Object.values(result.value.senses).map((s) => s.group))];
}
} catch {
// config may not be readable; continue with what we have
}
const uptimeMs = getUptimeMs(pid);
const uptimeStr = uptimeMs !== null ? formatUptime(uptimeMs) : "unknown";
process.stdout.write("✅ Nerve daemon is running.\n");
process.stdout.write(` pid: ${pid}\n`);
process.stdout.write(` uptime: ${uptimeStr}\n`);
process.stdout.write(` senses: ${senseList.length > 0 ? senseList.join(", ") : "(none)"}\n`);
process.stdout.write(
` workers: ${workerGroups.length > 0 ? workerGroups.join(", ") : "(none)"}\n`,
);
process.stdout.write(" signals: (pending SignalBus persistence)\n");
},
});
+58
View File
@@ -0,0 +1,58 @@
import { defineCommand } from "citty";
import { isRunning, readPidFile, removePidFile } from "../workspace.js";
async function waitForExit(pid: number, timeoutMs: number): Promise<boolean> {
const start = Date.now();
while (Date.now() - start < timeoutMs) {
try {
process.kill(pid, 0);
} catch {
return true;
}
await new Promise<void>((resolve) => setTimeout(resolve, 200));
}
return false;
}
export const stopCommand = defineCommand({
meta: {
name: "stop",
description: "Stop the nerve daemon",
},
async run() {
const pid = readPidFile();
if (pid === null) {
process.stdout.write("⚠️ No PID file found — daemon may not be running.\n");
return;
}
if (!isRunning()) {
process.stdout.write("⚠️ Daemon is not running (stale PID file). Cleaning up.\n");
removePidFile();
return;
}
process.stdout.write(`Stopping nerve daemon (pid ${pid})…\n`);
try {
process.kill(pid, "SIGTERM");
} catch (e) {
const msg = e instanceof Error ? e.message : String(e);
process.stderr.write(`❌ Failed to send SIGTERM: ${msg}\n`);
process.exit(1);
}
const graceful = await waitForExit(pid, 10_000);
if (!graceful) {
process.stdout.write("⚠️ Daemon did not exit in 10s — sending SIGKILL.\n");
try {
process.kill(pid, "SIGKILL");
} catch {
// already dead
}
}
removePidFile();
process.stdout.write("✅ Daemon stopped.\n");
},
});
+40
View File
@@ -0,0 +1,40 @@
import { readFileSync } from "node:fs";
import { join } from "node:path";
import { parseNerveConfig } from "@uncaged/nerve-core";
import { defineCommand } from "citty";
import { getNerveRoot } from "../workspace.js";
export const validateCommand = defineCommand({
meta: {
name: "validate",
description: "Validate nerve.yaml configuration",
},
async run() {
const configPath = join(getNerveRoot(), "nerve.yaml");
let raw: string;
try {
raw = readFileSync(configPath, "utf8");
} catch (e) {
const msg = e instanceof Error ? e.message : String(e);
process.stderr.write(`❌ Cannot read ${configPath}: ${msg}\n`);
process.exit(1);
}
const result = parseNerveConfig(raw);
if (!result.ok) {
process.stderr.write(`❌ Config validation failed: ${result.error.message}\n`);
process.exit(1);
}
const config = result.value;
const senseCount = Object.keys(config.senses).length;
const reflexCount = config.reflexes.length;
const workflowCount = config.workflows ? Object.keys(config.workflows).length : 0;
process.stdout.write(
`✅ nerve.yaml is valid — ${senseCount} sense(s), ${reflexCount} reflex(es), ${workflowCount} workflow(s)\n`,
);
},
});
+12 -1
View File
@@ -1 +1,12 @@
// TODO: implement
export {
getNerveRoot,
getPidPath,
getLogPath,
readPidFile,
writePidFile,
removePidFile,
isRunning,
} from "./workspace.js";
export { createKernel } from "@uncaged/nerve-daemon";
export type { Kernel } from "@uncaged/nerve-daemon";
+45
View File
@@ -0,0 +1,45 @@
import { existsSync, readFileSync, rmSync, writeFileSync } from "node:fs";
import { homedir } from "node:os";
import { join } from "node:path";
export function getNerveRoot(): string {
return join(homedir(), ".uncaged-nerve");
}
export function getPidPath(): string {
return join(getNerveRoot(), "nerve.pid");
}
export function getLogPath(): string {
return join(getNerveRoot(), "logs", "nerve.log");
}
export function readPidFile(): number | null {
const pidPath = getPidPath();
if (!existsSync(pidPath)) return null;
const raw = readFileSync(pidPath, "utf8").trim();
const pid = Number.parseInt(raw, 10);
return Number.isNaN(pid) ? null : pid;
}
export function writePidFile(pid: number): void {
writeFileSync(getPidPath(), String(pid), "utf8");
}
export function removePidFile(): void {
const pidPath = getPidPath();
if (existsSync(pidPath)) {
rmSync(pidPath);
}
}
export function isRunning(): boolean {
const pid = readPidFile();
if (pid === null) return false;
try {
process.kill(pid, 0);
return true;
} catch {
return false;
}
}
+3 -1
View File
@@ -2,7 +2,9 @@
"extends": "../../tsconfig.json",
"compilerOptions": {
"outDir": "dist",
"rootDir": "src"
"rootDir": "src",
"composite": false,
"types": ["node"]
},
"include": ["src"]
}
+1 -1
View File
@@ -1,7 +1,7 @@
import { defineConfig } from "tsup";
export default defineConfig({
entry: ["src/index.ts"],
entry: ["src/index.ts", "src/cli.ts"],
format: ["esm"],
dts: true,
clean: true,
+6
View File
@@ -98,6 +98,12 @@ function parseSenseReflex(
const intervalResult = parseDurationField(obj.interval, `reflexes[${index}].interval`);
if (!intervalResult.ok) return intervalResult;
if (intervalResult.value === null && on !== null && on.length === 0) {
return err(
new Error(`reflexes[${index}]: sense reflex must have at least one of "interval" or "on"`),
);
}
return ok({
kind: "sense" as const,
sense: obj.sense,
+2 -1
View File
@@ -2,7 +2,8 @@
"extends": "../../tsconfig.json",
"compilerOptions": {
"outDir": "dist",
"rootDir": "src"
"rootDir": "src",
"composite": false
},
"include": ["src"]
}
@@ -0,0 +1,126 @@
/**
* Unit tests for file-watcher.ts
*/
import { mkdirSync, mkdtempSync, writeFileSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { afterEach, describe, expect, it } from "vitest";
import { createFileWatcher } from "../file-watcher.js";
import type { FileChange, FileWatcher } from "../file-watcher.js";
function makeTempNerveRoot(): string {
const dir = mkdtempSync(join(tmpdir(), "nerve-fw-test-"));
mkdirSync(join(dir, "senses", "cpu-usage"), { recursive: true });
writeFileSync(join(dir, "nerve.yaml"), "senses: {}\nreflexes: []\n");
writeFileSync(
join(dir, "senses", "cpu-usage", "index.ts"),
"export async function compute() { return null; }",
);
return dir;
}
async function waitFor(
predicate: () => boolean,
timeoutMs: number,
intervalMs = 50,
): Promise<void> {
return new Promise((resolve, reject) => {
const timer = setTimeout(
() => reject(new Error(`waitFor timed out after ${timeoutMs}ms`)),
timeoutMs,
);
const check = setInterval(() => {
if (predicate()) {
clearTimeout(timer);
clearInterval(check);
resolve();
}
}, intervalMs);
});
}
describe("createFileWatcher", () => {
let watcher: FileWatcher | null = null;
afterEach(() => {
if (watcher !== null) {
watcher.close();
watcher = null;
}
});
it("detects nerve.yaml changes", async () => {
const root = makeTempNerveRoot();
const changes: FileChange[] = [];
watcher = createFileWatcher(root, (change) => changes.push(change), 50);
// Wait a bit for watcher to initialize, then modify nerve.yaml
await new Promise((r) => setTimeout(r, 100));
writeFileSync(join(root, "nerve.yaml"), "senses: {}\nreflexes: []\n# changed\n");
await waitFor(() => changes.length > 0, 3000);
expect(changes.length).toBeGreaterThanOrEqual(1);
expect(changes.some((c) => c.kind === "config")).toBe(true);
}, 10_000);
it("detects sense .ts file changes", async () => {
const root = makeTempNerveRoot();
const changes: FileChange[] = [];
watcher = createFileWatcher(root, (change) => changes.push(change), 50);
await new Promise((r) => setTimeout(r, 100));
writeFileSync(
join(root, "senses", "cpu-usage", "index.ts"),
"export async function compute() { return 42; }",
);
await waitFor(() => changes.length > 0, 3000);
expect(changes.length).toBeGreaterThanOrEqual(1);
const senseChanges = changes.filter((c) => c.kind === "sense");
expect(senseChanges.length).toBeGreaterThanOrEqual(1);
expect((senseChanges[0] as { senseName: string }).senseName).toBe("cpu-usage");
}, 10_000);
it("close() stops the watcher", async () => {
const root = makeTempNerveRoot();
const changes: FileChange[] = [];
watcher = createFileWatcher(root, (change) => changes.push(change), 50);
await new Promise((r) => setTimeout(r, 100));
watcher.close();
watcher = null;
writeFileSync(join(root, "nerve.yaml"), "senses: {}\nreflexes: []\n# after close\n");
// Wait and verify no changes were captured
await new Promise((r) => setTimeout(r, 500));
expect(changes.length).toBe(0);
}, 5_000);
it("debounces rapid changes", async () => {
const root = makeTempNerveRoot();
const changes: FileChange[] = [];
watcher = createFileWatcher(root, (change) => changes.push(change), 200);
await new Promise((r) => setTimeout(r, 100));
// Write rapidly
for (let i = 0; i < 5; i++) {
writeFileSync(join(root, "nerve.yaml"), `senses: {}\nreflexes: []\n# v${i}\n`);
}
await waitFor(() => changes.length > 0, 3000);
// With debounce, should see only 1 change (not 5)
expect(changes.length).toBe(1);
}, 10_000);
});
@@ -0,0 +1,36 @@
/**
* Mock worker that crashes on first compute, then works normally after respawn.
* Uses a marker file to track if it has already crashed.
*
* First invocation: sends ready, then exits with code 1 on first compute.
* Subsequent invocations (after respawn): works like normal mock-worker.
*/
import { existsSync, unlinkSync, writeFileSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
const markerFile = join(tmpdir(), `nerve-crash-once-${process.ppid}.marker`);
process.send({ type: "ready" });
const hasCrashed = existsSync(markerFile);
process.on("message", (msg) => {
if (!msg || typeof msg !== "object") return;
if (msg.type === "shutdown") {
try {
unlinkSync(markerFile);
} catch {}
process.exit(0);
}
if (msg.type === "compute" && typeof msg.sense === "string") {
if (!hasCrashed) {
writeFileSync(markerFile, "crashed", "utf8");
process.exit(1);
}
process.send({ type: "signal", sense: msg.sense, payload: 42 });
}
});
@@ -0,0 +1,18 @@
/**
* Mock worker that responds to compute with an error message.
* Used for error isolation integration tests.
*/
process.send({ type: "ready" });
process.on("message", (msg) => {
if (!msg || typeof msg !== "object") return;
if (msg.type === "shutdown") {
process.exit(0);
}
if (msg.type === "compute" && typeof msg.sense === "string") {
process.send({ type: "error", sense: msg.sense, error: "simulated compute error" });
}
});
@@ -0,0 +1,28 @@
/**
* Mock sense worker that implements the IPC protocol for integration testing.
*
* This file intentionally uses the .mjs extension (rather than .ts) because it
* is spawned as a child process via fork() at runtime and must execute without
* any TypeScript compilation step. Node.js can run .mjs files directly as
* ESModules, whereas .ts files would require ts-node/tsx to be available in the
* child process environment.
*
* Behaviour:
* - Sends { type: "ready" } on startup
* - On { type: "compute", sense } → sends back { type: "signal", sense, payload: 42 }
* - On { type: "shutdown" } → exits cleanly with code 0
*/
process.send({ type: "ready" });
process.on("message", (msg) => {
if (!msg || typeof msg !== "object") return;
if (msg.type === "shutdown") {
process.exit(0);
}
if (msg.type === "compute" && typeof msg.sense === "string") {
process.send({ type: "signal", sense: msg.sense, payload: 42 });
}
});
@@ -0,0 +1,24 @@
/**
* Mock worker that takes a long time to respond to compute messages.
* Used for grace period / timeout integration tests.
*
* On compute: waits 10 seconds before responding (simulates hung compute).
* On shutdown: exits cleanly.
*/
process.send({ type: "ready" });
process.on("message", (msg) => {
if (!msg || typeof msg !== "object") return;
if (msg.type === "shutdown") {
process.exit(0);
}
if (msg.type === "compute" && typeof msg.sense === "string") {
// Intentionally slow — will be killed by grace period
setTimeout(() => {
process.send({ type: "signal", sense: msg.sense, payload: "late" });
}, 10_000);
}
});
@@ -0,0 +1,207 @@
/**
* Integration tests for the Kernel / Process Manager.
*
* These tests use REAL child processes via a mock worker fixture that
* implements the IPC protocol (fixtures/mock-worker.mjs). No build
* artifacts are required.
*/
import { dirname, join } from "node:path";
import { fileURLToPath } from "node:url";
import type { Signal } from "@uncaged/nerve-core";
import type { NerveConfig } from "@uncaged/nerve-core";
import { afterEach, describe, expect, it } from "vitest";
import { createKernel } from "../kernel.js";
import type { Kernel } from "../kernel.js";
const __filename = fileURLToPath(import.meta.url);
const __dir = dirname(__filename);
const MOCK_WORKER = join(__dir, "fixtures", "mock-worker.mjs");
function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
return {
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [],
workflows: null,
...overrides,
};
}
/** Poll until predicate returns true, with a timeout. */
async function pollUntil(
predicate: () => boolean,
timeoutMs: number,
intervalMs = 50,
): Promise<void> {
return new Promise((resolve, reject) => {
const timer = setTimeout(
() => reject(new Error(`pollUntil timed out after ${timeoutMs}ms`)),
timeoutMs,
);
const check = setInterval(() => {
if (predicate()) {
clearTimeout(timer);
clearInterval(check);
resolve();
}
}, intervalMs);
});
}
describe("kernel integration — real child processes", () => {
let kernel: Kernel | null = null;
afterEach(async () => {
if (kernel !== null) {
await kernel.stop();
kernel = null;
}
});
it("returns correct groups and senseCount", () => {
const config = makeConfig({
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
"disk-io": { group: "system", throttle: null, timeout: null, gracePeriod: null },
"net-rx": { group: "network", throttle: null, timeout: null, gracePeriod: null },
},
});
kernel = createKernel(config, "/tmp/nerve-integration-test", {
workerScript: MOCK_WORKER,
});
expect(kernel.groups.size).toBe(2);
expect(kernel.groups.has("system")).toBe(true);
expect(kernel.groups.has("network")).toBe(true);
expect(kernel.senseCount).toBe(3);
});
it("workers start and respond to compute messages with signals", async () => {
const config = makeConfig();
kernel = createKernel(config, "/tmp/nerve-integration-test", {
workerScript: MOCK_WORKER,
});
// Wait for all workers to be ready (event-based, not fixed delay)
await kernel.ready;
// Subscribe to the bus before triggering compute
const received: Signal[] = [];
const unsub = kernel.bus.subscribe((signal) => {
received.push(signal);
});
// Trigger a compute for "cpu-usage" through the kernel's triggerCompute
kernel.triggerCompute("cpu-usage");
// Poll until a signal arrives on the bus (event-driven, no fixed delay)
await pollUntil(() => received.length > 0, 3000);
expect(received).toHaveLength(1);
expect(received[0]).toMatchObject({ senseId: "cpu-usage", payload: 42 });
unsub();
}, 10_000);
it("graceful shutdown: stop() resolves after all workers exit", async () => {
const config = makeConfig({
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
"net-rx": { group: "network", throttle: null, timeout: null, gracePeriod: null },
},
});
kernel = createKernel(config, "/tmp/nerve-integration-test", {
workerScript: MOCK_WORKER,
});
// Wait for all workers to be ready (event-based)
await kernel.ready;
const stopPromise = kernel.stop();
kernel = null;
// stop() should resolve within 5s (our shutdown timeout)
await expect(stopPromise).resolves.toBeUndefined();
}, 10_000);
it("compute round-trip: worker receives compute and sends signal back through bus", async () => {
const config = makeConfig();
kernel = createKernel(config, "/tmp/nerve-integration-test", {
workerScript: MOCK_WORKER,
});
// Wait for all workers to be ready (event-based, not fixed delay)
await kernel.ready;
const received: Signal[] = [];
const unsub = kernel.bus.subscribe((signal) => {
received.push(signal);
});
// Trigger compute via the kernel — the kernel sends IPC to the worker,
// the mock worker responds with a signal message, and the kernel routes it to the bus.
kernel.triggerCompute("cpu-usage");
// Poll for the signal on the bus (no fixed delay)
await pollUntil(() => received.length > 0, 3000);
expect(received).toHaveLength(1);
expect(received[0]).toMatchObject({ senseId: "cpu-usage", payload: 42 });
unsub();
}, 10_000);
it("crash recovery: kernel respawns worker after unexpected exit and new worker is functional", async () => {
const config = makeConfig();
kernel = createKernel(config, "/tmp/nerve-integration-test", {
workerScript: MOCK_WORKER,
});
// Wait for initial worker to be ready (event-based)
await kernel.ready;
const originalPid = kernel.getWorkerPid("system");
expect(originalPid).not.toBeNull();
expect(originalPid).toBeGreaterThan(0);
// Kill the kernel's own worker to simulate a crash (SIGKILL, code != 0)
process.kill(originalPid as number, "SIGKILL");
// Poll until the kernel respawns and registers a new worker with a different PID
// (respawn delay is 1s, then fork(), then workers.set())
await pollUntil(() => {
const pid = kernel?.getWorkerPid("system");
return pid !== null && pid !== originalPid;
}, 5000);
const newPid = kernel.getWorkerPid("system");
expect(newPid).not.toBeNull();
expect(newPid).not.toBe(originalPid);
// Wait a bit for the new worker to send its "ready" message and be fully up.
// Poll until the new worker responds to a compute message on the bus.
const postRespawnSignals: Signal[] = [];
const unsub = kernel.bus.subscribe((signal) => {
postRespawnSignals.push(signal);
});
// Trigger compute through the kernel to the new worker
kernel.triggerCompute("cpu-usage");
// Poll for the signal — verifies the new worker is fully functional
await pollUntil(() => postRespawnSignals.length > 0, 5000);
expect(postRespawnSignals).toHaveLength(1);
expect(postRespawnSignals[0]).toMatchObject({ senseId: "cpu-usage", payload: 42 });
unsub();
// Kernel should still stop gracefully after respawn
await kernel.stop();
kernel = null;
}, 15_000);
});
@@ -0,0 +1,241 @@
/**
* Unit tests for kernel Phase 6 enhancements — restartGroup, reloadConfig, getHealth.
* Uses mocked child_process.fork.
*/
import { EventEmitter } from "node:events";
import type { NerveConfig } from "@uncaged/nerve-core";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
// ---------------------------------------------------------------------------
// Mock child_process.fork
// ---------------------------------------------------------------------------
const mockChildren: MockChild[] = [];
type MockChild = EventEmitter & {
send: ReturnType<typeof vi.fn>;
kill: ReturnType<typeof vi.fn>;
pid: number;
connected: boolean;
};
function makeMockChild(pid = 1): MockChild {
const child = new EventEmitter() as MockChild;
child.connected = true;
child.send = vi.fn((msg: unknown) => {
if (
msg !== null &&
typeof msg === "object" &&
(msg as Record<string, unknown>).type === "shutdown"
) {
setImmediate(() => {
child.connected = false;
child.emit("exit", 0, null);
});
}
});
child.kill = vi.fn((_signal?: string) => {
child.connected = false;
child.emit("exit", null, _signal ?? "SIGKILL");
});
child.pid = pid;
// Auto-emit ready so restartGroup's await resolves
setImmediate(() => {
child.emit("message", { type: "ready" });
});
return child;
}
vi.mock("node:child_process", () => ({
fork: vi.fn((_script: string, _args: string[], _opts: unknown) => {
const child = makeMockChild(mockChildren.length + 100);
mockChildren.push(child);
return child;
}),
}));
// Must also mock file-watcher since kernel imports it
vi.mock("../file-watcher.js", () => ({
createFileWatcher: vi.fn(() => ({ close: vi.fn() })),
}));
const { createKernel } = await import("../kernel.js");
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
return {
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [],
workflows: null,
...overrides,
};
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
describe("kernel — getHealth", () => {
beforeEach(() => {
mockChildren.length = 0;
vi.useFakeTimers({ shouldAdvanceTime: true });
});
afterEach(() => {
vi.useRealTimers();
});
it("returns correct health shape", async () => {
const config = makeConfig({
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
"disk-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
"net-rx": { group: "network", throttle: null, timeout: null, gracePeriod: null },
},
});
const kernel = createKernel(config, "/tmp/nerve-test");
const health = kernel.getHealth();
expect(health.activeSenses).toBe(3);
expect(health.activeGroups).toBe(2);
expect(health.uptime).toBeGreaterThanOrEqual(0);
expect(health.memoryUsage).toBeDefined();
expect(typeof health.memoryUsage.heapUsed).toBe("number");
await kernel.stop();
});
});
describe("kernel — restartGroup", () => {
beforeEach(() => {
mockChildren.length = 0;
vi.useFakeTimers({ shouldAdvanceTime: true });
});
afterEach(() => {
vi.useRealTimers();
});
it("sends shutdown to old worker and spawns new one", async () => {
const config = makeConfig();
const kernel = createKernel(config, "/tmp/nerve-test");
expect(mockChildren.length).toBe(1);
const oldChild = mockChildren[0];
const restartPromise = kernel.restartGroup("system");
// The shutdown message triggers exit in the mock
await restartPromise;
// A new child should have been spawned
expect(mockChildren.length).toBe(2);
// Old child got shutdown
expect(oldChild.send).toHaveBeenCalledWith(expect.objectContaining({ type: "shutdown" }));
await kernel.stop();
});
it("restartGroup on unknown group does nothing", async () => {
const config = makeConfig();
const kernel = createKernel(config, "/tmp/nerve-test");
expect(mockChildren.length).toBe(1);
await kernel.restartGroup("nonexistent");
// No new child spawned
expect(mockChildren.length).toBe(1);
await kernel.stop();
});
});
describe("kernel — reloadConfig", () => {
beforeEach(() => {
mockChildren.length = 0;
vi.useFakeTimers({ shouldAdvanceTime: true });
});
afterEach(() => {
vi.useRealTimers();
});
it("adds new group worker when new sense group appears", async () => {
const config = makeConfig();
const kernel = createKernel(config, "/tmp/nerve-test");
expect(mockChildren.length).toBe(1); // only system group
expect(kernel.groups.has("network")).toBe(false);
kernel.reloadConfig({
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
"net-rx": { group: "network", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [],
workflows: null,
});
expect(kernel.groups.has("network")).toBe(true);
expect(mockChildren.length).toBe(2); // system + network
await kernel.stop();
});
it("removes group worker when its senses are all removed", async () => {
const config: NerveConfig = {
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
"net-rx": { group: "network", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [],
workflows: null,
};
const kernel = createKernel(config, "/tmp/nerve-test");
expect(mockChildren.length).toBe(2);
expect(kernel.groups.has("network")).toBe(true);
const networkChild = mockChildren[1];
kernel.reloadConfig({
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [],
workflows: null,
});
expect(kernel.groups.has("network")).toBe(false);
// Network child should have received shutdown
expect(networkChild.send).toHaveBeenCalledWith(expect.objectContaining({ type: "shutdown" }));
await kernel.stop();
});
it("health reflects updated sense count after reloadConfig", async () => {
const config = makeConfig();
const kernel = createKernel(config, "/tmp/nerve-test");
expect(kernel.getHealth().activeSenses).toBe(1);
kernel.reloadConfig({
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
"disk-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [],
workflows: null,
});
expect(kernel.getHealth().activeSenses).toBe(2);
await kernel.stop();
});
});
@@ -0,0 +1,200 @@
import { EventEmitter } from "node:events";
import type { NerveConfig } from "@uncaged/nerve-core";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
// ---------------------------------------------------------------------------
// Mock child_process.fork before importing kernel
// ---------------------------------------------------------------------------
const mockChildren: MockChild[] = [];
type MockChild = EventEmitter & {
send: ReturnType<typeof vi.fn>;
kill: ReturnType<typeof vi.fn>;
pid: number;
};
function makeMockChild(pid = 1): MockChild {
const child = new EventEmitter() as MockChild;
child.send = vi.fn((msg: unknown) => {
// Auto-exit on shutdown so stop() resolves
if (
msg !== null &&
typeof msg === "object" &&
(msg as Record<string, unknown>).type === "shutdown"
) {
setImmediate(() => child.emit("exit", 0, null));
}
});
child.kill = vi.fn((_signal?: string) => {
child.emit("exit", null, _signal ?? "SIGKILL");
});
child.pid = pid;
return child;
}
vi.mock("node:child_process", () => ({
fork: vi.fn((_script: string, _args: string[], _opts: unknown) => {
const child = makeMockChild(mockChildren.length + 1);
mockChildren.push(child);
return child;
}),
}));
// Import after mock is set up
const { createKernel } = await import("../kernel.js");
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
return {
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [],
workflows: null,
...overrides,
};
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
describe("kernel — message routing", () => {
beforeEach(() => {
mockChildren.length = 0;
vi.useFakeTimers({ shouldAdvanceTime: true });
});
afterEach(() => {
vi.useRealTimers();
});
it("routes signal message to bus without throwing", async () => {
const config = makeConfig({
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [],
});
const kernel = createKernel(config, "/tmp/nerve-test");
expect(mockChildren.length).toBe(1);
const child = mockChildren[0];
expect(() => {
child.emit("message", { type: "signal", sense: "cpu-usage", payload: 42 });
}).not.toThrow();
await kernel.stop();
});
it("routes error message to stderr", async () => {
const stderrSpy = vi.spyOn(process.stderr, "write").mockImplementation(() => true);
const config = makeConfig({
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [],
});
const kernel = createKernel(config, "/tmp/nerve-test");
const child = mockChildren[0];
child.emit("message", { type: "error", sense: "cpu-usage", error: "compute failed" });
expect(stderrSpy).toHaveBeenCalledWith(
expect.stringContaining('sense "cpu-usage" error: compute failed'),
);
stderrSpy.mockRestore();
await kernel.stop();
});
it("ignores ready messages without logging", async () => {
const stderrSpy = vi.spyOn(process.stderr, "write").mockImplementation(() => true);
const config = makeConfig({
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [],
});
const kernel = createKernel(config, "/tmp/nerve-test");
const child = mockChildren[0];
const callsBefore = stderrSpy.mock.calls.length;
child.emit("message", { type: "ready" });
expect(stderrSpy.mock.calls.length).toBe(callsBefore);
stderrSpy.mockRestore();
await kernel.stop();
});
it("logs invalid worker messages without throwing", async () => {
const stderrSpy = vi.spyOn(process.stderr, "write").mockImplementation(() => true);
const config = makeConfig({
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [],
});
const kernel = createKernel(config, "/tmp/nerve-test");
const child = mockChildren[0];
expect(() => child.emit("message", { type: "unknown-type" })).not.toThrow();
expect(stderrSpy).toHaveBeenCalledWith(expect.stringContaining("invalid worker message"));
stderrSpy.mockRestore();
await kernel.stop();
});
});
describe("kernel — groupForSense mapping", () => {
beforeEach(() => {
mockChildren.length = 0;
vi.useFakeTimers({ shouldAdvanceTime: true });
});
afterEach(() => {
vi.useRealTimers();
});
it("spawns one worker per unique group", async () => {
const config: NerveConfig = {
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
"disk-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
"net-usage": { group: "network", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [],
workflows: null,
};
const kernel = createKernel(config, "/tmp/nerve-test");
// system and network = 2 unique groups
expect(mockChildren.length).toBe(2);
await kernel.stop();
});
it("sends compute to the correct worker on interval trigger", async () => {
const config = makeConfig({
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: 500, on: null }],
});
createKernel(config, "/tmp/nerve-test");
const child = mockChildren[0];
vi.advanceTimersByTime(500);
expect(child.send).toHaveBeenCalledWith(
expect.objectContaining({ type: "compute", sense: "cpu-usage" }),
);
});
});
@@ -0,0 +1,117 @@
import { mkdtempSync, rmSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import type { NerveConfig, Signal } from "@uncaged/nerve-core";
import { createLogStore } from "../log-store.js";
import type { LogStore } from "../log-store.js";
import { createReflexScheduler } from "../reflex-scheduler.js";
import { createSignalBus } from "../signal-bus.js";
describe("LogStore + ReflexScheduler integration", () => {
let tmpDir: string;
let logStore: LogStore;
beforeEach(() => {
tmpDir = mkdtempSync(join(tmpdir(), "nerve-log-int-"));
logStore = createLogStore(join(tmpDir, "logs.db"));
});
afterEach(() => {
logStore.close();
rmSync(tmpDir, { recursive: true, force: true });
});
it("logs run_start when reflex triggers a compute", () => {
const config: NerveConfig = {
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: null, on: ["cpu-usage"] }],
workflows: null,
};
const bus = createSignalBus();
const triggered: string[] = [];
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name), {
logStore,
});
const signal: Signal = { id: 1, senseId: "cpu-usage", payload: 42, ts: Date.now() };
bus.emit(signal);
const logs = logStore.query({ source: "reflex", type: "run_start" });
expect(logs).toHaveLength(1);
expect(logs[0].refId).toBe("cpu-usage");
expect(triggered).toHaveLength(1);
scheduler.stop();
});
it("interval reflex produces run_start logs", () => {
vi.useFakeTimers();
const config: NerveConfig = {
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: 1000, on: null }],
workflows: null,
};
const bus = createSignalBus();
const ref: { scheduler: ReturnType<typeof createReflexScheduler> | null } = { scheduler: null };
const scheduler = createReflexScheduler(
config,
bus,
(name) => {
ref.scheduler?.onComputeComplete(name);
},
{ logStore },
);
ref.scheduler = scheduler;
vi.advanceTimersByTime(3000);
const logs = logStore.query({ source: "reflex", type: "run_start" });
expect(logs.length).toBeGreaterThanOrEqual(3);
expect(logs.every((l) => l.refId === "cpu-usage")).toBe(true);
scheduler.stop();
vi.useRealTimers();
});
it("logs cannot trigger reflexes (architectural constraint)", () => {
const config: NerveConfig = {
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: null, on: ["cpu-usage"] }],
workflows: null,
};
const bus = createSignalBus();
const triggered: string[] = [];
const scheduler = createReflexScheduler(
config,
bus,
(name) => {
triggered.push(name);
scheduler.onComputeComplete(name);
},
{ logStore },
);
logStore.append({
source: "reflex",
type: "run_complete",
refId: "cpu-usage",
payload: '{"v":99}',
ts: Date.now(),
});
// Writing to the log store should NOT trigger any reflex.
// Only bus.emit(signal) triggers reflexes.
expect(triggered).toHaveLength(0);
scheduler.stop();
});
});
@@ -0,0 +1,214 @@
import { mkdtempSync, rmSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { afterEach, beforeEach, describe, expect, it } from "vitest";
import { createLogStore } from "../log-store.js";
import type { LogStore } from "../log-store.js";
describe("LogStore", () => {
let tmpDir: string;
let store: LogStore;
beforeEach(() => {
tmpDir = mkdtempSync(join(tmpdir(), "nerve-log-test-"));
store = createLogStore(join(tmpDir, "data", "logs.db"));
});
afterEach(() => {
store.close();
rmSync(tmpDir, { recursive: true, force: true });
});
describe("append + query", () => {
it("appends an entry and returns it with an id", () => {
const entry = store.append({
source: "system",
type: "start",
refId: null,
payload: null,
ts: 1000,
});
expect(entry.id).toBe(1);
expect(entry.source).toBe("system");
expect(entry.type).toBe("start");
});
it("auto-increments ids", () => {
const e1 = store.append({
source: "system",
type: "start",
refId: null,
payload: null,
ts: 1000,
});
const e2 = store.append({
source: "system",
type: "stop",
refId: null,
payload: null,
ts: 2000,
});
expect(e2.id).toBe((e1.id ?? 0) + 1);
});
it("returns all entries when queried with no filter", () => {
store.append({ source: "system", type: "start", refId: null, payload: null, ts: 1000 });
store.append({ source: "reflex", type: "run_start", refId: "cpu", payload: null, ts: 2000 });
store.append({
source: "reflex",
type: "run_complete",
refId: "cpu",
payload: '{"v":42}',
ts: 3000,
});
const all = store.query();
expect(all).toHaveLength(3);
});
});
describe("query filters", () => {
beforeEach(() => {
store.append({ source: "system", type: "start", refId: null, payload: null, ts: 1000 });
store.append({ source: "reflex", type: "run_start", refId: "cpu", payload: null, ts: 2000 });
store.append({
source: "reflex",
type: "run_complete",
refId: "cpu",
payload: '{"v":42}',
ts: 3000,
});
store.append({
source: "system",
type: "error",
refId: "disk",
payload: '{"error":"fail"}',
ts: 4000,
});
store.append({ source: "system", type: "stop", refId: null, payload: null, ts: 5000 });
});
it("filters by source", () => {
const results = store.query({ source: "reflex" });
expect(results).toHaveLength(2);
expect(results.every((r) => r.source === "reflex")).toBe(true);
});
it("filters by type", () => {
const results = store.query({ type: "error" });
expect(results).toHaveLength(1);
expect(results[0].refId).toBe("disk");
});
it("filters by refId", () => {
const results = store.query({ refId: "cpu" });
expect(results).toHaveLength(2);
});
it("filters by since (inclusive)", () => {
const results = store.query({ since: 3000 });
expect(results).toHaveLength(3);
expect(results[0].ts).toBe(3000);
});
it("filters by until (inclusive)", () => {
const results = store.query({ until: 2000 });
expect(results).toHaveLength(2);
});
it("filters by since + until range", () => {
const results = store.query({ since: 2000, until: 4000 });
expect(results).toHaveLength(3);
});
it("applies limit", () => {
const results = store.query({ limit: 2 });
expect(results).toHaveLength(2);
expect(results[0].type).toBe("start");
expect(results[1].type).toBe("run_start");
});
it("combines multiple filters", () => {
const results = store.query({ source: "system", since: 4000 });
expect(results).toHaveLength(2);
expect(results[0].type).toBe("error");
expect(results[1].type).toBe("stop");
});
it("returns empty array when no matches", () => {
const results = store.query({ source: "workflow" });
expect(results).toHaveLength(0);
});
});
describe("query ordering", () => {
it("returns entries in insertion order (ascending id)", () => {
store.append({ source: "system", type: "start", refId: null, payload: null, ts: 5000 });
store.append({ source: "reflex", type: "run_start", refId: "a", payload: null, ts: 1000 });
const all = store.query();
expect(all[0].ts).toBe(5000);
expect(all[1].ts).toBe(1000);
});
});
describe("meta table", () => {
it("returns null for nonexistent key", () => {
expect(store.getMeta("missing")).toBeNull();
});
it("sets and gets a value", () => {
store.setMeta("archived_up_to", "2026-03-22");
expect(store.getMeta("archived_up_to")).toBe("2026-03-22");
});
it("upserts on duplicate key", () => {
store.setMeta("version", "1");
store.setMeta("version", "2");
expect(store.getMeta("version")).toBe("2");
});
it("supports multiple keys", () => {
store.setMeta("a", "1");
store.setMeta("b", "2");
expect(store.getMeta("a")).toBe("1");
expect(store.getMeta("b")).toBe("2");
});
});
describe("append-only semantics", () => {
it("ids are always increasing", () => {
const entries = Array.from({ length: 10 }, (_, i) =>
store.append({ source: "system", type: "test", refId: null, payload: null, ts: i }),
);
for (let i = 1; i < entries.length; i++) {
expect(entries[i].id).toBeGreaterThan(entries[i - 1].id ?? 0);
}
});
});
describe("payload JSON round-trip", () => {
it("preserves JSON payload", () => {
const payload = JSON.stringify({ cpu: 95, host: "node-1" });
store.append({ source: "reflex", type: "run_complete", refId: "cpu", payload, ts: 1000 });
const results = store.query({ refId: "cpu" });
expect(results).toHaveLength(1);
expect(JSON.parse(results[0].payload ?? "null")).toEqual({ cpu: 95, host: "node-1" });
});
});
describe("creates parent directories", () => {
it("creates nested directory structure for db path", () => {
const deepPath = join(tmpDir, "a", "b", "c", "test.db");
const deepStore = createLogStore(deepPath);
deepStore.append({ source: "system", type: "start", refId: null, payload: null, ts: 1000 });
expect(deepStore.query()).toHaveLength(1);
deepStore.close();
});
});
});
@@ -0,0 +1,361 @@
/**
* Phase 6 integration tests — hot reload, error isolation, grace period, health.
*/
import { dirname, join } from "node:path";
import { fileURLToPath } from "node:url";
import type { Signal } from "@uncaged/nerve-core";
import type { NerveConfig } from "@uncaged/nerve-core";
import { afterEach, describe, expect, it } from "vitest";
import { createKernel } from "../kernel.js";
import type { Kernel } from "../kernel.js";
const __filename = fileURLToPath(import.meta.url);
const __dir = dirname(__filename);
const MOCK_WORKER = join(__dir, "fixtures", "mock-worker.mjs");
const ERROR_WORKER = join(__dir, "fixtures", "error-worker.mjs");
function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
return {
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [],
workflows: null,
...overrides,
};
}
async function pollUntil(
predicate: () => boolean,
timeoutMs: number,
intervalMs = 50,
): Promise<void> {
return new Promise((resolve, reject) => {
const timer = setTimeout(
() => reject(new Error(`pollUntil timed out after ${timeoutMs}ms`)),
timeoutMs,
);
const check = setInterval(() => {
if (predicate()) {
clearTimeout(timer);
clearInterval(check);
resolve();
}
}, intervalMs);
});
}
// ---------------------------------------------------------------------------
// Hot Reload — restartGroup
// ---------------------------------------------------------------------------
describe("phase6 — restartGroup", () => {
let kernel: Kernel | null = null;
afterEach(async () => {
if (kernel !== null) {
await kernel.stop();
kernel = null;
}
});
it("restartGroup stops old worker and spawns a new one", async () => {
const config = makeConfig();
kernel = createKernel(config, "/tmp/nerve-phase6-test", {
workerScript: MOCK_WORKER,
});
await kernel.ready;
const oldPid = kernel.getWorkerPid("system");
expect(oldPid).not.toBeNull();
await kernel.restartGroup("system");
// Wait for new worker to become ready
await pollUntil(() => {
const newPid = kernel?.getWorkerPid("system");
return newPid !== null && newPid !== oldPid;
}, 5000);
const newPid = kernel.getWorkerPid("system");
expect(newPid).not.toBeNull();
expect(newPid).not.toBe(oldPid);
// Verify new worker is functional
const received: Signal[] = [];
const unsub = kernel.bus.subscribe((s) => received.push(s));
kernel.triggerCompute("cpu-usage");
await pollUntil(() => received.length > 0, 3000);
expect(received[0]).toMatchObject({ senseId: "cpu-usage", payload: 42 });
unsub();
}, 15_000);
it("restartGroup on nonexistent group does nothing", async () => {
const config = makeConfig();
kernel = createKernel(config, "/tmp/nerve-phase6-test", {
workerScript: MOCK_WORKER,
});
await kernel.ready;
// Should not throw
await kernel.restartGroup("nonexistent");
}, 5_000);
});
// ---------------------------------------------------------------------------
// Hot Reload — reloadConfig
// ---------------------------------------------------------------------------
describe("phase6 — reloadConfig", () => {
let kernel: Kernel | null = null;
afterEach(async () => {
if (kernel !== null) {
await kernel.stop();
kernel = null;
}
});
it("adds new group when new sense group is introduced", async () => {
const config = makeConfig();
kernel = createKernel(config, "/tmp/nerve-phase6-test", {
workerScript: MOCK_WORKER,
});
await kernel.ready;
expect(kernel.groups.has("network")).toBe(false);
const newConfig: NerveConfig = {
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
"net-rx": { group: "network", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [],
workflows: null,
};
kernel.reloadConfig(newConfig);
expect(kernel.groups.has("network")).toBe(true);
// Wait for the new network worker to start
await pollUntil(() => kernel?.getWorkerPid("network") !== null, 3000);
expect(kernel.getWorkerPid("network")).not.toBeNull();
}, 10_000);
it("removes group when all its senses are removed", async () => {
const config: NerveConfig = {
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
"net-rx": { group: "network", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [],
workflows: null,
};
kernel = createKernel(config, "/tmp/nerve-phase6-test", {
workerScript: MOCK_WORKER,
});
await kernel.ready;
expect(kernel.groups.has("network")).toBe(true);
const newConfig: NerveConfig = {
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [],
workflows: null,
};
kernel.reloadConfig(newConfig);
expect(kernel.groups.has("network")).toBe(false);
}, 10_000);
});
// ---------------------------------------------------------------------------
// Error Isolation
// ---------------------------------------------------------------------------
describe("phase6 — error isolation", () => {
let kernel: Kernel | null = null;
afterEach(async () => {
if (kernel !== null) {
await kernel.stop();
kernel = null;
}
});
it("error from one sense does not crash the worker — other senses still work", async () => {
const config: NerveConfig = {
senses: {
"good-sense": { group: "mixed", throttle: null, timeout: null, gracePeriod: null },
"bad-sense": { group: "mixed", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [],
workflows: null,
};
kernel = createKernel(config, "/tmp/nerve-phase6-test", {
workerScript: MOCK_WORKER,
});
await kernel.ready;
// Both senses go through the same worker (mock-worker responds to all compute with signal)
const received: Signal[] = [];
const unsub = kernel.bus.subscribe((s) => received.push(s));
kernel.triggerCompute("good-sense");
await pollUntil(() => received.length > 0, 3000);
expect(received[0]).toMatchObject({ senseId: "good-sense" });
kernel.triggerCompute("bad-sense");
await pollUntil(() => received.length > 1, 3000);
expect(received[1]).toMatchObject({ senseId: "bad-sense" });
unsub();
}, 10_000);
it("error worker sends error messages, kernel still running", async () => {
const stderrMessages: string[] = [];
const stderrSpy = ((original) => {
return (chunk: string | Uint8Array) => {
stderrMessages.push(String(chunk));
return original.call(process.stderr, chunk);
};
})(process.stderr.write);
const origWrite = process.stderr.write;
process.stderr.write = stderrSpy as typeof process.stderr.write;
const config = makeConfig();
kernel = createKernel(config, "/tmp/nerve-phase6-test", {
workerScript: ERROR_WORKER,
});
await kernel.ready;
kernel.triggerCompute("cpu-usage");
// Wait for the error to be logged
await pollUntil(() => stderrMessages.some((m) => m.includes("simulated compute error")), 3000);
process.stderr.write = origWrite;
// Kernel should still be running (not crashed)
expect(kernel.getWorkerPid("system")).not.toBeNull();
}, 10_000);
});
// ---------------------------------------------------------------------------
// getHealth
// ---------------------------------------------------------------------------
describe("phase6 — getHealth", () => {
let kernel: Kernel | null = null;
afterEach(async () => {
if (kernel !== null) {
await kernel.stop();
kernel = null;
}
});
it("returns health snapshot with correct shape", async () => {
const config = makeConfig({
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
"disk-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
"net-rx": { group: "network", throttle: null, timeout: null, gracePeriod: null },
},
});
kernel = createKernel(config, "/tmp/nerve-phase6-test", {
workerScript: MOCK_WORKER,
});
await kernel.ready;
const health = kernel.getHealth();
expect(health.uptime).toBeGreaterThanOrEqual(0);
expect(health.activeSenses).toBe(3);
expect(health.activeGroups).toBe(2);
expect(health.memoryUsage).toBeDefined();
expect(typeof health.memoryUsage.heapUsed).toBe("number");
}, 10_000);
it("health reflects config changes after reloadConfig", async () => {
const config = makeConfig();
kernel = createKernel(config, "/tmp/nerve-phase6-test", {
workerScript: MOCK_WORKER,
});
await kernel.ready;
expect(kernel.getHealth().activeSenses).toBe(1);
const newConfig: NerveConfig = {
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
"net-rx": { group: "network", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [],
workflows: null,
};
kernel.reloadConfig(newConfig);
expect(kernel.getHealth().activeSenses).toBe(2);
expect(kernel.getHealth().activeGroups).toBe(2);
}, 10_000);
});
// ---------------------------------------------------------------------------
// Auto-respawn on crash (existing test extended)
// ---------------------------------------------------------------------------
describe("phase6 — auto-respawn on worker crash", () => {
let kernel: Kernel | null = null;
afterEach(async () => {
if (kernel !== null) {
await kernel.stop();
kernel = null;
}
});
it("kernel auto-respawns worker and new worker is functional", async () => {
const config = makeConfig();
kernel = createKernel(config, "/tmp/nerve-phase6-test", {
workerScript: MOCK_WORKER,
});
await kernel.ready;
const originalPid = kernel.getWorkerPid("system");
expect(originalPid).not.toBeNull();
// Kill worker to simulate crash
process.kill(originalPid as number, "SIGKILL");
// Wait for respawn
await pollUntil(() => {
const pid = kernel?.getWorkerPid("system");
return pid !== null && pid !== originalPid;
}, 5000);
const newPid = kernel.getWorkerPid("system");
expect(newPid).not.toBeNull();
expect(newPid).not.toBe(originalPid);
// Verify new worker responds
const received: Signal[] = [];
const unsub = kernel.bus.subscribe((s) => received.push(s));
kernel.triggerCompute("cpu-usage");
await pollUntil(() => received.length > 0, 5000);
expect(received[0]).toMatchObject({ senseId: "cpu-usage", payload: 42 });
unsub();
await kernel.stop();
kernel = null;
}, 15_000);
});
@@ -0,0 +1,120 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import type { NerveConfig, Signal } from "@uncaged/nerve-core";
import { createReflexScheduler } from "../reflex-scheduler.js";
import { createSignalBus } from "../signal-bus.js";
function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
return {
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [],
workflows: null,
...overrides,
};
}
function makeSignal(senseId: string, payload: unknown = 1): Signal {
return { id: 1, senseId, payload, ts: Date.now() };
}
describe("ReflexScheduler — throttle + pending deferred trigger", () => {
beforeEach(() => {
vi.useFakeTimers();
});
afterEach(() => {
vi.useRealTimers();
});
it("trigger during throttle window fires deferred trigger after window ends", () => {
const triggered: string[] = [];
const config = makeConfig({
senses: {
"cpu-usage": { group: "system", throttle: 2000, timeout: null, gracePeriod: null },
},
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: null, on: ["cpu-usage"] }],
});
const bus = createSignalBus();
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name));
// First trigger fires immediately (outside throttle window)
bus.emit(makeSignal("cpu-usage"));
scheduler.onComputeComplete("cpu-usage");
expect(triggered.length).toBe(1);
// Second trigger arrives 500ms later — still within throttle window (2000ms)
vi.advanceTimersByTime(500);
bus.emit(makeSignal("cpu-usage"));
// Should not fire yet (throttled)
expect(triggered.length).toBe(1);
// Advance past the throttle window end; deferred trigger should now fire
vi.advanceTimersByTime(1600);
expect(triggered.length).toBe(2);
scheduler.stop();
});
it("multiple triggers during throttle window only produce one deferred trigger", () => {
const triggered: string[] = [];
const config = makeConfig({
senses: {
"cpu-usage": { group: "system", throttle: 2000, timeout: null, gracePeriod: null },
},
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: null, on: ["cpu-usage"] }],
});
const bus = createSignalBus();
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name));
// First trigger fires
bus.emit(makeSignal("cpu-usage"));
scheduler.onComputeComplete("cpu-usage");
expect(triggered.length).toBe(1);
// Multiple triggers within throttle window — should not stack
vi.advanceTimersByTime(300);
bus.emit(makeSignal("cpu-usage"));
vi.advanceTimersByTime(300);
bus.emit(makeSignal("cpu-usage"));
vi.advanceTimersByTime(300);
bus.emit(makeSignal("cpu-usage"));
expect(triggered.length).toBe(1);
// Advance past window — exactly one deferred trigger fires
vi.advanceTimersByTime(1200);
scheduler.onComputeComplete("cpu-usage");
expect(triggered.length).toBe(2);
scheduler.stop();
});
it("deferred trigger does not fire after stop()", () => {
const triggered: string[] = [];
const config = makeConfig({
senses: {
"cpu-usage": { group: "system", throttle: 2000, timeout: null, gracePeriod: null },
},
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: null, on: ["cpu-usage"] }],
});
const bus = createSignalBus();
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name));
bus.emit(makeSignal("cpu-usage"));
scheduler.onComputeComplete("cpu-usage");
expect(triggered.length).toBe(1);
// Trigger during throttle window — schedules deferred
vi.advanceTimersByTime(500);
bus.emit(makeSignal("cpu-usage"));
expect(triggered.length).toBe(1);
// Stop before window ends
scheduler.stop();
// Advance past window — deferred timer should have been cleared
vi.advanceTimersByTime(2000);
expect(triggered.length).toBe(1);
});
});
@@ -0,0 +1,310 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import type { NerveConfig, Signal } from "@uncaged/nerve-core";
import { createReflexScheduler } from "../reflex-scheduler.js";
import { createSignalBus } from "../signal-bus.js";
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
return {
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
"disk-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
"system-health": { group: "derived", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [],
workflows: null,
...overrides,
};
}
function makeSignal(senseId: string, payload: unknown = 1): Signal {
return { id: 1, senseId, payload, ts: Date.now() };
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
describe("ReflexScheduler — interval reflex", () => {
beforeEach(() => {
vi.useFakeTimers();
});
afterEach(() => {
vi.useRealTimers();
});
it("fires triggerFn on schedule", () => {
const triggered: string[] = [];
const config = makeConfig({
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: 1000, on: null }],
});
const bus = createSignalBus();
// Use a ref so the triggerFn can call back into the scheduler
const ref: { scheduler: ReturnType<typeof createReflexScheduler> | null } = {
scheduler: null,
};
const scheduler = createReflexScheduler(config, bus, (name) => {
triggered.push(name);
// Immediately complete compute so the scheduler is not blocked by in-flight state
ref.scheduler?.onComputeComplete(name);
});
ref.scheduler = scheduler;
vi.advanceTimersByTime(3000);
expect(triggered.length).toBeGreaterThanOrEqual(3);
expect(triggered.every((n) => n === "cpu-usage")).toBe(true);
scheduler.stop();
});
it("stops firing after stop() is called", () => {
const triggered: string[] = [];
const config = makeConfig({
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: 500, on: null }],
});
const bus = createSignalBus();
const ref: { scheduler: ReturnType<typeof createReflexScheduler> | null } = {
scheduler: null,
};
const scheduler = createReflexScheduler(config, bus, (name) => {
triggered.push(name);
ref.scheduler?.onComputeComplete(name);
});
ref.scheduler = scheduler;
vi.advanceTimersByTime(1000);
const countBefore = triggered.length;
scheduler.stop();
vi.advanceTimersByTime(2000);
expect(triggered.length).toBe(countBefore);
});
it("starts from current time — does not compensate for past intervals", () => {
const triggered: string[] = [];
const config = makeConfig({
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: 1000, on: null }],
});
const bus = createSignalBus();
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name));
// Only advance 500ms — should be 0 triggers (not catching up)
vi.advanceTimersByTime(500);
expect(triggered.length).toBe(0);
scheduler.stop();
});
});
describe("ReflexScheduler — event (on) reflex", () => {
it("triggers target sense when watched sense emits a signal", () => {
const triggered: string[] = [];
const config = makeConfig({
reflexes: [
{ kind: "sense", sense: "system-health", interval: null, on: ["cpu-usage", "disk-usage"] },
],
});
const bus = createSignalBus();
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name));
bus.emit(makeSignal("cpu-usage"));
scheduler.onComputeComplete("system-health");
bus.emit(makeSignal("disk-usage"));
scheduler.onComputeComplete("system-health");
expect(triggered.length).toBe(2);
expect(triggered.every((n) => n === "system-health")).toBe(true);
scheduler.stop();
});
it("does not trigger for signals from non-watched senses", () => {
const triggered: string[] = [];
const config = makeConfig({
reflexes: [{ kind: "sense", sense: "system-health", interval: null, on: ["cpu-usage"] }],
});
const bus = createSignalBus();
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name));
bus.emit(makeSignal("disk-usage"));
expect(triggered.length).toBe(0);
scheduler.stop();
});
it("stops responding after stop()", () => {
const triggered: string[] = [];
const config = makeConfig({
reflexes: [{ kind: "sense", sense: "system-health", interval: null, on: ["cpu-usage"] }],
});
const bus = createSignalBus();
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name));
scheduler.stop();
bus.emit(makeSignal("cpu-usage"));
expect(triggered.length).toBe(0);
});
});
describe("ReflexScheduler — throttle", () => {
beforeEach(() => {
vi.useFakeTimers();
});
afterEach(() => {
vi.useRealTimers();
});
it("skips rapid triggers within throttle window", () => {
const triggered: string[] = [];
const config = makeConfig({
senses: {
"cpu-usage": { group: "system", throttle: 2000, timeout: null, gracePeriod: null },
},
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: null, on: ["cpu-usage"] }],
});
const bus = createSignalBus();
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name));
bus.emit(makeSignal("cpu-usage"));
scheduler.onComputeComplete("cpu-usage");
// Immediately trigger again — within throttle window
bus.emit(makeSignal("cpu-usage"));
expect(triggered.length).toBe(1);
scheduler.stop();
});
it("allows trigger after throttle window has passed", () => {
const triggered: string[] = [];
const config = makeConfig({
senses: {
"cpu-usage": { group: "system", throttle: 1000, timeout: null, gracePeriod: null },
},
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: null, on: ["cpu-usage"] }],
});
const bus = createSignalBus();
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name));
bus.emit(makeSignal("cpu-usage"));
scheduler.onComputeComplete("cpu-usage");
vi.advanceTimersByTime(1500);
bus.emit(makeSignal("cpu-usage"));
scheduler.onComputeComplete("cpu-usage");
expect(triggered.length).toBe(2);
scheduler.stop();
});
});
describe("ReflexScheduler — merge/coalesce", () => {
it("concurrent triggers collapse to at most one pending run", () => {
const triggered: string[] = [];
const config = makeConfig({
reflexes: [{ kind: "sense", sense: "system-health", interval: null, on: ["cpu-usage"] }],
});
const bus = createSignalBus();
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name));
// First trigger starts compute
bus.emit(makeSignal("cpu-usage"));
expect(triggered.length).toBe(1);
// Three more arrive while first is in-flight — all should coalesce to one pending
bus.emit(makeSignal("cpu-usage"));
bus.emit(makeSignal("cpu-usage"));
bus.emit(makeSignal("cpu-usage"));
expect(triggered.length).toBe(1);
// Complete first compute → pending drains as exactly one more run
scheduler.onComputeComplete("system-health");
expect(triggered.length).toBe(2);
// Complete second compute → no more pending
scheduler.onComputeComplete("system-health");
expect(triggered.length).toBe(2);
scheduler.stop();
});
it("no new trigger while in-flight without pending → no extra run after complete", () => {
const triggered: string[] = [];
const config = makeConfig({
reflexes: [{ kind: "sense", sense: "system-health", interval: null, on: ["cpu-usage"] }],
});
const bus = createSignalBus();
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name));
bus.emit(makeSignal("cpu-usage"));
expect(triggered.length).toBe(1);
// Complete with no pending
scheduler.onComputeComplete("system-health");
expect(triggered.length).toBe(1);
scheduler.stop();
});
});
describe("ReflexScheduler — interval + on combined", () => {
beforeEach(() => {
vi.useFakeTimers();
});
afterEach(() => {
vi.useRealTimers();
});
it("fires both via interval and event", () => {
const triggered: string[] = [];
const config = makeConfig({
reflexes: [{ kind: "sense", sense: "system-health", interval: 1000, on: ["cpu-usage"] }],
});
const bus = createSignalBus();
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name));
// Event trigger
bus.emit(makeSignal("cpu-usage"));
scheduler.onComputeComplete("system-health");
// Interval trigger
vi.advanceTimersByTime(1000);
scheduler.onComputeComplete("system-health");
expect(triggered.length).toBeGreaterThanOrEqual(2);
scheduler.stop();
});
});
describe("ReflexScheduler — workflow reflexes ignored", () => {
it("does not set up any scheduling for workflow kind reflexes", () => {
const triggered: string[] = [];
const config: NerveConfig = {
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [{ kind: "workflow", workflow: "my-workflow", on: ["cpu-usage"] }],
workflows: {
"my-workflow": { concurrency: 1, overflow: "drop" },
},
};
const bus = createSignalBus();
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name));
bus.emit(makeSignal("cpu-usage"));
expect(triggered.length).toBe(0);
scheduler.stop();
});
});
@@ -0,0 +1,99 @@
import { describe, expect, it, vi } from "vitest";
import type { Signal } from "@uncaged/nerve-core";
import { createSignalBus } from "../signal-bus.js";
function makeSignal(senseId: string, payload: unknown = 1): Signal {
return { id: 1, senseId, payload, ts: Date.now() };
}
describe("createSignalBus", () => {
it("delivers emitted signal to a subscriber", () => {
const bus = createSignalBus();
const received: Signal[] = [];
bus.subscribe((s) => received.push(s));
const sig = makeSignal("cpu-usage", 42);
bus.emit(sig);
expect(received).toHaveLength(1);
expect(received[0]).toBe(sig);
});
it("delivers to multiple subscribers", () => {
const bus = createSignalBus();
const a: Signal[] = [];
const b: Signal[] = [];
bus.subscribe((s) => a.push(s));
bus.subscribe((s) => b.push(s));
bus.emit(makeSignal("cpu-usage"));
expect(a).toHaveLength(1);
expect(b).toHaveLength(1);
});
it("unsubscribe stops delivery", () => {
const bus = createSignalBus();
const received: Signal[] = [];
const unsub = bus.subscribe((s) => received.push(s));
bus.emit(makeSignal("cpu-usage"));
unsub();
bus.emit(makeSignal("cpu-usage"));
expect(received).toHaveLength(1);
});
it("remaining subscribers still receive after one unsubscribes", () => {
const bus = createSignalBus();
const a: Signal[] = [];
const b: Signal[] = [];
const unsubA = bus.subscribe((s) => a.push(s));
bus.subscribe((s) => b.push(s));
unsubA();
bus.emit(makeSignal("cpu-usage"));
expect(a).toHaveLength(0);
expect(b).toHaveLength(1);
});
it("emit with no subscribers does nothing", () => {
const bus = createSignalBus();
expect(() => bus.emit(makeSignal("cpu-usage"))).not.toThrow();
});
it("dispatch is synchronous", () => {
const bus = createSignalBus();
const order: string[] = [];
bus.subscribe(() => order.push("handler"));
order.push("before");
bus.emit(makeSignal("cpu-usage"));
order.push("after");
expect(order).toEqual(["before", "handler", "after"]);
});
it("handler exceptions don't prevent other handlers from running", () => {
const bus = createSignalBus();
const received: Signal[] = [];
bus.subscribe(() => {
throw new Error("boom");
});
bus.subscribe((s) => received.push(s));
expect(() => bus.emit(makeSignal("cpu-usage"))).toThrow("boom");
expect(received).toHaveLength(1);
});
it("same handler can be subscribed once and fires once per emit", () => {
const bus = createSignalBus();
const handler = vi.fn();
bus.subscribe(handler);
bus.emit(makeSignal("cpu-usage"));
bus.emit(makeSignal("cpu-usage"));
expect(handler).toHaveBeenCalledTimes(2);
});
});
+113
View File
@@ -0,0 +1,113 @@
/**
* File Watcher — watches nerveRoot for file changes and signals the kernel.
*
* Uses Node.js fs.watch (no external dependencies).
*
* Watched events:
* - .ts file under senses/ modified → callback with { kind: "sense", senseName, filePath }
* - nerve.yaml modified → callback with { kind: "config", filePath }
*
* Debounces rapid changes (e.g. editor save flicker) with a configurable delay.
*
* Linux compatibility note: fs.watch with { recursive: true } relies on inotify, which may
* not reliably report the filename for events in deeply nested subdirectories. On Linux,
* the `filename` argument passed to the callback can be null for some inotify events, and
* recursive watching is only available in Node.js ≥22 on Linux. If null-filename events
* become a problem, consider using a dedicated watcher library (e.g. chokidar).
*/
import { watch } from "node:fs";
import type { FSWatcher } from "node:fs";
import { join, relative, sep } from "node:path";
export type SenseFileChange = {
kind: "sense";
senseName: string;
filePath: string;
};
export type ConfigFileChange = {
kind: "config";
filePath: string;
};
export type FileChange = SenseFileChange | ConfigFileChange;
export type FileChangeHandler = (change: FileChange) => void;
export type FileWatcher = {
close: () => void;
};
const DEFAULT_DEBOUNCE_MS = 300;
export function createFileWatcher(
nerveRoot: string,
handler: FileChangeHandler,
debounceMs: number = DEFAULT_DEBOUNCE_MS,
): FileWatcher {
const watchers: FSWatcher[] = [];
const debounceTimers = new Map<string, ReturnType<typeof setTimeout>>();
function debounced(key: string, fn: () => void): void {
const existing = debounceTimers.get(key);
if (existing !== undefined) clearTimeout(existing);
debounceTimers.set(
key,
setTimeout(() => {
debounceTimers.delete(key);
fn();
}, debounceMs),
);
}
function handleFsEvent(_eventType: string, filename: string | null): void {
if (filename === null) return;
const normalized = filename.split(sep).join("/");
if (normalized === "nerve.yaml") {
debounced("config", () => {
handler({ kind: "config", filePath: join(nerveRoot, "nerve.yaml") });
});
return;
}
if (normalized.startsWith("senses/") && normalized.endsWith(".ts")) {
const rel = relative("senses", normalized);
const senseName = rel.split("/")[0];
if (senseName) {
debounced(`sense:${senseName}`, () => {
handler({
kind: "sense",
senseName,
filePath: join(nerveRoot, filename),
});
});
}
}
}
try {
const w = watch(nerveRoot, { recursive: true }, (eventType, filename) => {
handleFsEvent(eventType, filename);
});
watchers.push(w);
} catch (e) {
const msg = e instanceof Error ? e.message : String(e);
process.stderr.write(`[file-watcher] Failed to watch "${nerveRoot}": ${msg}\n`);
}
function close(): void {
for (const timer of debounceTimers.values()) {
clearTimeout(timer);
}
debounceTimers.clear();
for (const w of watchers) {
w.close();
}
watchers.length = 0;
}
return { close };
}
+13
View File
@@ -1,6 +1,8 @@
export type {
ComputeMessage,
ShutdownMessage,
HealthRequestMessage,
HealthResponseMessage,
ParentToWorkerMessage,
SignalMessage,
ErrorMessage,
@@ -8,6 +10,8 @@ export type {
WorkerToParentMessage,
} from "./ipc.js";
export type { SignalBus, SignalHandler, Unsubscribe } from "./signal-bus.js";
export type { ComputeFn, DrizzleDB, PeerMap, SenseRuntime } from "./sense-runtime.js";
export {
@@ -17,3 +21,12 @@ export {
loadComputeFn,
executeCompute,
} from "./sense-runtime.js";
export { createKernel } from "./kernel.js";
export type { Kernel, KernelOptions, KernelHealth } from "./kernel.js";
export { createFileWatcher } from "./file-watcher.js";
export type { FileWatcher, FileChange, FileChangeHandler } from "./file-watcher.js";
export { createLogStore } from "./log-store.js";
export type { LogStore, LogEntry, LogQuery } from "./log-store.js";
+75 -5
View File
@@ -17,8 +17,13 @@ export type ShutdownMessage = {
type: "shutdown";
};
/** Parent → Worker: request health info from worker */
export type HealthRequestMessage = {
type: "health-request";
};
/** Union of all messages the parent sends to a worker */
export type ParentToWorkerMessage = ComputeMessage | ShutdownMessage;
export type ParentToWorkerMessage = ComputeMessage | ShutdownMessage | HealthRequestMessage;
/** Worker → Parent: compute produced a signal */
export type SignalMessage = {
@@ -39,8 +44,21 @@ export type ReadyMessage = {
type: "ready";
};
/** Worker → Parent: health info response */
export type HealthResponseMessage = {
type: "health-response";
senses: string[];
inFlightCount: number;
};
/** Union of all messages a worker sends to the parent */
export type WorkerToParentMessage = SignalMessage | ErrorMessage | ReadyMessage;
export type WorkerToParentMessage =
| SignalMessage
| ErrorMessage
| ReadyMessage
| HealthResponseMessage;
const PARENT_MSG_TYPES = new Set(["compute", "shutdown", "health-request"]);
/** Validate and parse an unknown IPC message received from the parent process. */
export function parseParentMessage(raw: unknown): Result<ParentToWorkerMessage> {
@@ -51,9 +69,61 @@ export function parseParentMessage(raw: unknown): Result<ParentToWorkerMessage>
if (typeof obj.type !== "string") {
return err(new Error("IPC message missing string 'type' field"));
}
const type = obj.type;
if (type !== "compute" && type !== "shutdown") {
return err(new Error(`Unknown IPC message type: "${type}"`));
if (!PARENT_MSG_TYPES.has(obj.type)) {
return err(new Error(`Unknown IPC message type: "${obj.type}"`));
}
return ok(raw as ParentToWorkerMessage);
}
function parseSignalMsg(obj: Record<string, unknown>, raw: unknown): Result<WorkerToParentMessage> {
if (typeof obj.sense !== "string") {
return err(new Error("Worker 'signal' message missing string 'sense' field"));
}
if (!("payload" in obj)) {
return err(new Error("Worker 'signal' message missing 'payload' field"));
}
return ok(raw as SignalMessage);
}
function parseErrorMsg(obj: Record<string, unknown>, raw: unknown): Result<WorkerToParentMessage> {
if (typeof obj.sense !== "string") {
return err(new Error("Worker 'error' message missing string 'sense' field"));
}
if (typeof obj.error !== "string") {
return err(new Error("Worker 'error' message missing string 'error' field"));
}
return ok(raw as ErrorMessage);
}
function parseHealthResponseMsg(
obj: Record<string, unknown>,
raw: unknown,
): Result<WorkerToParentMessage> {
if (!Array.isArray(obj.senses)) {
return err(new Error("Worker 'health-response' message missing 'senses' array"));
}
if (typeof obj.inFlightCount !== "number") {
return err(new Error("Worker 'health-response' message missing 'inFlightCount' number"));
}
return ok(raw as HealthResponseMessage);
}
const WORKER_MSG_TYPES = new Set(["signal", "error", "ready", "health-response"]);
/** Validate and parse an unknown IPC message received from a worker process. */
export function parseWorkerMessage(raw: unknown): Result<WorkerToParentMessage> {
if (raw === null || typeof raw !== "object") {
return err(new Error("Worker IPC message is not an object"));
}
const obj = raw as Record<string, unknown>;
if (typeof obj.type !== "string") {
return err(new Error("Worker IPC message missing string 'type' field"));
}
if (!WORKER_MSG_TYPES.has(obj.type)) {
return err(new Error(`Unknown worker IPC message type: "${obj.type}"`));
}
if (obj.type === "signal") return parseSignalMsg(obj, raw);
if (obj.type === "error") return parseErrorMsg(obj, raw);
if (obj.type === "health-response") return parseHealthResponseMsg(obj, raw);
return ok({ type: "ready" });
}
+479
View File
@@ -0,0 +1,479 @@
/**
* Kernel — the main orchestrator that ties sense workers, signal bus, and
* reflex scheduler together.
*
* Responsibilities:
* - Spawn one child process per sense group (via fork)
* - Route SignalMessage from workers → SignalBus
* - Route ErrorMessage from workers → stderr log
* - Drive compute triggers via ReflexScheduler
* - Graceful shutdown: stop scheduler, send shutdown to all workers
* - Hot reload: restartGroup, reloadConfig, file watcher integration
* - Health reporting: getHealth
*/
import { fork } from "node:child_process";
import type { ChildProcess } from "node:child_process";
import { readFileSync } from "node:fs";
import { dirname, join } from "node:path";
import { fileURLToPath } from "node:url";
import type { NerveConfig, Signal } from "@uncaged/nerve-core";
import { parseNerveConfig } from "@uncaged/nerve-core";
import { createFileWatcher } from "./file-watcher.js";
import type { FileWatcher } from "./file-watcher.js";
import type { ComputeMessage, ShutdownMessage } from "./ipc.js";
import { parseWorkerMessage } from "./ipc.js";
import { createLogStore } from "./log-store.js";
import type { LogStore } from "./log-store.js";
import { createReflexScheduler } from "./reflex-scheduler.js";
import type { ReflexScheduler } from "./reflex-scheduler.js";
import { createSignalBus } from "./signal-bus.js";
import type { SignalBus } from "./signal-bus.js";
export type KernelHealth = {
uptime: number;
activeSenses: number;
activeGroups: number;
pendingComputes: number;
memoryUsage: NodeJS.MemoryUsage;
};
export type Kernel = {
stop: () => Promise<void>;
groups: Set<string>;
senseCount: number;
bus: SignalBus;
logStore: LogStore;
/** Resolves when all workers have sent their initial "ready" message. */
ready: Promise<void>;
/** Returns the PID of the worker process for a given group, or null if not found. */
getWorkerPid: (group: string) => number | null;
/** Sends a compute message to the worker responsible for the given sense. */
triggerCompute: (senseName: string) => void;
/** Gracefully restart a group worker (wait for exit, then respawn). */
restartGroup: (group: string) => Promise<void>;
/** Reload config from a new NerveConfig, incrementally updating scheduler and workers.
* Note: any pending/throttled computes in the old scheduler are silently dropped on reload.
* In-flight state is not preserved across reloadConfig. */
reloadConfig: (newConfig: NerveConfig) => void;
/** Return daemon health info. */
getHealth: () => KernelHealth;
};
type WorkerEntry = {
group: string;
process: ChildProcess;
};
function resolveWorkerScript(): string {
const __filename = fileURLToPath(import.meta.url);
const __dir = dirname(__filename);
return join(__dir, "sense-worker.js");
}
function spawnWorker(nerveRoot: string, group: string, workerScript: string): ChildProcess {
return fork(workerScript, ["--group", group, "--root", nerveRoot], {
stdio: ["ignore", "inherit", "inherit", "ipc"],
});
}
function sendCompute(worker: ChildProcess, senseName: string): void {
// worker.connected is false when the IPC channel has been closed (e.g. worker crashed)
if (worker.connected === false) return;
const msg: ComputeMessage = { type: "compute", sense: senseName };
try {
worker.send(msg);
} catch {
// IPC channel closed between connected check and send
}
}
function sendShutdown(worker: ChildProcess): void {
if (worker.connected === false) return;
const msg: ShutdownMessage = { type: "shutdown" };
try {
worker.send(msg);
} catch {
// IPC channel closed between connected check and send
}
}
function groupForSense(config: NerveConfig, senseName: string): string | null {
const senseConfig = config.senses[senseName];
if (senseConfig === undefined) return null;
return senseConfig.group;
}
export type KernelOptions = {
workerScript: string;
enableFileWatcher?: boolean;
/** Override the LogStore instance (useful for testing). */
logStore?: LogStore;
};
function defaultKernelOptions(): KernelOptions {
return { workerScript: resolveWorkerScript(), enableFileWatcher: false };
}
export function createKernel(
initialConfig: NerveConfig,
nerveRoot: string,
options: KernelOptions = defaultKernelOptions(),
): Kernel {
const bus: SignalBus = createSignalBus();
const workerScript = options.workerScript;
const startTime = Date.now();
const logStore: LogStore = options.logStore ?? createLogStore(join(nerveRoot, "data", "logs.db"));
logStore.append({
source: "system",
type: "start",
refId: null,
payload: null,
ts: startTime,
});
let config = initialConfig;
let _signalIdCounter = 0;
function nextSignalId(): number {
_signalIdCounter += 1;
return _signalIdCounter;
}
const groups = new Set<string>();
for (const senseConfig of Object.values(config.senses)) {
groups.add(senseConfig.group);
}
const workers = new Map<string, WorkerEntry>();
let stopped = false;
let scheduler: ReflexScheduler = null as unknown as ReflexScheduler;
let readyResolve: (() => void) | undefined;
const ready = new Promise<void>((resolve) => {
readyResolve = resolve;
});
let pendingReadyCount = groups.size > 0 ? groups.size : 0;
function sensesForGroup(group: string): string[] {
return Object.entries(config.senses)
.filter(([, sc]) => sc.group === group)
.map(([name]) => name);
}
function handleWorkerMessage(raw: unknown): void {
const result = parseWorkerMessage(raw);
if (!result.ok) {
process.stderr.write(`[kernel] invalid worker message: ${result.error.message}\n`);
return;
}
const msg = result.value;
if (msg.type === "ready") {
pendingReadyCount -= 1;
if (pendingReadyCount <= 0) {
readyResolve?.();
}
return;
}
if (msg.type === "error") {
process.stderr.write(`[kernel] sense "${msg.sense}" error: ${msg.error}\n`);
logStore.append({
source: "system",
type: "error",
refId: msg.sense,
payload: JSON.stringify({ error: msg.error }),
ts: Date.now(),
});
scheduler.onComputeComplete(msg.sense);
return;
}
if (msg.type === "signal") {
const signal: Signal = {
id: nextSignalId(),
senseId: msg.sense,
payload: msg.payload,
ts: Date.now(),
};
logStore.append({
source: "reflex",
type: "run_complete",
refId: msg.sense,
payload: JSON.stringify(msg.payload),
ts: signal.ts,
});
bus.emit(signal);
scheduler.onComputeComplete(msg.sense);
}
// health-response is handled externally by the caller; no action needed here
}
function startWorker(group: string): Promise<void> {
const child = spawnWorker(nerveRoot, group, workerScript);
let workerReadyResolve: (() => void) | undefined;
const workerReady = new Promise<void>((resolve) => {
workerReadyResolve = resolve;
});
child.on("message", (raw: unknown) => {
const result = parseWorkerMessage(raw);
if (result.ok && result.value.type === "ready") {
workerReadyResolve?.();
}
handleWorkerMessage(raw);
});
child.on("exit", (code) => {
process.stderr.write(
`[kernel] worker for group "${group}" exited with code ${code ?? "null"}\n`,
);
// Resolve ready in case the worker exits before sending ready (prevents hangs)
workerReadyResolve?.();
if (!stopped && code !== 0) {
process.stderr.write(`[kernel] respawning worker for group "${group}" in 1s\n`);
for (const senseName of sensesForGroup(group)) {
scheduler.onComputeComplete(senseName);
}
setTimeout(() => {
if (!stopped) {
startWorker(group);
}
}, 1000);
}
});
workers.set(group, { group, process: child });
return workerReady;
}
function triggerFn(senseName: string): void {
const group = groupForSense(config, senseName);
if (group === null) {
process.stderr.write(`[kernel] triggerFn: unknown sense "${senseName}"\n`);
return;
}
const entry = workers.get(group);
if (entry === undefined) {
process.stderr.write(`[kernel] triggerFn: no worker for group "${group}"\n`);
return;
}
sendCompute(entry.process, senseName);
}
scheduler = createReflexScheduler(config, bus, triggerFn, { logStore });
if (groups.size === 0) {
readyResolve?.();
}
for (const group of groups) {
startWorker(group);
}
function waitForExit(child: ChildProcess, timeoutMs: number): Promise<void> {
return new Promise((resolve) => {
const timer = setTimeout(() => {
child.kill("SIGKILL");
resolve();
}, timeoutMs);
child.once("exit", () => {
clearTimeout(timer);
resolve();
});
});
}
// --- restartGroup: gracefully stop worker, then respawn and await ready ---
async function restartGroup(group: string): Promise<void> {
const entry = workers.get(group);
if (entry === undefined) return;
for (const senseName of sensesForGroup(group)) {
scheduler.onComputeComplete(senseName);
}
sendShutdown(entry.process);
await waitForExit(entry.process, 5000);
if (!stopped) {
await startWorker(group);
}
}
function collectGroups(cfg: NerveConfig): Set<string> {
const result = new Set<string>();
for (const sc of Object.values(cfg.senses)) {
result.add(sc.group);
}
return result;
}
function sensesForGroupInConfig(cfg: NerveConfig, group: string): Set<string> {
const result = new Set<string>();
for (const [name, sc] of Object.entries(cfg.senses)) {
if (sc.group === group) result.add(name);
}
return result;
}
function removeStaleGroups(oldGroups: Set<string>, newGroups: Set<string>): void {
for (const g of oldGroups) {
if (newGroups.has(g)) continue;
const entry = workers.get(g);
if (entry !== undefined) {
sendShutdown(entry.process);
workers.delete(g);
}
groups.delete(g);
}
}
function addNewGroups(oldGroups: Set<string>, newGroups: Set<string>): void {
for (const g of newGroups) {
if (oldGroups.has(g)) continue;
groups.add(g);
if (!stopped) startWorker(g);
}
}
function reloadConfig(newConfig: NerveConfig): void {
const oldGroups = collectGroups(config);
const oldConfig = config;
config = newConfig;
// Note: pending/throttled computes in the old scheduler are silently dropped here.
// In-flight state is not preserved across reloadConfig.
scheduler.stop();
scheduler = createReflexScheduler(config, bus, triggerFn, { logStore });
const newGroups = collectGroups(newConfig);
removeStaleGroups(oldGroups, newGroups);
addNewGroups(oldGroups, newGroups);
// Restart existing groups that gained new senses — the running worker process
// was spawned with the old config and will report "Unknown sense" for any newly
// added sense until it is restarted.
for (const g of newGroups) {
if (!oldGroups.has(g)) continue; // already handled by addNewGroups
const oldSenses = sensesForGroupInConfig(oldConfig, g);
const newSenses = sensesForGroupInConfig(newConfig, g);
const gained = [...newSenses].some((s) => !oldSenses.has(s));
if (gained) {
restartGroup(g).catch((e) => {
const msg = e instanceof Error ? e.message : String(e);
process.stderr.write(`[kernel] reloadConfig restartGroup error for "${g}": ${msg}\n`);
});
}
}
}
function getHealth(): KernelHealth {
return {
uptime: Date.now() - startTime,
activeSenses: Object.keys(config.senses).length,
activeGroups: workers.size,
pendingComputes: 0,
memoryUsage: process.memoryUsage(),
};
}
function handleSenseFileChange(senseName: string): void {
const sc = config.senses[senseName];
if (sc === undefined) return;
process.stderr.write(
`[kernel] sense file changed: "${senseName}", restarting group "${sc.group}"\n`,
);
logStore.append({
source: "system",
type: "sense_reload",
refId: senseName,
payload: null,
ts: Date.now(),
});
restartGroup(sc.group).catch((e) => {
const msg = e instanceof Error ? e.message : String(e);
process.stderr.write(`[kernel] restartGroup error: ${msg}\n`);
});
}
function handleConfigFileChange(): void {
process.stderr.write("[kernel] nerve.yaml changed, reloading config\n");
logStore.append({
source: "system",
type: "config_reload",
refId: null,
payload: null,
ts: Date.now(),
});
try {
const raw = readFileSync(join(nerveRoot, "nerve.yaml"), "utf8");
const parseResult = parseNerveConfig(raw);
if (!parseResult.ok) {
process.stderr.write(
`[kernel] config parse error, keeping current config: ${parseResult.error.message}\n`,
);
return;
}
reloadConfig(parseResult.value);
} catch (e) {
const msg = e instanceof Error ? e.message : String(e);
process.stderr.write(`[kernel] failed to read nerve.yaml, keeping current config: ${msg}\n`);
}
}
let fileWatcher: FileWatcher | null = null;
if (options.enableFileWatcher) {
fileWatcher = createFileWatcher(nerveRoot, (change) => {
if (change.kind === "sense") handleSenseFileChange(change.senseName);
if (change.kind === "config") handleConfigFileChange();
});
}
async function stop(): Promise<void> {
stopped = true;
if (fileWatcher !== null) {
fileWatcher.close();
fileWatcher = null;
}
scheduler.stop();
const exitPromises: Promise<void>[] = [];
for (const entry of workers.values()) {
sendShutdown(entry.process);
exitPromises.push(waitForExit(entry.process, 5000));
}
await Promise.all(exitPromises);
logStore.append({
source: "system",
type: "stop",
refId: null,
payload: null,
ts: Date.now(),
});
logStore.close();
}
function getWorkerPid(group: string): number | null {
return workers.get(group)?.process.pid ?? null;
}
const senseCount = Object.keys(config.senses).length;
return {
stop,
groups,
senseCount,
bus,
logStore,
ready,
getWorkerPid,
triggerCompute: triggerFn,
restartGroup,
reloadConfig,
getHealth,
};
}
+150
View File
@@ -0,0 +1,150 @@
/**
* Log Store — append-only structured log storage backed by SQLite.
*
* Stores system, reflex, and workflow log entries in a single table.
* Logs are data assets for audit/analysis — they MUST NOT trigger reflexes.
*
* Also provides a `meta` key-value table for bookkeeping (e.g. archive watermarks).
*/
import { mkdirSync } from "node:fs";
import { dirname } from "node:path";
import Database from "better-sqlite3";
import type BetterSqlite3 from "better-sqlite3";
export type LogEntry = {
id?: number;
source: string;
type: string;
refId: string | null;
payload: string | null;
ts: number;
};
export type LogQuery = {
source?: string;
type?: string;
refId?: string;
since?: number;
until?: number;
limit?: number;
};
export type LogStore = {
append: (entry: Omit<LogEntry, "id">) => LogEntry;
query: (filter?: LogQuery) => LogEntry[];
getMeta: (key: string) => string | null;
setMeta: (key: string, value: string) => void;
close: () => void;
};
const SCHEMA_SQL = `
CREATE TABLE IF NOT EXISTS logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
source TEXT NOT NULL,
type TEXT NOT NULL,
ref_id TEXT,
payload TEXT,
ts INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_logs_source_type ON logs(source, type);
CREATE INDEX IF NOT EXISTS idx_logs_ts ON logs(ts);
CREATE INDEX IF NOT EXISTS idx_logs_ref_id ON logs(ref_id);
CREATE TABLE IF NOT EXISTS meta (
key TEXT PRIMARY KEY,
value TEXT NOT NULL
);
`;
export function createLogStore(dbPath: string): LogStore {
mkdirSync(dirname(dbPath), { recursive: true });
const sqlite: BetterSqlite3.Database = new Database(dbPath);
sqlite.pragma("journal_mode = WAL");
sqlite.exec(SCHEMA_SQL);
const insertStmt = sqlite.prepare(
"INSERT INTO logs (source, type, ref_id, payload, ts) VALUES (@source, @type, @refId, @payload, @ts)",
);
const getMetaStmt = sqlite.prepare("SELECT value FROM meta WHERE key = ?");
const setMetaStmt = sqlite.prepare(
"INSERT INTO meta (key, value) VALUES (@key, @value) ON CONFLICT(key) DO UPDATE SET value = @value",
);
function append(entry: Omit<LogEntry, "id">): LogEntry {
const info = insertStmt.run({
source: entry.source,
type: entry.type,
refId: entry.refId,
payload: entry.payload,
ts: entry.ts,
});
return { ...entry, id: Number(info.lastInsertRowid) };
}
function query(filter: LogQuery = {}): LogEntry[] {
const conditions: string[] = [];
const params: Record<string, unknown> = {};
if (filter.source !== undefined) {
conditions.push("source = @source");
params.source = filter.source;
}
if (filter.type !== undefined) {
conditions.push("type = @type");
params.type = filter.type;
}
if (filter.refId !== undefined) {
conditions.push("ref_id = @refId");
params.refId = filter.refId;
}
if (filter.since !== undefined) {
conditions.push("ts >= @since");
params.since = filter.since;
}
if (filter.until !== undefined) {
conditions.push("ts <= @until");
params.until = filter.until;
}
const where = conditions.length > 0 ? `WHERE ${conditions.join(" AND ")}` : "";
const limit = filter.limit !== undefined ? `LIMIT ${filter.limit}` : "";
const sql = `SELECT id, source, type, ref_id, payload, ts FROM logs ${where} ORDER BY id ASC ${limit}`;
const rows = sqlite.prepare(sql).all(params) as Array<{
id: number;
source: string;
type: string;
ref_id: string | null;
payload: string | null;
ts: number;
}>;
return rows.map((r) => ({
id: r.id,
source: r.source,
type: r.type,
refId: r.ref_id,
payload: r.payload,
ts: r.ts,
}));
}
function getMeta(key: string): string | null {
const row = getMetaStmt.get(key) as { value: string } | undefined;
return row?.value ?? null;
}
function setMeta(key: string, value: string): void {
setMetaStmt.run({ key, value });
}
function close(): void {
sqlite.close();
}
return { append, query, getMeta, setMeta, close };
}
+196
View File
@@ -0,0 +1,196 @@
/**
* Reflex Scheduler — drives sense compute cycles based on ReflexConfig.
*
* Supports:
* - interval: periodic setInterval-based triggering
* - on[]: event-driven triggering via the signal bus
* - throttle: skip triggers that arrive too soon after the last compute
* - merge/coalesce: if compute is in-flight, record one pending trigger;
* run it once after the current compute completes (no unbounded queue)
*/
import type { NerveConfig } from "@uncaged/nerve-core";
import type { LogStore } from "./log-store.js";
import type { SignalBus, Unsubscribe } from "./signal-bus.js";
/** Sends a compute message to the worker responsible for the given sense. */
export type TriggerFn = (senseName: string) => void;
/** Per-sense mutable state tracked by the scheduler. */
type SenseState = {
lastComputeAt: number;
inFlight: boolean;
pending: boolean;
deferredTimer: ReturnType<typeof setTimeout> | null;
};
/** Handle returned by createReflexScheduler — call stop() for graceful shutdown. */
export type ReflexScheduler = {
/** Notify scheduler that a compute cycle finished. Drains the pending flag. */
onComputeComplete: (senseName: string) => void;
stop: () => void;
};
function makeSenseState(): SenseState {
return { lastComputeAt: 0, inFlight: false, pending: false, deferredTimer: null };
}
export type ReflexSchedulerOptions = {
logStore?: LogStore;
};
/**
* Create and start a reflex scheduler.
*
* @param config Full NerveConfig (reads senses for throttle/timeout, reflexes for schedule).
* @param bus SignalBus to subscribe for event-driven reflexes.
* @param triggerFn Called with the sense name when a compute should be dispatched.
* @param opts Optional: logStore for structured logging.
* @returns ReflexScheduler with stop() and onComputeComplete() methods.
*/
export function createReflexScheduler(
config: NerveConfig,
bus: SignalBus,
triggerFn: TriggerFn,
opts?: ReflexSchedulerOptions,
): ReflexScheduler {
const logStore = opts?.logStore;
const intervals: ReturnType<typeof setInterval>[] = [];
const unsubscribers: Unsubscribe[] = [];
const states = new Map<string, SenseState>();
function getState(senseName: string): SenseState {
let state = states.get(senseName);
if (state === undefined) {
state = makeSenseState();
states.set(senseName, state);
}
return state;
}
function dispatchCompute(senseName: string): void {
const state = getState(senseName);
state.inFlight = true;
state.pending = false;
state.lastComputeAt = Date.now();
logStore?.append({
source: "reflex",
type: "run_start",
refId: senseName,
payload: null,
ts: Date.now(),
});
triggerFn(senseName);
}
/**
* Attempt to trigger compute for a sense.
* Respects throttle window and merge/coalesce semantics.
* If within throttle window, schedules a single deferred trigger at window end (fix #3).
*/
function maybeTrigger(senseName: string): void {
const senseConfig = config.senses[senseName];
const throttleMs = senseConfig?.throttle ?? null;
const state = getState(senseName);
const now = Date.now();
if (throttleMs !== null) {
const elapsed = now - state.lastComputeAt;
if (elapsed < throttleMs) {
// Schedule one deferred trigger at the end of the throttle window (no stacking)
if (state.deferredTimer === null) {
const remaining = throttleMs - elapsed;
state.deferredTimer = setTimeout(() => {
state.deferredTimer = null;
maybeTrigger(senseName);
}, remaining);
}
return;
}
}
if (state.inFlight) {
state.pending = true;
return;
}
dispatchCompute(senseName);
}
/**
* Called by the kernel when a compute cycle completes (signal or error received).
* Drains the pending flag if set.
*/
function onComputeComplete(senseName: string): void {
const state = states.get(senseName);
if (state === undefined) return;
state.inFlight = false;
if (!state.pending) return;
const senseConfig = config.senses[senseName];
const throttleMs = senseConfig?.throttle ?? null;
const now = Date.now();
if (throttleMs !== null && now - state.lastComputeAt < throttleMs) {
// Schedule deferred trigger if not already pending (fix #3)
if (state.deferredTimer === null) {
const remaining = throttleMs - (now - state.lastComputeAt);
state.deferredTimer = setTimeout(() => {
state.deferredTimer = null;
const s = states.get(senseName);
if (s !== undefined && !s.inFlight) {
dispatchCompute(senseName);
}
}, remaining);
}
state.pending = false;
return;
}
dispatchCompute(senseName);
}
for (const reflex of config.reflexes) {
if (reflex.kind !== "sense") continue;
const senseReflex = reflex;
const senseName = senseReflex.sense;
if (senseReflex.interval !== null) {
const id = setInterval(() => {
maybeTrigger(senseName);
}, senseReflex.interval);
intervals.push(id);
}
if (senseReflex.on !== null && senseReflex.on.length > 0) {
const watchedSenses = new Set(senseReflex.on);
const unsub = bus.subscribe((signal) => {
if (watchedSenses.has(signal.senseId)) {
maybeTrigger(senseName);
}
});
unsubscribers.push(unsub);
}
}
function stop(): void {
for (const id of intervals) {
clearInterval(id);
}
for (const unsub of unsubscribers) {
unsub();
}
for (const state of states.values()) {
if (state.deferredTimer !== null) {
clearTimeout(state.deferredTimer);
state.deferredTimer = null;
}
}
intervals.length = 0;
unsubscribers.length = 0;
}
return { onComputeComplete, stop };
}
+119 -41
View File
@@ -79,18 +79,12 @@ async function initSense(
const dbResult = openSenseDb(dbPath, migrationsDir);
if (!dbResult.ok) {
process.stderr.write(
`[sense-worker] Failed to init DB for "${senseName}": ${dbResult.error.message}\n`,
);
process.exit(1);
throw new Error(`Failed to init DB for "${senseName}": ${dbResult.error.message}`);
}
const computeResult = await loadComputeFn(senseIndexPath);
if (!computeResult.ok) {
process.stderr.write(
`[sense-worker] Failed to load compute for "${senseName}": ${computeResult.error.message}\n`,
);
process.exit(1);
throw new Error(`Failed to load compute for "${senseName}": ${computeResult.error.message}`);
}
const { db } = dbResult.value;
@@ -129,12 +123,75 @@ function buildPeers(
return Object.fromEntries(entries);
}
// ---------------------------------------------------------------------------
// Grace period: hard kill after soft timeout
//
// Trade-off: grace period kills the entire worker process (process.exit), which
// terminates all senses in the group — not just the one that timed out. This is
// intentional: a hung compute may hold locks or corrupt shared state. Restarting
// the full worker ensures a clean slate, but other senses in the same group will
// lose any in-flight work until the kernel respawns the process.
// ---------------------------------------------------------------------------
const gracePeriodTimers = new Map<string, ReturnType<typeof setTimeout>>();
function scheduleGracePeriodKill(senseName: string, gracePeriodMs: number): void {
if (gracePeriodTimers.has(senseName)) return;
process.stderr.write(`[sense-worker] grace period for "${senseName}" (${gracePeriodMs}ms)\n`);
const timer = setTimeout(() => {
process.stderr.write(`[sense-worker] grace period expired for "${senseName}", hard kill\n`);
process.exit(1);
}, gracePeriodMs);
gracePeriodTimers.set(senseName, timer);
}
function clearGracePeriodTimer(senseName: string): void {
const timer = gracePeriodTimers.get(senseName);
if (timer === undefined) return;
clearTimeout(timer);
gracePeriodTimers.delete(senseName);
}
// ---------------------------------------------------------------------------
// Compute execution with error isolation
// ---------------------------------------------------------------------------
async function runCompute(
senseName: string,
runtime: SenseRuntime,
peers: PeerMap,
timeoutMs: number,
gracePeriodMs: number | null,
): Promise<void> {
try {
const result = await executeCompute(runtime, peers, timeoutMs);
if (!result.ok) {
sendError(senseName, result.error.message);
if (gracePeriodMs !== null && result.error.message.includes("timed out")) {
scheduleGracePeriodKill(senseName, gracePeriodMs);
}
return;
}
clearGracePeriodTimer(senseName);
if (result.value !== null) {
sendSignal(senseName, result.value);
}
} catch (e: unknown) {
const errMsg = e instanceof Error ? e.message : String(e);
sendError(senseName, errMsg);
}
}
// ---------------------------------------------------------------------------
// IPC message dispatch
// ---------------------------------------------------------------------------
function handleMessage(
raw: unknown,
runtimes: Map<string, SenseRuntime>,
peers: PeerMap,
group: string,
timeoutMs: number,
senseConfigs: Map<string, { timeout: number | null; gracePeriod: number | null }>,
inFlight: Map<string, Promise<void>>,
): void {
const parseResult = parseParentMessage(raw);
@@ -149,33 +206,36 @@ function handleMessage(
process.exit(0);
}
if (msg.type === "compute") {
const runtime = runtimes.get(msg.sense);
if (!runtime) {
sendError(msg.sense, `Unknown sense "${msg.sense}" in group "${group}"`);
return;
}
// Serialize computes for the same sense
const previous = inFlight.get(msg.sense) ?? Promise.resolve();
const next = previous.then(async () => {
const result = await executeCompute(runtime, peers, timeoutMs);
if (!result.ok) {
sendError(msg.sense, result.error.message);
return;
}
if (result.value !== null) {
sendSignal(msg.sense, result.value);
}
if (msg.type === "health-request") {
send({
type: "health-response",
senses: [...runtimes.keys()],
inFlightCount: inFlight.size,
});
return;
}
const tracked = next.catch((e: unknown) => {
if (msg.type !== "compute") return;
const runtime = runtimes.get(msg.sense);
if (!runtime) {
sendError(msg.sense, `Unknown sense "${msg.sense}" in group "${group}"`);
return;
}
// Look up timeout/gracePeriod per-sense at compute time (RFC §5.3: these are per-sense)
const sc = senseConfigs.get(msg.sense);
const timeoutMs = sc?.timeout ?? DEFAULT_TIMEOUT_MS;
const gracePeriodMs = sc?.gracePeriod ?? null;
const previous = inFlight.get(msg.sense) ?? Promise.resolve();
const next = previous
.then(() => runCompute(msg.sense, runtime, peers, timeoutMs, gracePeriodMs))
.catch((e: unknown) => {
const errMsg = e instanceof Error ? e.message : String(e);
sendError(msg.sense, errMsg);
});
inFlight.set(msg.sense, tracked);
}
inFlight.set(msg.sense, next);
}
// ---------------------------------------------------------------------------
@@ -198,29 +258,47 @@ async function bootstrap(nerveRoot: string, group: string): Promise<void> {
const runtimes = new Map<string, SenseRuntime>();
const ownDbs = new Map<string, DrizzleDB>();
const failedSenses: string[] = [];
for (const senseName of groupSenses) {
const { db, runtime } = await initSense(nerveRoot, senseName);
ownDbs.set(senseName, db);
runtimes.set(senseName, runtime);
try {
const { db, runtime } = await initSense(nerveRoot, senseName);
ownDbs.set(senseName, db);
runtimes.set(senseName, runtime);
} catch (e: unknown) {
const eMsg = e instanceof Error ? e.message : String(e);
process.stderr.write(
`[sense-worker] Failed to load sense "${senseName}", skipping: ${eMsg}\n`,
);
failedSenses.push(senseName);
}
}
// If ALL senses failed, exit with error so kernel respawns
if (runtimes.size === 0) {
process.stderr.write(`[sense-worker] All senses in group "${group}" failed to load, exiting\n`);
process.exit(1);
}
const groupSenseNames = new Set(groupSenses);
const peers = buildPeers(nerveRoot, Object.keys(config.senses), ownDbs, groupSenseNames);
// Read timeout from config (uses first group sense's config, or default)
const firstSenseConfig = config.senses[groupSenses[0]];
const timeoutMs =
typeof (firstSenseConfig as Record<string, unknown>).timeoutMs === "number"
? ((firstSenseConfig as Record<string, unknown>).timeoutMs as number)
: DEFAULT_TIMEOUT_MS;
// Build per-sense timeout/gracePeriod map (RFC §5.3: these are per-sense, not per-group)
const senseConfigs = new Map<string, { timeout: number | null; gracePeriod: number | null }>();
for (const senseName of groupSenses) {
const sc = config.senses[senseName];
senseConfigs.set(senseName, {
timeout: sc?.timeout ?? null,
gracePeriod: sc?.gracePeriod ?? null,
});
}
const inFlight = new Map<string, Promise<void>>();
sendReady();
process.on("message", (raw: unknown) => {
handleMessage(raw, runtimes, peers, group, timeoutMs, inFlight);
handleMessage(raw, runtimes, peers, group, senseConfigs, inFlight);
});
}
+51
View File
@@ -0,0 +1,51 @@
/**
* In-memory signal bus for routing signals between sense workers and reflex subscribers.
* Synchronous dispatch — no persistence, no async queuing.
*
* If a handler throws, the error is logged and remaining handlers still run.
* The first error is re-thrown after all handlers complete so callers can observe it.
*/
import type { Signal } from "@uncaged/nerve-core";
export type SignalHandler = (signal: Signal) => void;
export type Unsubscribe = () => void;
export type SignalBus = {
emit: (signal: Signal) => void;
subscribe: (handler: SignalHandler) => Unsubscribe;
};
export function createSignalBus(): SignalBus {
const handlers = new Set<SignalHandler>();
function emit(signal: Signal): void {
let firstError: unknown = null;
let hasError = false;
for (const handler of handlers) {
try {
handler(signal);
} catch (e) {
console.error("[signal-bus] handler error:", e);
if (!hasError) {
firstError = e;
hasError = true;
}
}
}
if (hasError) {
throw firstError;
}
}
function subscribe(handler: SignalHandler): Unsubscribe {
handlers.add(handler);
return () => {
handlers.delete(handler);
};
}
return { emit, subscribe };
}
+2 -1
View File
@@ -2,7 +2,8 @@
"extends": "../../tsconfig.json",
"compilerOptions": {
"outDir": "dist",
"rootDir": "src"
"rootDir": "src",
"composite": false
},
"include": ["src"]
}
+1 -1
View File
@@ -1,7 +1,7 @@
import { defineConfig } from "tsup";
export default defineConfig({
entry: ["src/index.ts"],
entry: ["src/index.ts", "src/sense-worker.ts"],
format: ["esm"],
dts: true,
clean: true,
+38 -3
View File
@@ -18,7 +18,21 @@ importers:
specifier: ^5.5.0
version: 5.9.3
packages/cli: {}
packages/cli:
dependencies:
'@uncaged/nerve-core':
specifier: workspace:*
version: link:../core
'@uncaged/nerve-daemon':
specifier: workspace:*
version: link:../daemon
citty:
specifier: ^0.1.6
version: 0.1.6
devDependencies:
'@types/node':
specifier: ^22.0.0
version: 22.19.17
packages/core:
dependencies:
@@ -552,6 +566,9 @@ packages:
'@types/estree@1.0.8':
resolution: {integrity: sha512-dWHzHa2WqEXI/O1E9OjrocMTKJl2mSrEolh1Iomrv6U+JuNwaHXsXx9bLu5gG7BUWFIN0skIQJQ/L1rIex4X6w==}
'@types/node@22.19.17':
resolution: {integrity: sha512-wGdMcf+vPYM6jikpS/qhg6WiqSV/OhG+jeeHT/KlVqxYfD40iYJf9/AE1uQxVWFvU7MipKRkRv8NSHiCGgPr8Q==}
'@types/node@25.6.0':
resolution: {integrity: sha512-+qIYRKdNYJwY3vRCZMdJbPLJAtGjQBudzZzdzwQYkEPQd+PJGixUL5QfvCLDaULoLv+RhT3LDkwEfKaAkgSmNQ==}
@@ -632,6 +649,9 @@ packages:
chownr@1.1.4:
resolution: {integrity: sha512-jJ0bqzaylmJtVnNgzTeSOs8DPavpbYgEr/b0YL8/2GO3xJEhInFmhKMUnEJQjZumK7KXGFhUy89PrsJWlakBVg==}
citty@0.1.6:
resolution: {integrity: sha512-tskPPKEs8D2KPafUypv2gxwJP8h/OaJmC82QQGGDQcHvXX43xF2VDACcJVmZ0EuSxkpO9Kc4MlrA3q0+FG58AQ==}
commander@4.1.1:
resolution: {integrity: sha512-NOKm8xhkzAjzFx8B2v5OAHT+u5pRQc2UCa2Vq9jYL/31o2wi9mxBA7LIFs3sV5VSC49z6pEhfbMULvShKj26WA==}
engines: {node: '>= 6'}
@@ -1135,6 +1155,9 @@ packages:
ufo@1.6.3:
resolution: {integrity: sha512-yDJTmhydvl5lJzBmy/hyOAA0d+aqCBuwl818haVdYCRrWV84o7YyeVm4QlVHStqNrrJSTb6jKuFAVqAFsr+K3Q==}
undici-types@6.21.0:
resolution: {integrity: sha512-iwDZqg0QAGrg9Rav5H4n0M64c3mkR59cJ6wQp+7C4nI0gsmExaedaYLNO44eT4AtBBwjbTiGPMlt2Md0T9H9JQ==}
undici-types@7.19.2:
resolution: {integrity: sha512-qYVnV5OEm2AW8cJMCpdV20CDyaN3g0AjDlOGf1OW4iaDEx8MwdtChUp4zu4H0VP3nDRF/8RKWH+IPp9uW0YGZg==}
@@ -1527,7 +1550,7 @@ snapshots:
'@types/better-sqlite3@7.6.13':
dependencies:
'@types/node': 25.6.0
'@types/node': 22.19.17
'@types/chai@5.2.3':
dependencies:
@@ -1538,9 +1561,14 @@ snapshots:
'@types/estree@1.0.8': {}
'@types/node@22.19.17':
dependencies:
undici-types: 6.21.0
'@types/node@25.6.0':
dependencies:
undici-types: 7.19.2
optional: true
'@vitest/expect@4.1.5':
dependencies:
@@ -1626,6 +1654,10 @@ snapshots:
chownr@1.1.4: {}
citty@0.1.6:
dependencies:
consola: 3.4.2
commander@4.1.1: {}
confbox@0.1.8: {}
@@ -2050,7 +2082,10 @@ snapshots:
ufo@1.6.3: {}
undici-types@7.19.2: {}
undici-types@6.21.0: {}
undici-types@7.19.2:
optional: true
util-deprecate@1.0.2: {}