Compare commits
9 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 097a4790be | |||
| ad2b40dd4f | |||
| 31d1eae44a | |||
| b4ef669607 | |||
| 22b3690327 | |||
| 7bb5df301d | |||
| 9443406703 | |||
| d9355a6299 | |||
| bf60047186 |
@@ -0,0 +1,40 @@
|
||||
# Example nerve.yaml demonstrating Signal Bus & Reflex Scheduler (Phase 3)
|
||||
#
|
||||
# Layout:
|
||||
# - cpu-usage: periodic every 10s, throttled to 5s minimum between computes
|
||||
# - disk-usage: periodic every 30s
|
||||
# - system-health: derived sense, triggered whenever cpu-usage OR disk-usage emits
|
||||
|
||||
senses:
|
||||
cpu-usage:
|
||||
group: system
|
||||
throttle: 5s
|
||||
timeout: 8s
|
||||
grace_period: null
|
||||
|
||||
disk-usage:
|
||||
group: system
|
||||
throttle: null
|
||||
timeout: 15s
|
||||
grace_period: null
|
||||
|
||||
system-health:
|
||||
group: derived
|
||||
throttle: 2s
|
||||
timeout: 10s
|
||||
grace_period: null
|
||||
|
||||
reflexes:
|
||||
# cpu-usage runs on a 10-second interval
|
||||
- sense: cpu-usage
|
||||
interval: 10s
|
||||
|
||||
# disk-usage runs on a 30-second interval
|
||||
- sense: disk-usage
|
||||
interval: 30s
|
||||
|
||||
# system-health is event-driven: fires whenever cpu-usage or disk-usage emits a signal
|
||||
- sense: system-health
|
||||
on:
|
||||
- cpu-usage
|
||||
- disk-usage
|
||||
@@ -10,5 +10,13 @@
|
||||
"types": "dist/index.d.ts",
|
||||
"scripts": {
|
||||
"build": "tsup"
|
||||
},
|
||||
"dependencies": {
|
||||
"@uncaged/nerve-core": "workspace:*",
|
||||
"@uncaged/nerve-daemon": "workspace:*",
|
||||
"citty": "^0.1.6"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@types/node": "^22.0.0"
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,25 @@
|
||||
#!/usr/bin/env node
|
||||
|
||||
import { defineCommand, runMain } from "citty";
|
||||
|
||||
import { initCommand } from "./commands/init.js";
|
||||
import { startCommand } from "./commands/start.js";
|
||||
import { statusCommand } from "./commands/status.js";
|
||||
import { stopCommand } from "./commands/stop.js";
|
||||
import { validateCommand } from "./commands/validate.js";
|
||||
|
||||
const main = defineCommand({
|
||||
meta: {
|
||||
name: "nerve",
|
||||
description: "Nerve — an AI agent kernel",
|
||||
},
|
||||
subCommands: {
|
||||
init: initCommand,
|
||||
start: startCommand,
|
||||
stop: stopCommand,
|
||||
status: statusCommand,
|
||||
validate: validateCommand,
|
||||
},
|
||||
});
|
||||
|
||||
runMain(main);
|
||||
@@ -0,0 +1,170 @@
|
||||
import { existsSync, mkdirSync, writeFileSync } from "node:fs";
|
||||
import { dirname, join } from "node:path";
|
||||
|
||||
import { defineCommand } from "citty";
|
||||
|
||||
import { getNerveRoot } from "../workspace.js";
|
||||
|
||||
const NERVE_YAML = `# nerve.yaml — Nerve workspace configuration
|
||||
senses:
|
||||
cpu-usage:
|
||||
group: system
|
||||
throttle: 5s
|
||||
timeout: 10s
|
||||
grace_period: null
|
||||
|
||||
reflexes:
|
||||
- kind: sense
|
||||
sense: cpu-usage
|
||||
interval: 10s
|
||||
`;
|
||||
|
||||
const PACKAGE_JSON = `{
|
||||
"name": "my-nerve-workspace",
|
||||
"version": "0.0.1",
|
||||
"private": true,
|
||||
"type": "module",
|
||||
"dependencies": {
|
||||
"@uncaged/nerve-core": "latest",
|
||||
"drizzle-orm": "latest"
|
||||
},
|
||||
"devDependencies": {
|
||||
"drizzle-kit": "latest"
|
||||
}
|
||||
}
|
||||
`;
|
||||
|
||||
const GITIGNORE = `data/
|
||||
node_modules/
|
||||
`;
|
||||
|
||||
const CPU_SCHEMA_TS = `import { integer, real, sqliteTable, text } from "drizzle-orm/sqlite-core";
|
||||
|
||||
export const cpuUsage = sqliteTable("cpu_usage", {
|
||||
id: integer("id").primaryKey({ autoIncrement: true }),
|
||||
ts: integer("ts").notNull(),
|
||||
model: text("model").notNull(),
|
||||
loadPercent: real("load_percent").notNull(),
|
||||
});
|
||||
`;
|
||||
|
||||
const CPU_INDEX_TS = `import { cpus } from "node:os";
|
||||
|
||||
export async function compute(): Promise<unknown> {
|
||||
const cpuList = cpus();
|
||||
|
||||
let totalIdle = 0;
|
||||
let totalTick = 0;
|
||||
for (const cpu of cpuList) {
|
||||
for (const [, time] of Object.entries(cpu.times)) {
|
||||
totalTick += time;
|
||||
}
|
||||
totalIdle += cpu.times.idle;
|
||||
}
|
||||
|
||||
const loadPercent = totalTick === 0 ? 0 : ((totalTick - totalIdle) / totalTick) * 100;
|
||||
|
||||
return {
|
||||
model: cpuList[0]?.model ?? "unknown",
|
||||
loadPercent: Math.round(loadPercent * 100) / 100,
|
||||
ts: Date.now(),
|
||||
};
|
||||
}
|
||||
`;
|
||||
|
||||
const CPU_MIGRATION_SQL = `CREATE TABLE IF NOT EXISTS cpu_usage (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
ts INTEGER NOT NULL,
|
||||
model TEXT NOT NULL,
|
||||
load_percent REAL NOT NULL
|
||||
);
|
||||
`;
|
||||
|
||||
function writeFile(filePath: string, content: string): void {
|
||||
mkdirSync(dirname(filePath), { recursive: true });
|
||||
writeFileSync(filePath, content, "utf8");
|
||||
}
|
||||
|
||||
async function runCommand(cmd: string, args: string[], cwd: string): Promise<void> {
|
||||
const { spawn } = await import("node:child_process");
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
const child = spawn(cmd, args, { cwd, stdio: "inherit" });
|
||||
child.on("close", (code) => {
|
||||
if (code === 0) resolve();
|
||||
else reject(new Error(`${cmd} exited with code ${code}`));
|
||||
});
|
||||
child.on("error", reject);
|
||||
});
|
||||
}
|
||||
|
||||
async function detectPackageManager(): Promise<{ cmd: string; args: string[] }> {
|
||||
const { execFile } = await import("node:child_process");
|
||||
const { promisify } = await import("node:util");
|
||||
const execFileAsync = promisify(execFile);
|
||||
|
||||
for (const pm of ["pnpm", "yarn", "npm"]) {
|
||||
try {
|
||||
await execFileAsync(pm, ["--version"]);
|
||||
const args = pm === "pnpm" ? ["install", "--no-cache"] : ["install"];
|
||||
return { cmd: pm, args };
|
||||
} catch {
|
||||
// not available, try next
|
||||
}
|
||||
}
|
||||
return { cmd: "npm", args: ["install"] };
|
||||
}
|
||||
|
||||
export const initCommand = defineCommand({
|
||||
meta: {
|
||||
name: "init",
|
||||
description: "Initialize the ~/.uncaged-nerve/ workspace",
|
||||
},
|
||||
args: {
|
||||
force: {
|
||||
type: "boolean",
|
||||
description: "Reinitialize even if workspace already exists (preserves data/)",
|
||||
default: false,
|
||||
},
|
||||
},
|
||||
async run({ args }) {
|
||||
const nerveRoot = getNerveRoot();
|
||||
|
||||
if (existsSync(nerveRoot) && !args.force) {
|
||||
process.stderr.write("⚠️ ~/.uncaged-nerve/ already exists. Use --force to reinitialize.\n");
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
mkdirSync(join(nerveRoot, "data"), { recursive: true });
|
||||
mkdirSync(join(nerveRoot, "senses", "cpu-usage", "migrations"), { recursive: true });
|
||||
|
||||
writeFile(join(nerveRoot, "nerve.yaml"), NERVE_YAML);
|
||||
writeFile(join(nerveRoot, "package.json"), PACKAGE_JSON);
|
||||
writeFile(join(nerveRoot, ".gitignore"), GITIGNORE);
|
||||
writeFile(join(nerveRoot, "senses", "cpu-usage", "schema.ts"), CPU_SCHEMA_TS);
|
||||
writeFile(join(nerveRoot, "senses", "cpu-usage", "index.ts"), CPU_INDEX_TS);
|
||||
writeFile(
|
||||
join(nerveRoot, "senses", "cpu-usage", "migrations", "0001_init.sql"),
|
||||
CPU_MIGRATION_SQL,
|
||||
);
|
||||
|
||||
process.stdout.write("Installing dependencies…\n");
|
||||
try {
|
||||
const { cmd, args } = await detectPackageManager();
|
||||
await runCommand(cmd, args, nerveRoot);
|
||||
} catch {
|
||||
process.stdout.write("⚠️ Install failed — you may need to install dependencies manually.\n");
|
||||
}
|
||||
|
||||
if (!existsSync(join(nerveRoot, ".git"))) {
|
||||
try {
|
||||
await runCommand("git", ["init"], nerveRoot);
|
||||
} catch {
|
||||
process.stdout.write("⚠️ git init failed — skipping.\n");
|
||||
}
|
||||
}
|
||||
|
||||
process.stdout.write(
|
||||
"✅ Workspace created at ~/.uncaged-nerve/\n 1 example sense: cpu-usage\n Run `nerve start` to launch the daemon.\n",
|
||||
);
|
||||
},
|
||||
});
|
||||
@@ -0,0 +1,145 @@
|
||||
import { createWriteStream } from "node:fs";
|
||||
import { readFileSync } from "node:fs";
|
||||
import { mkdir } from "node:fs/promises";
|
||||
import { join } from "node:path";
|
||||
import { fileURLToPath } from "node:url";
|
||||
|
||||
import { parseNerveConfig } from "@uncaged/nerve-core";
|
||||
import { createKernel } from "@uncaged/nerve-daemon";
|
||||
import { defineCommand } from "citty";
|
||||
|
||||
import { getLogPath, getNerveRoot, isRunning, readPidFile, writePidFile } from "../workspace.js";
|
||||
|
||||
function readConfig(nerveRoot: string): ReturnType<typeof parseNerveConfig> {
|
||||
const configPath = join(nerveRoot, "nerve.yaml");
|
||||
let raw: string;
|
||||
try {
|
||||
raw = readFileSync(configPath, "utf8");
|
||||
} catch (e) {
|
||||
const msg = e instanceof Error ? e.message : String(e);
|
||||
return { ok: false, error: new Error(`❌ Cannot read ${configPath}: ${msg}`) };
|
||||
}
|
||||
return parseNerveConfig(raw);
|
||||
}
|
||||
|
||||
async function runForeground(nerveRoot: string): Promise<void> {
|
||||
const configResult = readConfig(nerveRoot);
|
||||
if (!configResult.ok) {
|
||||
process.stderr.write(`${configResult.error.message}\n`);
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
const config = configResult.value;
|
||||
const kernel = createKernel(config, nerveRoot);
|
||||
|
||||
const senseNames = Object.keys(config.senses);
|
||||
const groups = [...kernel.groups];
|
||||
|
||||
process.stdout.write(
|
||||
`✅ Nerve starting — ${senseNames.length} sense(s), ${groups.length} group(s)\n`,
|
||||
);
|
||||
for (const group of groups) {
|
||||
const groupSenses = Object.entries(config.senses)
|
||||
.filter(([, sc]) => sc.group === group)
|
||||
.map(([name]) => name);
|
||||
process.stdout.write(` group "${group}": ${groupSenses.join(", ")}\n`);
|
||||
}
|
||||
process.stdout.write(" Press Ctrl+C to stop.\n");
|
||||
|
||||
let shuttingDown = false;
|
||||
|
||||
async function shutdown(): Promise<void> {
|
||||
if (shuttingDown) return;
|
||||
shuttingDown = true;
|
||||
process.stdout.write("\n[nerve] Shutting down…\n");
|
||||
await kernel.stop();
|
||||
process.exit(0);
|
||||
}
|
||||
|
||||
process.on("SIGINT", () => {
|
||||
shutdown().catch((e: unknown) => {
|
||||
const msg = e instanceof Error ? e.message : String(e);
|
||||
process.stderr.write(`[nerve] Shutdown error: ${msg}\n`);
|
||||
process.exit(1);
|
||||
});
|
||||
});
|
||||
|
||||
process.on("SIGTERM", () => {
|
||||
shutdown().catch((e: unknown) => {
|
||||
const msg = e instanceof Error ? e.message : String(e);
|
||||
process.stderr.write(`[nerve] Shutdown error: ${msg}\n`);
|
||||
process.exit(1);
|
||||
});
|
||||
});
|
||||
|
||||
await kernel.ready;
|
||||
}
|
||||
|
||||
async function runDaemon(nerveRoot: string): Promise<void> {
|
||||
if (isRunning()) {
|
||||
const pid = readPidFile();
|
||||
process.stderr.write(`⚠️ Nerve daemon is already running (pid ${pid}).\n`);
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
const configResult = readConfig(nerveRoot);
|
||||
if (!configResult.ok) {
|
||||
process.stderr.write(`${configResult.error.message}\n`);
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
const logPath = getLogPath();
|
||||
await mkdir(join(nerveRoot, "logs"), { recursive: true });
|
||||
|
||||
const { spawn } = await import("node:child_process");
|
||||
const logStream = createWriteStream(logPath, { flags: "a" });
|
||||
await new Promise<void>((resolve) => {
|
||||
if (logStream.pending) logStream.once("open", () => resolve());
|
||||
else resolve();
|
||||
});
|
||||
|
||||
const selfPath = fileURLToPath(import.meta.url);
|
||||
|
||||
const child = spawn(process.execPath, [selfPath, "start"], {
|
||||
detached: true,
|
||||
stdio: ["ignore", logStream.fd, logStream.fd],
|
||||
env: { ...process.env, NERVE_DAEMON_MODE: "1" },
|
||||
});
|
||||
|
||||
child.unref();
|
||||
|
||||
const pid = child.pid;
|
||||
if (!pid) {
|
||||
process.stderr.write("❌ Failed to spawn daemon process.\n");
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
writePidFile(pid);
|
||||
process.stdout.write(`✅ Nerve daemon started (pid ${pid}).\n`);
|
||||
process.stdout.write(` Logs: ${logPath}\n`);
|
||||
process.stdout.write(" Run `nerve stop` to stop.\n");
|
||||
}
|
||||
|
||||
export const startCommand = defineCommand({
|
||||
meta: {
|
||||
name: "start",
|
||||
description: "Start the nerve daemon",
|
||||
},
|
||||
args: {
|
||||
daemon: {
|
||||
type: "boolean",
|
||||
alias: "d",
|
||||
description: "Run as background daemon",
|
||||
default: false,
|
||||
},
|
||||
},
|
||||
async run({ args }) {
|
||||
const nerveRoot = getNerveRoot();
|
||||
|
||||
if (args.daemon) {
|
||||
await runDaemon(nerveRoot);
|
||||
} else {
|
||||
await runForeground(nerveRoot);
|
||||
}
|
||||
},
|
||||
});
|
||||
@@ -0,0 +1,79 @@
|
||||
import { readFileSync, statSync } from "node:fs";
|
||||
import { join } from "node:path";
|
||||
|
||||
import { parseNerveConfig } from "@uncaged/nerve-core";
|
||||
import { defineCommand } from "citty";
|
||||
|
||||
import { getNerveRoot, getPidPath, isRunning, readPidFile } from "../workspace.js";
|
||||
|
||||
function formatUptime(ms: number): string {
|
||||
const totalSeconds = Math.floor(ms / 1000);
|
||||
const hours = Math.floor(totalSeconds / 3600);
|
||||
const minutes = Math.floor((totalSeconds % 3600) / 60);
|
||||
const seconds = totalSeconds % 60;
|
||||
if (hours > 0) return `${hours}h ${minutes}m ${seconds}s`;
|
||||
if (minutes > 0) return `${minutes}m ${seconds}s`;
|
||||
return `${seconds}s`;
|
||||
}
|
||||
|
||||
function getUptimeMs(pid: number): number | null {
|
||||
try {
|
||||
const pidStat = readFileSync(`/proc/${pid}/stat`, "utf8").split(" ");
|
||||
const startJiffies = Number(pidStat[21]);
|
||||
const clkTck = 100;
|
||||
const uptimeRaw = readFileSync("/proc/uptime", "utf8").split(" ")[0];
|
||||
const systemUptimeSec = Number.parseFloat(uptimeRaw);
|
||||
const processStartSec = startJiffies / clkTck;
|
||||
return (systemUptimeSec - processStartSec) * 1000;
|
||||
} catch {
|
||||
// /proc not available (non-Linux); fall back to PID file mtime
|
||||
try {
|
||||
const pidMtime = statSync(getPidPath()).mtimeMs;
|
||||
return Date.now() - pidMtime;
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export const statusCommand = defineCommand({
|
||||
meta: {
|
||||
name: "status",
|
||||
description: "Show nerve daemon status",
|
||||
},
|
||||
async run() {
|
||||
if (!isRunning()) {
|
||||
process.stdout.write("😴 Nerve daemon is not running.\n");
|
||||
return;
|
||||
}
|
||||
|
||||
const pid = readPidFile() as number;
|
||||
|
||||
const configPath = join(getNerveRoot(), "nerve.yaml");
|
||||
let senseList: string[] = [];
|
||||
let workerGroups: string[] = [];
|
||||
|
||||
try {
|
||||
const raw = readFileSync(configPath, "utf8");
|
||||
const result = parseNerveConfig(raw);
|
||||
if (result.ok) {
|
||||
senseList = Object.keys(result.value.senses);
|
||||
workerGroups = [...new Set(Object.values(result.value.senses).map((s) => s.group))];
|
||||
}
|
||||
} catch {
|
||||
// config may not be readable; continue with what we have
|
||||
}
|
||||
|
||||
const uptimeMs = getUptimeMs(pid);
|
||||
const uptimeStr = uptimeMs !== null ? formatUptime(uptimeMs) : "unknown";
|
||||
|
||||
process.stdout.write("✅ Nerve daemon is running.\n");
|
||||
process.stdout.write(` pid: ${pid}\n`);
|
||||
process.stdout.write(` uptime: ${uptimeStr}\n`);
|
||||
process.stdout.write(` senses: ${senseList.length > 0 ? senseList.join(", ") : "(none)"}\n`);
|
||||
process.stdout.write(
|
||||
` workers: ${workerGroups.length > 0 ? workerGroups.join(", ") : "(none)"}\n`,
|
||||
);
|
||||
process.stdout.write(" signals: (pending SignalBus persistence)\n");
|
||||
},
|
||||
});
|
||||
@@ -0,0 +1,58 @@
|
||||
import { defineCommand } from "citty";
|
||||
|
||||
import { isRunning, readPidFile, removePidFile } from "../workspace.js";
|
||||
|
||||
async function waitForExit(pid: number, timeoutMs: number): Promise<boolean> {
|
||||
const start = Date.now();
|
||||
while (Date.now() - start < timeoutMs) {
|
||||
try {
|
||||
process.kill(pid, 0);
|
||||
} catch {
|
||||
return true;
|
||||
}
|
||||
await new Promise<void>((resolve) => setTimeout(resolve, 200));
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
export const stopCommand = defineCommand({
|
||||
meta: {
|
||||
name: "stop",
|
||||
description: "Stop the nerve daemon",
|
||||
},
|
||||
async run() {
|
||||
const pid = readPidFile();
|
||||
if (pid === null) {
|
||||
process.stdout.write("⚠️ No PID file found — daemon may not be running.\n");
|
||||
return;
|
||||
}
|
||||
|
||||
if (!isRunning()) {
|
||||
process.stdout.write("⚠️ Daemon is not running (stale PID file). Cleaning up.\n");
|
||||
removePidFile();
|
||||
return;
|
||||
}
|
||||
|
||||
process.stdout.write(`Stopping nerve daemon (pid ${pid})…\n`);
|
||||
try {
|
||||
process.kill(pid, "SIGTERM");
|
||||
} catch (e) {
|
||||
const msg = e instanceof Error ? e.message : String(e);
|
||||
process.stderr.write(`❌ Failed to send SIGTERM: ${msg}\n`);
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
const graceful = await waitForExit(pid, 10_000);
|
||||
if (!graceful) {
|
||||
process.stdout.write("⚠️ Daemon did not exit in 10s — sending SIGKILL.\n");
|
||||
try {
|
||||
process.kill(pid, "SIGKILL");
|
||||
} catch {
|
||||
// already dead
|
||||
}
|
||||
}
|
||||
|
||||
removePidFile();
|
||||
process.stdout.write("✅ Daemon stopped.\n");
|
||||
},
|
||||
});
|
||||
@@ -0,0 +1,40 @@
|
||||
import { readFileSync } from "node:fs";
|
||||
import { join } from "node:path";
|
||||
|
||||
import { parseNerveConfig } from "@uncaged/nerve-core";
|
||||
import { defineCommand } from "citty";
|
||||
|
||||
import { getNerveRoot } from "../workspace.js";
|
||||
|
||||
export const validateCommand = defineCommand({
|
||||
meta: {
|
||||
name: "validate",
|
||||
description: "Validate nerve.yaml configuration",
|
||||
},
|
||||
async run() {
|
||||
const configPath = join(getNerveRoot(), "nerve.yaml");
|
||||
let raw: string;
|
||||
try {
|
||||
raw = readFileSync(configPath, "utf8");
|
||||
} catch (e) {
|
||||
const msg = e instanceof Error ? e.message : String(e);
|
||||
process.stderr.write(`❌ Cannot read ${configPath}: ${msg}\n`);
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
const result = parseNerveConfig(raw);
|
||||
if (!result.ok) {
|
||||
process.stderr.write(`❌ Config validation failed: ${result.error.message}\n`);
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
const config = result.value;
|
||||
const senseCount = Object.keys(config.senses).length;
|
||||
const reflexCount = config.reflexes.length;
|
||||
const workflowCount = config.workflows ? Object.keys(config.workflows).length : 0;
|
||||
|
||||
process.stdout.write(
|
||||
`✅ nerve.yaml is valid — ${senseCount} sense(s), ${reflexCount} reflex(es), ${workflowCount} workflow(s)\n`,
|
||||
);
|
||||
},
|
||||
});
|
||||
@@ -1 +1,12 @@
|
||||
// TODO: implement
|
||||
export {
|
||||
getNerveRoot,
|
||||
getPidPath,
|
||||
getLogPath,
|
||||
readPidFile,
|
||||
writePidFile,
|
||||
removePidFile,
|
||||
isRunning,
|
||||
} from "./workspace.js";
|
||||
|
||||
export { createKernel } from "@uncaged/nerve-daemon";
|
||||
export type { Kernel } from "@uncaged/nerve-daemon";
|
||||
|
||||
@@ -0,0 +1,45 @@
|
||||
import { existsSync, readFileSync, rmSync, writeFileSync } from "node:fs";
|
||||
import { homedir } from "node:os";
|
||||
import { join } from "node:path";
|
||||
|
||||
export function getNerveRoot(): string {
|
||||
return join(homedir(), ".uncaged-nerve");
|
||||
}
|
||||
|
||||
export function getPidPath(): string {
|
||||
return join(getNerveRoot(), "nerve.pid");
|
||||
}
|
||||
|
||||
export function getLogPath(): string {
|
||||
return join(getNerveRoot(), "logs", "nerve.log");
|
||||
}
|
||||
|
||||
export function readPidFile(): number | null {
|
||||
const pidPath = getPidPath();
|
||||
if (!existsSync(pidPath)) return null;
|
||||
const raw = readFileSync(pidPath, "utf8").trim();
|
||||
const pid = Number.parseInt(raw, 10);
|
||||
return Number.isNaN(pid) ? null : pid;
|
||||
}
|
||||
|
||||
export function writePidFile(pid: number): void {
|
||||
writeFileSync(getPidPath(), String(pid), "utf8");
|
||||
}
|
||||
|
||||
export function removePidFile(): void {
|
||||
const pidPath = getPidPath();
|
||||
if (existsSync(pidPath)) {
|
||||
rmSync(pidPath);
|
||||
}
|
||||
}
|
||||
|
||||
export function isRunning(): boolean {
|
||||
const pid = readPidFile();
|
||||
if (pid === null) return false;
|
||||
try {
|
||||
process.kill(pid, 0);
|
||||
return true;
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
@@ -2,7 +2,9 @@
|
||||
"extends": "../../tsconfig.json",
|
||||
"compilerOptions": {
|
||||
"outDir": "dist",
|
||||
"rootDir": "src"
|
||||
"rootDir": "src",
|
||||
"composite": false,
|
||||
"types": ["node"]
|
||||
},
|
||||
"include": ["src"]
|
||||
}
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import { defineConfig } from "tsup";
|
||||
|
||||
export default defineConfig({
|
||||
entry: ["src/index.ts"],
|
||||
entry: ["src/index.ts", "src/cli.ts"],
|
||||
format: ["esm"],
|
||||
dts: true,
|
||||
clean: true,
|
||||
|
||||
@@ -98,6 +98,12 @@ function parseSenseReflex(
|
||||
const intervalResult = parseDurationField(obj.interval, `reflexes[${index}].interval`);
|
||||
if (!intervalResult.ok) return intervalResult;
|
||||
|
||||
if (intervalResult.value === null && on !== null && on.length === 0) {
|
||||
return err(
|
||||
new Error(`reflexes[${index}]: sense reflex must have at least one of "interval" or "on"`),
|
||||
);
|
||||
}
|
||||
|
||||
return ok({
|
||||
kind: "sense" as const,
|
||||
sense: obj.sense,
|
||||
|
||||
@@ -2,7 +2,8 @@
|
||||
"extends": "../../tsconfig.json",
|
||||
"compilerOptions": {
|
||||
"outDir": "dist",
|
||||
"rootDir": "src"
|
||||
"rootDir": "src",
|
||||
"composite": false
|
||||
},
|
||||
"include": ["src"]
|
||||
}
|
||||
|
||||
@@ -0,0 +1,28 @@
|
||||
/**
|
||||
* Mock sense worker that implements the IPC protocol for integration testing.
|
||||
*
|
||||
* This file intentionally uses the .mjs extension (rather than .ts) because it
|
||||
* is spawned as a child process via fork() at runtime and must execute without
|
||||
* any TypeScript compilation step. Node.js can run .mjs files directly as
|
||||
* ESModules, whereas .ts files would require ts-node/tsx to be available in the
|
||||
* child process environment.
|
||||
*
|
||||
* Behaviour:
|
||||
* - Sends { type: "ready" } on startup
|
||||
* - On { type: "compute", sense } → sends back { type: "signal", sense, payload: 42 }
|
||||
* - On { type: "shutdown" } → exits cleanly with code 0
|
||||
*/
|
||||
|
||||
process.send({ type: "ready" });
|
||||
|
||||
process.on("message", (msg) => {
|
||||
if (!msg || typeof msg !== "object") return;
|
||||
|
||||
if (msg.type === "shutdown") {
|
||||
process.exit(0);
|
||||
}
|
||||
|
||||
if (msg.type === "compute" && typeof msg.sense === "string") {
|
||||
process.send({ type: "signal", sense: msg.sense, payload: 42 });
|
||||
}
|
||||
});
|
||||
@@ -0,0 +1,207 @@
|
||||
/**
|
||||
* Integration tests for the Kernel / Process Manager.
|
||||
*
|
||||
* These tests use REAL child processes via a mock worker fixture that
|
||||
* implements the IPC protocol (fixtures/mock-worker.mjs). No build
|
||||
* artifacts are required.
|
||||
*/
|
||||
|
||||
import { dirname, join } from "node:path";
|
||||
import { fileURLToPath } from "node:url";
|
||||
|
||||
import type { Signal } from "@uncaged/nerve-core";
|
||||
import type { NerveConfig } from "@uncaged/nerve-core";
|
||||
import { afterEach, describe, expect, it } from "vitest";
|
||||
|
||||
import { createKernel } from "../kernel.js";
|
||||
import type { Kernel } from "../kernel.js";
|
||||
|
||||
const __filename = fileURLToPath(import.meta.url);
|
||||
const __dir = dirname(__filename);
|
||||
const MOCK_WORKER = join(__dir, "fixtures", "mock-worker.mjs");
|
||||
|
||||
function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
|
||||
return {
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [],
|
||||
workflows: null,
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
|
||||
/** Poll until predicate returns true, with a timeout. */
|
||||
async function pollUntil(
|
||||
predicate: () => boolean,
|
||||
timeoutMs: number,
|
||||
intervalMs = 50,
|
||||
): Promise<void> {
|
||||
return new Promise((resolve, reject) => {
|
||||
const timer = setTimeout(
|
||||
() => reject(new Error(`pollUntil timed out after ${timeoutMs}ms`)),
|
||||
timeoutMs,
|
||||
);
|
||||
const check = setInterval(() => {
|
||||
if (predicate()) {
|
||||
clearTimeout(timer);
|
||||
clearInterval(check);
|
||||
resolve();
|
||||
}
|
||||
}, intervalMs);
|
||||
});
|
||||
}
|
||||
|
||||
describe("kernel integration — real child processes", () => {
|
||||
let kernel: Kernel | null = null;
|
||||
|
||||
afterEach(async () => {
|
||||
if (kernel !== null) {
|
||||
await kernel.stop();
|
||||
kernel = null;
|
||||
}
|
||||
});
|
||||
|
||||
it("returns correct groups and senseCount", () => {
|
||||
const config = makeConfig({
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
"disk-io": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
"net-rx": { group: "network", throttle: null, timeout: null, gracePeriod: null },
|
||||
},
|
||||
});
|
||||
kernel = createKernel(config, "/tmp/nerve-integration-test", {
|
||||
workerScript: MOCK_WORKER,
|
||||
});
|
||||
|
||||
expect(kernel.groups.size).toBe(2);
|
||||
expect(kernel.groups.has("system")).toBe(true);
|
||||
expect(kernel.groups.has("network")).toBe(true);
|
||||
expect(kernel.senseCount).toBe(3);
|
||||
});
|
||||
|
||||
it("workers start and respond to compute messages with signals", async () => {
|
||||
const config = makeConfig();
|
||||
kernel = createKernel(config, "/tmp/nerve-integration-test", {
|
||||
workerScript: MOCK_WORKER,
|
||||
});
|
||||
|
||||
// Wait for all workers to be ready (event-based, not fixed delay)
|
||||
await kernel.ready;
|
||||
|
||||
// Subscribe to the bus before triggering compute
|
||||
const received: Signal[] = [];
|
||||
const unsub = kernel.bus.subscribe((signal) => {
|
||||
received.push(signal);
|
||||
});
|
||||
|
||||
// Trigger a compute for "cpu-usage" through the kernel's triggerCompute
|
||||
kernel.triggerCompute("cpu-usage");
|
||||
|
||||
// Poll until a signal arrives on the bus (event-driven, no fixed delay)
|
||||
await pollUntil(() => received.length > 0, 3000);
|
||||
|
||||
expect(received).toHaveLength(1);
|
||||
expect(received[0]).toMatchObject({ senseId: "cpu-usage", payload: 42 });
|
||||
|
||||
unsub();
|
||||
}, 10_000);
|
||||
|
||||
it("graceful shutdown: stop() resolves after all workers exit", async () => {
|
||||
const config = makeConfig({
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
"net-rx": { group: "network", throttle: null, timeout: null, gracePeriod: null },
|
||||
},
|
||||
});
|
||||
kernel = createKernel(config, "/tmp/nerve-integration-test", {
|
||||
workerScript: MOCK_WORKER,
|
||||
});
|
||||
|
||||
// Wait for all workers to be ready (event-based)
|
||||
await kernel.ready;
|
||||
|
||||
const stopPromise = kernel.stop();
|
||||
kernel = null;
|
||||
|
||||
// stop() should resolve within 5s (our shutdown timeout)
|
||||
await expect(stopPromise).resolves.toBeUndefined();
|
||||
}, 10_000);
|
||||
|
||||
it("compute round-trip: worker receives compute and sends signal back through bus", async () => {
|
||||
const config = makeConfig();
|
||||
kernel = createKernel(config, "/tmp/nerve-integration-test", {
|
||||
workerScript: MOCK_WORKER,
|
||||
});
|
||||
|
||||
// Wait for all workers to be ready (event-based, not fixed delay)
|
||||
await kernel.ready;
|
||||
|
||||
const received: Signal[] = [];
|
||||
const unsub = kernel.bus.subscribe((signal) => {
|
||||
received.push(signal);
|
||||
});
|
||||
|
||||
// Trigger compute via the kernel — the kernel sends IPC to the worker,
|
||||
// the mock worker responds with a signal message, and the kernel routes it to the bus.
|
||||
kernel.triggerCompute("cpu-usage");
|
||||
|
||||
// Poll for the signal on the bus (no fixed delay)
|
||||
await pollUntil(() => received.length > 0, 3000);
|
||||
|
||||
expect(received).toHaveLength(1);
|
||||
expect(received[0]).toMatchObject({ senseId: "cpu-usage", payload: 42 });
|
||||
|
||||
unsub();
|
||||
}, 10_000);
|
||||
|
||||
it("crash recovery: kernel respawns worker after unexpected exit and new worker is functional", async () => {
|
||||
const config = makeConfig();
|
||||
kernel = createKernel(config, "/tmp/nerve-integration-test", {
|
||||
workerScript: MOCK_WORKER,
|
||||
});
|
||||
|
||||
// Wait for initial worker to be ready (event-based)
|
||||
await kernel.ready;
|
||||
|
||||
const originalPid = kernel.getWorkerPid("system");
|
||||
expect(originalPid).not.toBeNull();
|
||||
expect(originalPid).toBeGreaterThan(0);
|
||||
|
||||
// Kill the kernel's own worker to simulate a crash (SIGKILL, code != 0)
|
||||
process.kill(originalPid as number, "SIGKILL");
|
||||
|
||||
// Poll until the kernel respawns and registers a new worker with a different PID
|
||||
// (respawn delay is 1s, then fork(), then workers.set())
|
||||
await pollUntil(() => {
|
||||
const pid = kernel?.getWorkerPid("system");
|
||||
return pid !== null && pid !== originalPid;
|
||||
}, 5000);
|
||||
|
||||
const newPid = kernel.getWorkerPid("system");
|
||||
expect(newPid).not.toBeNull();
|
||||
expect(newPid).not.toBe(originalPid);
|
||||
|
||||
// Wait a bit for the new worker to send its "ready" message and be fully up.
|
||||
// Poll until the new worker responds to a compute message on the bus.
|
||||
const postRespawnSignals: Signal[] = [];
|
||||
const unsub = kernel.bus.subscribe((signal) => {
|
||||
postRespawnSignals.push(signal);
|
||||
});
|
||||
|
||||
// Trigger compute through the kernel to the new worker
|
||||
kernel.triggerCompute("cpu-usage");
|
||||
|
||||
// Poll for the signal — verifies the new worker is fully functional
|
||||
await pollUntil(() => postRespawnSignals.length > 0, 5000);
|
||||
|
||||
expect(postRespawnSignals).toHaveLength(1);
|
||||
expect(postRespawnSignals[0]).toMatchObject({ senseId: "cpu-usage", payload: 42 });
|
||||
|
||||
unsub();
|
||||
|
||||
// Kernel should still stop gracefully after respawn
|
||||
await kernel.stop();
|
||||
kernel = null;
|
||||
}, 15_000);
|
||||
});
|
||||
@@ -0,0 +1,200 @@
|
||||
import { EventEmitter } from "node:events";
|
||||
|
||||
import type { NerveConfig } from "@uncaged/nerve-core";
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Mock child_process.fork before importing kernel
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
const mockChildren: MockChild[] = [];
|
||||
|
||||
type MockChild = EventEmitter & {
|
||||
send: ReturnType<typeof vi.fn>;
|
||||
kill: ReturnType<typeof vi.fn>;
|
||||
pid: number;
|
||||
};
|
||||
|
||||
function makeMockChild(pid = 1): MockChild {
|
||||
const child = new EventEmitter() as MockChild;
|
||||
child.send = vi.fn((msg: unknown) => {
|
||||
// Auto-exit on shutdown so stop() resolves
|
||||
if (
|
||||
msg !== null &&
|
||||
typeof msg === "object" &&
|
||||
(msg as Record<string, unknown>).type === "shutdown"
|
||||
) {
|
||||
setImmediate(() => child.emit("exit", 0, null));
|
||||
}
|
||||
});
|
||||
child.kill = vi.fn((_signal?: string) => {
|
||||
child.emit("exit", null, _signal ?? "SIGKILL");
|
||||
});
|
||||
child.pid = pid;
|
||||
return child;
|
||||
}
|
||||
|
||||
vi.mock("node:child_process", () => ({
|
||||
fork: vi.fn((_script: string, _args: string[], _opts: unknown) => {
|
||||
const child = makeMockChild(mockChildren.length + 1);
|
||||
mockChildren.push(child);
|
||||
return child;
|
||||
}),
|
||||
}));
|
||||
|
||||
// Import after mock is set up
|
||||
const { createKernel } = await import("../kernel.js");
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Helpers
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
|
||||
return {
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [],
|
||||
workflows: null,
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Tests
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
describe("kernel — message routing", () => {
|
||||
beforeEach(() => {
|
||||
mockChildren.length = 0;
|
||||
vi.useFakeTimers({ shouldAdvanceTime: true });
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
it("routes signal message to bus without throwing", async () => {
|
||||
const config = makeConfig({
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [],
|
||||
});
|
||||
const kernel = createKernel(config, "/tmp/nerve-test");
|
||||
|
||||
expect(mockChildren.length).toBe(1);
|
||||
const child = mockChildren[0];
|
||||
|
||||
expect(() => {
|
||||
child.emit("message", { type: "signal", sense: "cpu-usage", payload: 42 });
|
||||
}).not.toThrow();
|
||||
|
||||
await kernel.stop();
|
||||
});
|
||||
|
||||
it("routes error message to stderr", async () => {
|
||||
const stderrSpy = vi.spyOn(process.stderr, "write").mockImplementation(() => true);
|
||||
const config = makeConfig({
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [],
|
||||
});
|
||||
const kernel = createKernel(config, "/tmp/nerve-test");
|
||||
|
||||
const child = mockChildren[0];
|
||||
child.emit("message", { type: "error", sense: "cpu-usage", error: "compute failed" });
|
||||
|
||||
expect(stderrSpy).toHaveBeenCalledWith(
|
||||
expect.stringContaining('sense "cpu-usage" error: compute failed'),
|
||||
);
|
||||
|
||||
stderrSpy.mockRestore();
|
||||
await kernel.stop();
|
||||
});
|
||||
|
||||
it("ignores ready messages without logging", async () => {
|
||||
const stderrSpy = vi.spyOn(process.stderr, "write").mockImplementation(() => true);
|
||||
const config = makeConfig({
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [],
|
||||
});
|
||||
const kernel = createKernel(config, "/tmp/nerve-test");
|
||||
|
||||
const child = mockChildren[0];
|
||||
const callsBefore = stderrSpy.mock.calls.length;
|
||||
child.emit("message", { type: "ready" });
|
||||
|
||||
expect(stderrSpy.mock.calls.length).toBe(callsBefore);
|
||||
|
||||
stderrSpy.mockRestore();
|
||||
await kernel.stop();
|
||||
});
|
||||
|
||||
it("logs invalid worker messages without throwing", async () => {
|
||||
const stderrSpy = vi.spyOn(process.stderr, "write").mockImplementation(() => true);
|
||||
const config = makeConfig({
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [],
|
||||
});
|
||||
const kernel = createKernel(config, "/tmp/nerve-test");
|
||||
|
||||
const child = mockChildren[0];
|
||||
expect(() => child.emit("message", { type: "unknown-type" })).not.toThrow();
|
||||
|
||||
expect(stderrSpy).toHaveBeenCalledWith(expect.stringContaining("invalid worker message"));
|
||||
|
||||
stderrSpy.mockRestore();
|
||||
await kernel.stop();
|
||||
});
|
||||
});
|
||||
|
||||
describe("kernel — groupForSense mapping", () => {
|
||||
beforeEach(() => {
|
||||
mockChildren.length = 0;
|
||||
vi.useFakeTimers({ shouldAdvanceTime: true });
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
it("spawns one worker per unique group", async () => {
|
||||
const config: NerveConfig = {
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
"disk-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
"net-usage": { group: "network", throttle: null, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [],
|
||||
workflows: null,
|
||||
};
|
||||
const kernel = createKernel(config, "/tmp/nerve-test");
|
||||
|
||||
// system and network = 2 unique groups
|
||||
expect(mockChildren.length).toBe(2);
|
||||
await kernel.stop();
|
||||
});
|
||||
|
||||
it("sends compute to the correct worker on interval trigger", async () => {
|
||||
const config = makeConfig({
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: 500, on: null }],
|
||||
});
|
||||
createKernel(config, "/tmp/nerve-test");
|
||||
|
||||
const child = mockChildren[0];
|
||||
vi.advanceTimersByTime(500);
|
||||
|
||||
expect(child.send).toHaveBeenCalledWith(
|
||||
expect.objectContaining({ type: "compute", sense: "cpu-usage" }),
|
||||
);
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,120 @@
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
|
||||
import type { NerveConfig, Signal } from "@uncaged/nerve-core";
|
||||
import { createReflexScheduler } from "../reflex-scheduler.js";
|
||||
import { createSignalBus } from "../signal-bus.js";
|
||||
|
||||
function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
|
||||
return {
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [],
|
||||
workflows: null,
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
|
||||
function makeSignal(senseId: string, payload: unknown = 1): Signal {
|
||||
return { id: 1, senseId, payload, ts: Date.now() };
|
||||
}
|
||||
|
||||
describe("ReflexScheduler — throttle + pending deferred trigger", () => {
|
||||
beforeEach(() => {
|
||||
vi.useFakeTimers();
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
it("trigger during throttle window fires deferred trigger after window ends", () => {
|
||||
const triggered: string[] = [];
|
||||
const config = makeConfig({
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: 2000, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: null, on: ["cpu-usage"] }],
|
||||
});
|
||||
const bus = createSignalBus();
|
||||
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name));
|
||||
|
||||
// First trigger fires immediately (outside throttle window)
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
scheduler.onComputeComplete("cpu-usage");
|
||||
expect(triggered.length).toBe(1);
|
||||
|
||||
// Second trigger arrives 500ms later — still within throttle window (2000ms)
|
||||
vi.advanceTimersByTime(500);
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
// Should not fire yet (throttled)
|
||||
expect(triggered.length).toBe(1);
|
||||
|
||||
// Advance past the throttle window end; deferred trigger should now fire
|
||||
vi.advanceTimersByTime(1600);
|
||||
expect(triggered.length).toBe(2);
|
||||
|
||||
scheduler.stop();
|
||||
});
|
||||
|
||||
it("multiple triggers during throttle window only produce one deferred trigger", () => {
|
||||
const triggered: string[] = [];
|
||||
const config = makeConfig({
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: 2000, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: null, on: ["cpu-usage"] }],
|
||||
});
|
||||
const bus = createSignalBus();
|
||||
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name));
|
||||
|
||||
// First trigger fires
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
scheduler.onComputeComplete("cpu-usage");
|
||||
expect(triggered.length).toBe(1);
|
||||
|
||||
// Multiple triggers within throttle window — should not stack
|
||||
vi.advanceTimersByTime(300);
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
vi.advanceTimersByTime(300);
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
vi.advanceTimersByTime(300);
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
expect(triggered.length).toBe(1);
|
||||
|
||||
// Advance past window — exactly one deferred trigger fires
|
||||
vi.advanceTimersByTime(1200);
|
||||
scheduler.onComputeComplete("cpu-usage");
|
||||
expect(triggered.length).toBe(2);
|
||||
|
||||
scheduler.stop();
|
||||
});
|
||||
|
||||
it("deferred trigger does not fire after stop()", () => {
|
||||
const triggered: string[] = [];
|
||||
const config = makeConfig({
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: 2000, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: null, on: ["cpu-usage"] }],
|
||||
});
|
||||
const bus = createSignalBus();
|
||||
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name));
|
||||
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
scheduler.onComputeComplete("cpu-usage");
|
||||
expect(triggered.length).toBe(1);
|
||||
|
||||
// Trigger during throttle window — schedules deferred
|
||||
vi.advanceTimersByTime(500);
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
expect(triggered.length).toBe(1);
|
||||
|
||||
// Stop before window ends
|
||||
scheduler.stop();
|
||||
|
||||
// Advance past window — deferred timer should have been cleared
|
||||
vi.advanceTimersByTime(2000);
|
||||
expect(triggered.length).toBe(1);
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,310 @@
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
|
||||
import type { NerveConfig, Signal } from "@uncaged/nerve-core";
|
||||
import { createReflexScheduler } from "../reflex-scheduler.js";
|
||||
import { createSignalBus } from "../signal-bus.js";
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Helpers
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
|
||||
return {
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
"disk-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
"system-health": { group: "derived", throttle: null, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [],
|
||||
workflows: null,
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
|
||||
function makeSignal(senseId: string, payload: unknown = 1): Signal {
|
||||
return { id: 1, senseId, payload, ts: Date.now() };
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Tests
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
describe("ReflexScheduler — interval reflex", () => {
|
||||
beforeEach(() => {
|
||||
vi.useFakeTimers();
|
||||
});
|
||||
afterEach(() => {
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
it("fires triggerFn on schedule", () => {
|
||||
const triggered: string[] = [];
|
||||
const config = makeConfig({
|
||||
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: 1000, on: null }],
|
||||
});
|
||||
const bus = createSignalBus();
|
||||
// Use a ref so the triggerFn can call back into the scheduler
|
||||
const ref: { scheduler: ReturnType<typeof createReflexScheduler> | null } = {
|
||||
scheduler: null,
|
||||
};
|
||||
const scheduler = createReflexScheduler(config, bus, (name) => {
|
||||
triggered.push(name);
|
||||
// Immediately complete compute so the scheduler is not blocked by in-flight state
|
||||
ref.scheduler?.onComputeComplete(name);
|
||||
});
|
||||
ref.scheduler = scheduler;
|
||||
|
||||
vi.advanceTimersByTime(3000);
|
||||
|
||||
expect(triggered.length).toBeGreaterThanOrEqual(3);
|
||||
expect(triggered.every((n) => n === "cpu-usage")).toBe(true);
|
||||
|
||||
scheduler.stop();
|
||||
});
|
||||
|
||||
it("stops firing after stop() is called", () => {
|
||||
const triggered: string[] = [];
|
||||
const config = makeConfig({
|
||||
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: 500, on: null }],
|
||||
});
|
||||
const bus = createSignalBus();
|
||||
const ref: { scheduler: ReturnType<typeof createReflexScheduler> | null } = {
|
||||
scheduler: null,
|
||||
};
|
||||
const scheduler = createReflexScheduler(config, bus, (name) => {
|
||||
triggered.push(name);
|
||||
ref.scheduler?.onComputeComplete(name);
|
||||
});
|
||||
ref.scheduler = scheduler;
|
||||
|
||||
vi.advanceTimersByTime(1000);
|
||||
const countBefore = triggered.length;
|
||||
scheduler.stop();
|
||||
vi.advanceTimersByTime(2000);
|
||||
|
||||
expect(triggered.length).toBe(countBefore);
|
||||
});
|
||||
|
||||
it("starts from current time — does not compensate for past intervals", () => {
|
||||
const triggered: string[] = [];
|
||||
const config = makeConfig({
|
||||
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: 1000, on: null }],
|
||||
});
|
||||
const bus = createSignalBus();
|
||||
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name));
|
||||
|
||||
// Only advance 500ms — should be 0 triggers (not catching up)
|
||||
vi.advanceTimersByTime(500);
|
||||
expect(triggered.length).toBe(0);
|
||||
|
||||
scheduler.stop();
|
||||
});
|
||||
});
|
||||
|
||||
describe("ReflexScheduler — event (on) reflex", () => {
|
||||
it("triggers target sense when watched sense emits a signal", () => {
|
||||
const triggered: string[] = [];
|
||||
const config = makeConfig({
|
||||
reflexes: [
|
||||
{ kind: "sense", sense: "system-health", interval: null, on: ["cpu-usage", "disk-usage"] },
|
||||
],
|
||||
});
|
||||
const bus = createSignalBus();
|
||||
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name));
|
||||
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
scheduler.onComputeComplete("system-health");
|
||||
bus.emit(makeSignal("disk-usage"));
|
||||
scheduler.onComputeComplete("system-health");
|
||||
|
||||
expect(triggered.length).toBe(2);
|
||||
expect(triggered.every((n) => n === "system-health")).toBe(true);
|
||||
|
||||
scheduler.stop();
|
||||
});
|
||||
|
||||
it("does not trigger for signals from non-watched senses", () => {
|
||||
const triggered: string[] = [];
|
||||
const config = makeConfig({
|
||||
reflexes: [{ kind: "sense", sense: "system-health", interval: null, on: ["cpu-usage"] }],
|
||||
});
|
||||
const bus = createSignalBus();
|
||||
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name));
|
||||
|
||||
bus.emit(makeSignal("disk-usage"));
|
||||
|
||||
expect(triggered.length).toBe(0);
|
||||
|
||||
scheduler.stop();
|
||||
});
|
||||
|
||||
it("stops responding after stop()", () => {
|
||||
const triggered: string[] = [];
|
||||
const config = makeConfig({
|
||||
reflexes: [{ kind: "sense", sense: "system-health", interval: null, on: ["cpu-usage"] }],
|
||||
});
|
||||
const bus = createSignalBus();
|
||||
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name));
|
||||
|
||||
scheduler.stop();
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
|
||||
expect(triggered.length).toBe(0);
|
||||
});
|
||||
});
|
||||
|
||||
describe("ReflexScheduler — throttle", () => {
|
||||
beforeEach(() => {
|
||||
vi.useFakeTimers();
|
||||
});
|
||||
afterEach(() => {
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
it("skips rapid triggers within throttle window", () => {
|
||||
const triggered: string[] = [];
|
||||
const config = makeConfig({
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: 2000, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: null, on: ["cpu-usage"] }],
|
||||
});
|
||||
const bus = createSignalBus();
|
||||
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name));
|
||||
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
scheduler.onComputeComplete("cpu-usage");
|
||||
|
||||
// Immediately trigger again — within throttle window
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
|
||||
expect(triggered.length).toBe(1);
|
||||
|
||||
scheduler.stop();
|
||||
});
|
||||
|
||||
it("allows trigger after throttle window has passed", () => {
|
||||
const triggered: string[] = [];
|
||||
const config = makeConfig({
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: 1000, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: null, on: ["cpu-usage"] }],
|
||||
});
|
||||
const bus = createSignalBus();
|
||||
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name));
|
||||
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
scheduler.onComputeComplete("cpu-usage");
|
||||
|
||||
vi.advanceTimersByTime(1500);
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
scheduler.onComputeComplete("cpu-usage");
|
||||
|
||||
expect(triggered.length).toBe(2);
|
||||
|
||||
scheduler.stop();
|
||||
});
|
||||
});
|
||||
|
||||
describe("ReflexScheduler — merge/coalesce", () => {
|
||||
it("concurrent triggers collapse to at most one pending run", () => {
|
||||
const triggered: string[] = [];
|
||||
const config = makeConfig({
|
||||
reflexes: [{ kind: "sense", sense: "system-health", interval: null, on: ["cpu-usage"] }],
|
||||
});
|
||||
const bus = createSignalBus();
|
||||
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name));
|
||||
|
||||
// First trigger starts compute
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
expect(triggered.length).toBe(1);
|
||||
|
||||
// Three more arrive while first is in-flight — all should coalesce to one pending
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
expect(triggered.length).toBe(1);
|
||||
|
||||
// Complete first compute → pending drains as exactly one more run
|
||||
scheduler.onComputeComplete("system-health");
|
||||
expect(triggered.length).toBe(2);
|
||||
|
||||
// Complete second compute → no more pending
|
||||
scheduler.onComputeComplete("system-health");
|
||||
expect(triggered.length).toBe(2);
|
||||
|
||||
scheduler.stop();
|
||||
});
|
||||
|
||||
it("no new trigger while in-flight without pending → no extra run after complete", () => {
|
||||
const triggered: string[] = [];
|
||||
const config = makeConfig({
|
||||
reflexes: [{ kind: "sense", sense: "system-health", interval: null, on: ["cpu-usage"] }],
|
||||
});
|
||||
const bus = createSignalBus();
|
||||
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name));
|
||||
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
expect(triggered.length).toBe(1);
|
||||
|
||||
// Complete with no pending
|
||||
scheduler.onComputeComplete("system-health");
|
||||
expect(triggered.length).toBe(1);
|
||||
|
||||
scheduler.stop();
|
||||
});
|
||||
});
|
||||
|
||||
describe("ReflexScheduler — interval + on combined", () => {
|
||||
beforeEach(() => {
|
||||
vi.useFakeTimers();
|
||||
});
|
||||
afterEach(() => {
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
it("fires both via interval and event", () => {
|
||||
const triggered: string[] = [];
|
||||
const config = makeConfig({
|
||||
reflexes: [{ kind: "sense", sense: "system-health", interval: 1000, on: ["cpu-usage"] }],
|
||||
});
|
||||
const bus = createSignalBus();
|
||||
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name));
|
||||
|
||||
// Event trigger
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
scheduler.onComputeComplete("system-health");
|
||||
|
||||
// Interval trigger
|
||||
vi.advanceTimersByTime(1000);
|
||||
scheduler.onComputeComplete("system-health");
|
||||
|
||||
expect(triggered.length).toBeGreaterThanOrEqual(2);
|
||||
|
||||
scheduler.stop();
|
||||
});
|
||||
});
|
||||
|
||||
describe("ReflexScheduler — workflow reflexes ignored", () => {
|
||||
it("does not set up any scheduling for workflow kind reflexes", () => {
|
||||
const triggered: string[] = [];
|
||||
const config: NerveConfig = {
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [{ kind: "workflow", workflow: "my-workflow", on: ["cpu-usage"] }],
|
||||
workflows: {
|
||||
"my-workflow": { concurrency: 1, overflow: "drop" },
|
||||
},
|
||||
};
|
||||
const bus = createSignalBus();
|
||||
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name));
|
||||
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
|
||||
expect(triggered.length).toBe(0);
|
||||
|
||||
scheduler.stop();
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,99 @@
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
|
||||
import type { Signal } from "@uncaged/nerve-core";
|
||||
import { createSignalBus } from "../signal-bus.js";
|
||||
|
||||
function makeSignal(senseId: string, payload: unknown = 1): Signal {
|
||||
return { id: 1, senseId, payload, ts: Date.now() };
|
||||
}
|
||||
|
||||
describe("createSignalBus", () => {
|
||||
it("delivers emitted signal to a subscriber", () => {
|
||||
const bus = createSignalBus();
|
||||
const received: Signal[] = [];
|
||||
bus.subscribe((s) => received.push(s));
|
||||
|
||||
const sig = makeSignal("cpu-usage", 42);
|
||||
bus.emit(sig);
|
||||
|
||||
expect(received).toHaveLength(1);
|
||||
expect(received[0]).toBe(sig);
|
||||
});
|
||||
|
||||
it("delivers to multiple subscribers", () => {
|
||||
const bus = createSignalBus();
|
||||
const a: Signal[] = [];
|
||||
const b: Signal[] = [];
|
||||
bus.subscribe((s) => a.push(s));
|
||||
bus.subscribe((s) => b.push(s));
|
||||
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
|
||||
expect(a).toHaveLength(1);
|
||||
expect(b).toHaveLength(1);
|
||||
});
|
||||
|
||||
it("unsubscribe stops delivery", () => {
|
||||
const bus = createSignalBus();
|
||||
const received: Signal[] = [];
|
||||
const unsub = bus.subscribe((s) => received.push(s));
|
||||
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
unsub();
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
|
||||
expect(received).toHaveLength(1);
|
||||
});
|
||||
|
||||
it("remaining subscribers still receive after one unsubscribes", () => {
|
||||
const bus = createSignalBus();
|
||||
const a: Signal[] = [];
|
||||
const b: Signal[] = [];
|
||||
const unsubA = bus.subscribe((s) => a.push(s));
|
||||
bus.subscribe((s) => b.push(s));
|
||||
|
||||
unsubA();
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
|
||||
expect(a).toHaveLength(0);
|
||||
expect(b).toHaveLength(1);
|
||||
});
|
||||
|
||||
it("emit with no subscribers does nothing", () => {
|
||||
const bus = createSignalBus();
|
||||
expect(() => bus.emit(makeSignal("cpu-usage"))).not.toThrow();
|
||||
});
|
||||
|
||||
it("dispatch is synchronous", () => {
|
||||
const bus = createSignalBus();
|
||||
const order: string[] = [];
|
||||
bus.subscribe(() => order.push("handler"));
|
||||
order.push("before");
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
order.push("after");
|
||||
expect(order).toEqual(["before", "handler", "after"]);
|
||||
});
|
||||
|
||||
it("handler exceptions don't prevent other handlers from running", () => {
|
||||
const bus = createSignalBus();
|
||||
const received: Signal[] = [];
|
||||
bus.subscribe(() => {
|
||||
throw new Error("boom");
|
||||
});
|
||||
bus.subscribe((s) => received.push(s));
|
||||
|
||||
expect(() => bus.emit(makeSignal("cpu-usage"))).toThrow("boom");
|
||||
expect(received).toHaveLength(1);
|
||||
});
|
||||
|
||||
it("same handler can be subscribed once and fires once per emit", () => {
|
||||
const bus = createSignalBus();
|
||||
const handler = vi.fn();
|
||||
bus.subscribe(handler);
|
||||
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
|
||||
expect(handler).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
});
|
||||
@@ -8,6 +8,8 @@ export type {
|
||||
WorkerToParentMessage,
|
||||
} from "./ipc.js";
|
||||
|
||||
export type { SignalBus, SignalHandler, Unsubscribe } from "./signal-bus.js";
|
||||
|
||||
export type { ComputeFn, DrizzleDB, PeerMap, SenseRuntime } from "./sense-runtime.js";
|
||||
|
||||
export {
|
||||
@@ -17,3 +19,6 @@ export {
|
||||
loadComputeFn,
|
||||
executeCompute,
|
||||
} from "./sense-runtime.js";
|
||||
|
||||
export { createKernel } from "./kernel.js";
|
||||
export type { Kernel, KernelOptions } from "./kernel.js";
|
||||
|
||||
@@ -57,3 +57,37 @@ export function parseParentMessage(raw: unknown): Result<ParentToWorkerMessage>
|
||||
}
|
||||
return ok(raw as ParentToWorkerMessage);
|
||||
}
|
||||
|
||||
/** Validate and parse an unknown IPC message received from a worker process. */
|
||||
export function parseWorkerMessage(raw: unknown): Result<WorkerToParentMessage> {
|
||||
if (raw === null || typeof raw !== "object") {
|
||||
return err(new Error("Worker IPC message is not an object"));
|
||||
}
|
||||
const obj = raw as Record<string, unknown>;
|
||||
if (typeof obj.type !== "string") {
|
||||
return err(new Error("Worker IPC message missing string 'type' field"));
|
||||
}
|
||||
const type = obj.type;
|
||||
if (type !== "signal" && type !== "error" && type !== "ready") {
|
||||
return err(new Error(`Unknown worker IPC message type: "${type}"`));
|
||||
}
|
||||
if (type === "signal") {
|
||||
if (typeof obj.sense !== "string") {
|
||||
return err(new Error("Worker 'signal' message missing string 'sense' field"));
|
||||
}
|
||||
if (!("payload" in obj)) {
|
||||
return err(new Error("Worker 'signal' message missing 'payload' field"));
|
||||
}
|
||||
return ok(raw as SignalMessage);
|
||||
}
|
||||
if (type === "error") {
|
||||
if (typeof obj.sense !== "string") {
|
||||
return err(new Error("Worker 'error' message missing string 'sense' field"));
|
||||
}
|
||||
if (typeof obj.error !== "string") {
|
||||
return err(new Error("Worker 'error' message missing string 'error' field"));
|
||||
}
|
||||
return ok(raw as ErrorMessage);
|
||||
}
|
||||
return ok({ type: "ready" });
|
||||
}
|
||||
|
||||
@@ -0,0 +1,247 @@
|
||||
/**
|
||||
* Kernel — the main orchestrator that ties sense workers, signal bus, and
|
||||
* reflex scheduler together.
|
||||
*
|
||||
* Responsibilities:
|
||||
* - Spawn one child process per sense group (via fork)
|
||||
* - Route SignalMessage from workers → SignalBus
|
||||
* - Route ErrorMessage from workers → stderr log
|
||||
* - Drive compute triggers via ReflexScheduler
|
||||
* - Graceful shutdown: stop scheduler, send shutdown to all workers
|
||||
*/
|
||||
|
||||
import { fork } from "node:child_process";
|
||||
import type { ChildProcess } from "node:child_process";
|
||||
import { dirname, join } from "node:path";
|
||||
import { fileURLToPath } from "node:url";
|
||||
|
||||
import type { NerveConfig, Signal } from "@uncaged/nerve-core";
|
||||
|
||||
import type { ComputeMessage, ShutdownMessage } from "./ipc.js";
|
||||
import { parseWorkerMessage } from "./ipc.js";
|
||||
import { createReflexScheduler } from "./reflex-scheduler.js";
|
||||
import type { ReflexScheduler } from "./reflex-scheduler.js";
|
||||
import { createSignalBus } from "./signal-bus.js";
|
||||
import type { SignalBus } from "./signal-bus.js";
|
||||
|
||||
export type Kernel = {
|
||||
stop: () => Promise<void>;
|
||||
groups: Set<string>;
|
||||
senseCount: number;
|
||||
bus: SignalBus;
|
||||
/** Resolves when all workers have sent their initial "ready" message. */
|
||||
ready: Promise<void>;
|
||||
/** Returns the PID of the worker process for a given group, or null if not found. */
|
||||
getWorkerPid: (group: string) => number | null;
|
||||
/** Sends a compute message to the worker responsible for the given sense. */
|
||||
triggerCompute: (senseName: string) => void;
|
||||
};
|
||||
|
||||
type WorkerEntry = {
|
||||
group: string;
|
||||
process: ChildProcess;
|
||||
};
|
||||
|
||||
function resolveWorkerScript(): string {
|
||||
const __filename = fileURLToPath(import.meta.url);
|
||||
const __dir = dirname(__filename);
|
||||
return join(__dir, "sense-worker.js");
|
||||
}
|
||||
|
||||
function spawnWorker(nerveRoot: string, group: string, workerScript: string): ChildProcess {
|
||||
return fork(workerScript, ["--group", group, "--root", nerveRoot], {
|
||||
stdio: ["ignore", "inherit", "inherit", "ipc"],
|
||||
});
|
||||
}
|
||||
|
||||
function sendCompute(worker: ChildProcess, senseName: string): void {
|
||||
// worker.connected is false when the IPC channel has been closed (e.g. worker crashed)
|
||||
if (worker.connected === false) return;
|
||||
const msg: ComputeMessage = { type: "compute", sense: senseName };
|
||||
try {
|
||||
worker.send(msg);
|
||||
} catch {
|
||||
// IPC channel closed between connected check and send
|
||||
}
|
||||
}
|
||||
|
||||
function sendShutdown(worker: ChildProcess): void {
|
||||
// worker.connected is false when the IPC channel has been closed (e.g. worker crashed)
|
||||
if (worker.connected === false) return;
|
||||
const msg: ShutdownMessage = { type: "shutdown" };
|
||||
try {
|
||||
worker.send(msg);
|
||||
} catch {
|
||||
// IPC channel closed between connected check and send
|
||||
}
|
||||
}
|
||||
|
||||
function groupForSense(config: NerveConfig, senseName: string): string | null {
|
||||
const senseConfig = config.senses[senseName];
|
||||
if (senseConfig === undefined) return null;
|
||||
return senseConfig.group;
|
||||
}
|
||||
|
||||
export type KernelOptions = {
|
||||
workerScript: string;
|
||||
};
|
||||
|
||||
function defaultKernelOptions(): KernelOptions {
|
||||
return { workerScript: resolveWorkerScript() };
|
||||
}
|
||||
|
||||
export function createKernel(
|
||||
config: NerveConfig,
|
||||
nerveRoot: string,
|
||||
options: KernelOptions = defaultKernelOptions(),
|
||||
): Kernel {
|
||||
const bus: SignalBus = createSignalBus();
|
||||
const workerScript = options.workerScript;
|
||||
|
||||
// Signal ID counter is instance-scoped (fix #2)
|
||||
let _signalIdCounter = 0;
|
||||
function nextSignalId(): number {
|
||||
_signalIdCounter += 1;
|
||||
return _signalIdCounter;
|
||||
}
|
||||
|
||||
const groups = new Set<string>();
|
||||
for (const senseConfig of Object.values(config.senses)) {
|
||||
groups.add(senseConfig.group);
|
||||
}
|
||||
|
||||
const workers = new Map<string, WorkerEntry>();
|
||||
let stopped = false;
|
||||
// eslint-disable-next-line prefer-const
|
||||
let scheduler: ReflexScheduler = null as unknown as ReflexScheduler;
|
||||
|
||||
let readyResolve: (() => void) | undefined;
|
||||
const ready = new Promise<void>((resolve) => {
|
||||
readyResolve = resolve;
|
||||
});
|
||||
let pendingReadyCount = groups.size > 0 ? groups.size : 0;
|
||||
|
||||
function sensesForGroup(group: string): string[] {
|
||||
return Object.entries(config.senses)
|
||||
.filter(([, sc]) => sc.group === group)
|
||||
.map(([name]) => name);
|
||||
}
|
||||
|
||||
function handleWorkerMessage(raw: unknown): void {
|
||||
const result = parseWorkerMessage(raw);
|
||||
if (!result.ok) {
|
||||
process.stderr.write(`[kernel] invalid worker message: ${result.error.message}\n`);
|
||||
return;
|
||||
}
|
||||
const msg = result.value;
|
||||
|
||||
if (msg.type === "ready") {
|
||||
pendingReadyCount -= 1;
|
||||
if (pendingReadyCount <= 0) {
|
||||
readyResolve?.();
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (msg.type === "error") {
|
||||
process.stderr.write(`[kernel] sense "${msg.sense}" error: ${msg.error}\n`);
|
||||
scheduler.onComputeComplete(msg.sense);
|
||||
return;
|
||||
}
|
||||
|
||||
if (msg.type === "signal") {
|
||||
const signal: Signal = {
|
||||
id: nextSignalId(),
|
||||
senseId: msg.sense,
|
||||
payload: msg.payload,
|
||||
ts: Date.now(),
|
||||
};
|
||||
bus.emit(signal);
|
||||
scheduler.onComputeComplete(msg.sense);
|
||||
}
|
||||
}
|
||||
|
||||
function startWorker(group: string): void {
|
||||
const child = spawnWorker(nerveRoot, group, workerScript);
|
||||
|
||||
child.on("message", handleWorkerMessage);
|
||||
|
||||
child.on("exit", (code) => {
|
||||
process.stderr.write(
|
||||
`[kernel] worker for group "${group}" exited with code ${code ?? "null"}\n`,
|
||||
);
|
||||
// Respawn on unexpected exit (fix #4)
|
||||
if (!stopped && code !== 0) {
|
||||
process.stderr.write(`[kernel] respawning worker for group "${group}" in 1s\n`);
|
||||
// Clear any stuck in-flight state for all senses in this group
|
||||
for (const senseName of sensesForGroup(group)) {
|
||||
scheduler.onComputeComplete(senseName);
|
||||
}
|
||||
setTimeout(() => {
|
||||
if (!stopped) {
|
||||
startWorker(group);
|
||||
}
|
||||
}, 1000);
|
||||
}
|
||||
});
|
||||
|
||||
workers.set(group, { group, process: child });
|
||||
}
|
||||
|
||||
function triggerFn(senseName: string): void {
|
||||
const group = groupForSense(config, senseName);
|
||||
if (group === null) {
|
||||
process.stderr.write(`[kernel] triggerFn: unknown sense "${senseName}"\n`);
|
||||
return;
|
||||
}
|
||||
const entry = workers.get(group);
|
||||
if (entry === undefined) {
|
||||
process.stderr.write(`[kernel] triggerFn: no worker for group "${group}"\n`);
|
||||
return;
|
||||
}
|
||||
sendCompute(entry.process, senseName);
|
||||
}
|
||||
|
||||
scheduler = createReflexScheduler(config, bus, triggerFn);
|
||||
|
||||
if (groups.size === 0) {
|
||||
readyResolve?.();
|
||||
}
|
||||
|
||||
for (const group of groups) {
|
||||
startWorker(group);
|
||||
}
|
||||
|
||||
// Wait for a worker process to exit, with a timeout + SIGKILL fallback (fix #5)
|
||||
function waitForExit(child: ChildProcess, timeoutMs: number): Promise<void> {
|
||||
return new Promise((resolve) => {
|
||||
const timer = setTimeout(() => {
|
||||
child.kill("SIGKILL");
|
||||
resolve();
|
||||
}, timeoutMs);
|
||||
child.once("exit", () => {
|
||||
clearTimeout(timer);
|
||||
resolve();
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
async function stop(): Promise<void> {
|
||||
stopped = true;
|
||||
scheduler.stop();
|
||||
const exitPromises: Promise<void>[] = [];
|
||||
for (const entry of workers.values()) {
|
||||
sendShutdown(entry.process);
|
||||
exitPromises.push(waitForExit(entry.process, 5000));
|
||||
}
|
||||
await Promise.all(exitPromises);
|
||||
}
|
||||
|
||||
function getWorkerPid(group: string): number | null {
|
||||
return workers.get(group)?.process.pid ?? null;
|
||||
}
|
||||
|
||||
const senseCount = Object.keys(config.senses).length;
|
||||
|
||||
return { stop, groups, senseCount, bus, ready, getWorkerPid, triggerCompute: triggerFn };
|
||||
}
|
||||
@@ -0,0 +1,181 @@
|
||||
/**
|
||||
* Reflex Scheduler — drives sense compute cycles based on ReflexConfig.
|
||||
*
|
||||
* Supports:
|
||||
* - interval: periodic setInterval-based triggering
|
||||
* - on[]: event-driven triggering via the signal bus
|
||||
* - throttle: skip triggers that arrive too soon after the last compute
|
||||
* - merge/coalesce: if compute is in-flight, record one pending trigger;
|
||||
* run it once after the current compute completes (no unbounded queue)
|
||||
*/
|
||||
|
||||
import type { NerveConfig } from "@uncaged/nerve-core";
|
||||
import type { SignalBus, Unsubscribe } from "./signal-bus.js";
|
||||
|
||||
/** Sends a compute message to the worker responsible for the given sense. */
|
||||
export type TriggerFn = (senseName: string) => void;
|
||||
|
||||
/** Per-sense mutable state tracked by the scheduler. */
|
||||
type SenseState = {
|
||||
lastComputeAt: number;
|
||||
inFlight: boolean;
|
||||
pending: boolean;
|
||||
deferredTimer: ReturnType<typeof setTimeout> | null;
|
||||
};
|
||||
|
||||
/** Handle returned by createReflexScheduler — call stop() for graceful shutdown. */
|
||||
export type ReflexScheduler = {
|
||||
/** Notify scheduler that a compute cycle finished. Drains the pending flag. */
|
||||
onComputeComplete: (senseName: string) => void;
|
||||
stop: () => void;
|
||||
};
|
||||
|
||||
function makeSenseState(): SenseState {
|
||||
return { lastComputeAt: 0, inFlight: false, pending: false, deferredTimer: null };
|
||||
}
|
||||
|
||||
/**
|
||||
* Create and start a reflex scheduler.
|
||||
*
|
||||
* @param config Full NerveConfig (reads senses for throttle/timeout, reflexes for schedule).
|
||||
* @param bus SignalBus to subscribe for event-driven reflexes.
|
||||
* @param triggerFn Called with the sense name when a compute should be dispatched.
|
||||
* @returns ReflexScheduler with stop() and onComputeComplete() methods.
|
||||
*/
|
||||
export function createReflexScheduler(
|
||||
config: NerveConfig,
|
||||
bus: SignalBus,
|
||||
triggerFn: TriggerFn,
|
||||
): ReflexScheduler {
|
||||
const intervals: ReturnType<typeof setInterval>[] = [];
|
||||
const unsubscribers: Unsubscribe[] = [];
|
||||
const states = new Map<string, SenseState>();
|
||||
|
||||
function getState(senseName: string): SenseState {
|
||||
let state = states.get(senseName);
|
||||
if (state === undefined) {
|
||||
state = makeSenseState();
|
||||
states.set(senseName, state);
|
||||
}
|
||||
return state;
|
||||
}
|
||||
|
||||
function dispatchCompute(senseName: string): void {
|
||||
const state = getState(senseName);
|
||||
state.inFlight = true;
|
||||
state.pending = false;
|
||||
state.lastComputeAt = Date.now();
|
||||
triggerFn(senseName);
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempt to trigger compute for a sense.
|
||||
* Respects throttle window and merge/coalesce semantics.
|
||||
* If within throttle window, schedules a single deferred trigger at window end (fix #3).
|
||||
*/
|
||||
function maybeTrigger(senseName: string): void {
|
||||
const senseConfig = config.senses[senseName];
|
||||
const throttleMs = senseConfig?.throttle ?? null;
|
||||
const state = getState(senseName);
|
||||
const now = Date.now();
|
||||
|
||||
if (throttleMs !== null) {
|
||||
const elapsed = now - state.lastComputeAt;
|
||||
if (elapsed < throttleMs) {
|
||||
// Schedule one deferred trigger at the end of the throttle window (no stacking)
|
||||
if (state.deferredTimer === null) {
|
||||
const remaining = throttleMs - elapsed;
|
||||
state.deferredTimer = setTimeout(() => {
|
||||
state.deferredTimer = null;
|
||||
maybeTrigger(senseName);
|
||||
}, remaining);
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if (state.inFlight) {
|
||||
state.pending = true;
|
||||
return;
|
||||
}
|
||||
|
||||
dispatchCompute(senseName);
|
||||
}
|
||||
|
||||
/**
|
||||
* Called by the kernel when a compute cycle completes (signal or error received).
|
||||
* Drains the pending flag if set.
|
||||
*/
|
||||
function onComputeComplete(senseName: string): void {
|
||||
const state = states.get(senseName);
|
||||
if (state === undefined) return;
|
||||
|
||||
state.inFlight = false;
|
||||
|
||||
if (!state.pending) return;
|
||||
|
||||
const senseConfig = config.senses[senseName];
|
||||
const throttleMs = senseConfig?.throttle ?? null;
|
||||
const now = Date.now();
|
||||
|
||||
if (throttleMs !== null && now - state.lastComputeAt < throttleMs) {
|
||||
// Schedule deferred trigger if not already pending (fix #3)
|
||||
if (state.deferredTimer === null) {
|
||||
const remaining = throttleMs - (now - state.lastComputeAt);
|
||||
state.deferredTimer = setTimeout(() => {
|
||||
state.deferredTimer = null;
|
||||
const s = states.get(senseName);
|
||||
if (s !== undefined && !s.inFlight) {
|
||||
dispatchCompute(senseName);
|
||||
}
|
||||
}, remaining);
|
||||
}
|
||||
state.pending = false;
|
||||
return;
|
||||
}
|
||||
|
||||
dispatchCompute(senseName);
|
||||
}
|
||||
|
||||
for (const reflex of config.reflexes) {
|
||||
if (reflex.kind !== "sense") continue;
|
||||
const senseReflex = reflex;
|
||||
const senseName = senseReflex.sense;
|
||||
|
||||
if (senseReflex.interval !== null) {
|
||||
const id = setInterval(() => {
|
||||
maybeTrigger(senseName);
|
||||
}, senseReflex.interval);
|
||||
intervals.push(id);
|
||||
}
|
||||
|
||||
if (senseReflex.on !== null && senseReflex.on.length > 0) {
|
||||
const watchedSenses = new Set(senseReflex.on);
|
||||
const unsub = bus.subscribe((signal) => {
|
||||
if (watchedSenses.has(signal.senseId)) {
|
||||
maybeTrigger(senseName);
|
||||
}
|
||||
});
|
||||
unsubscribers.push(unsub);
|
||||
}
|
||||
}
|
||||
|
||||
function stop(): void {
|
||||
for (const id of intervals) {
|
||||
clearInterval(id);
|
||||
}
|
||||
for (const unsub of unsubscribers) {
|
||||
unsub();
|
||||
}
|
||||
for (const state of states.values()) {
|
||||
if (state.deferredTimer !== null) {
|
||||
clearTimeout(state.deferredTimer);
|
||||
state.deferredTimer = null;
|
||||
}
|
||||
}
|
||||
intervals.length = 0;
|
||||
unsubscribers.length = 0;
|
||||
}
|
||||
|
||||
return { onComputeComplete, stop };
|
||||
}
|
||||
@@ -0,0 +1,51 @@
|
||||
/**
|
||||
* In-memory signal bus for routing signals between sense workers and reflex subscribers.
|
||||
* Synchronous dispatch — no persistence, no async queuing.
|
||||
*
|
||||
* If a handler throws, the error is logged and remaining handlers still run.
|
||||
* The first error is re-thrown after all handlers complete so callers can observe it.
|
||||
*/
|
||||
|
||||
import type { Signal } from "@uncaged/nerve-core";
|
||||
|
||||
export type SignalHandler = (signal: Signal) => void;
|
||||
export type Unsubscribe = () => void;
|
||||
|
||||
export type SignalBus = {
|
||||
emit: (signal: Signal) => void;
|
||||
subscribe: (handler: SignalHandler) => Unsubscribe;
|
||||
};
|
||||
|
||||
export function createSignalBus(): SignalBus {
|
||||
const handlers = new Set<SignalHandler>();
|
||||
|
||||
function emit(signal: Signal): void {
|
||||
let firstError: unknown = null;
|
||||
let hasError = false;
|
||||
|
||||
for (const handler of handlers) {
|
||||
try {
|
||||
handler(signal);
|
||||
} catch (e) {
|
||||
console.error("[signal-bus] handler error:", e);
|
||||
if (!hasError) {
|
||||
firstError = e;
|
||||
hasError = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (hasError) {
|
||||
throw firstError;
|
||||
}
|
||||
}
|
||||
|
||||
function subscribe(handler: SignalHandler): Unsubscribe {
|
||||
handlers.add(handler);
|
||||
return () => {
|
||||
handlers.delete(handler);
|
||||
};
|
||||
}
|
||||
|
||||
return { emit, subscribe };
|
||||
}
|
||||
@@ -2,7 +2,8 @@
|
||||
"extends": "../../tsconfig.json",
|
||||
"compilerOptions": {
|
||||
"outDir": "dist",
|
||||
"rootDir": "src"
|
||||
"rootDir": "src",
|
||||
"composite": false
|
||||
},
|
||||
"include": ["src"]
|
||||
}
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import { defineConfig } from "tsup";
|
||||
|
||||
export default defineConfig({
|
||||
entry: ["src/index.ts"],
|
||||
entry: ["src/index.ts", "src/sense-worker.ts"],
|
||||
format: ["esm"],
|
||||
dts: true,
|
||||
clean: true,
|
||||
|
||||
Generated
+38
-3
@@ -18,7 +18,21 @@ importers:
|
||||
specifier: ^5.5.0
|
||||
version: 5.9.3
|
||||
|
||||
packages/cli: {}
|
||||
packages/cli:
|
||||
dependencies:
|
||||
'@uncaged/nerve-core':
|
||||
specifier: workspace:*
|
||||
version: link:../core
|
||||
'@uncaged/nerve-daemon':
|
||||
specifier: workspace:*
|
||||
version: link:../daemon
|
||||
citty:
|
||||
specifier: ^0.1.6
|
||||
version: 0.1.6
|
||||
devDependencies:
|
||||
'@types/node':
|
||||
specifier: ^22.0.0
|
||||
version: 22.19.17
|
||||
|
||||
packages/core:
|
||||
dependencies:
|
||||
@@ -552,6 +566,9 @@ packages:
|
||||
'@types/estree@1.0.8':
|
||||
resolution: {integrity: sha512-dWHzHa2WqEXI/O1E9OjrocMTKJl2mSrEolh1Iomrv6U+JuNwaHXsXx9bLu5gG7BUWFIN0skIQJQ/L1rIex4X6w==}
|
||||
|
||||
'@types/node@22.19.17':
|
||||
resolution: {integrity: sha512-wGdMcf+vPYM6jikpS/qhg6WiqSV/OhG+jeeHT/KlVqxYfD40iYJf9/AE1uQxVWFvU7MipKRkRv8NSHiCGgPr8Q==}
|
||||
|
||||
'@types/node@25.6.0':
|
||||
resolution: {integrity: sha512-+qIYRKdNYJwY3vRCZMdJbPLJAtGjQBudzZzdzwQYkEPQd+PJGixUL5QfvCLDaULoLv+RhT3LDkwEfKaAkgSmNQ==}
|
||||
|
||||
@@ -632,6 +649,9 @@ packages:
|
||||
chownr@1.1.4:
|
||||
resolution: {integrity: sha512-jJ0bqzaylmJtVnNgzTeSOs8DPavpbYgEr/b0YL8/2GO3xJEhInFmhKMUnEJQjZumK7KXGFhUy89PrsJWlakBVg==}
|
||||
|
||||
citty@0.1.6:
|
||||
resolution: {integrity: sha512-tskPPKEs8D2KPafUypv2gxwJP8h/OaJmC82QQGGDQcHvXX43xF2VDACcJVmZ0EuSxkpO9Kc4MlrA3q0+FG58AQ==}
|
||||
|
||||
commander@4.1.1:
|
||||
resolution: {integrity: sha512-NOKm8xhkzAjzFx8B2v5OAHT+u5pRQc2UCa2Vq9jYL/31o2wi9mxBA7LIFs3sV5VSC49z6pEhfbMULvShKj26WA==}
|
||||
engines: {node: '>= 6'}
|
||||
@@ -1135,6 +1155,9 @@ packages:
|
||||
ufo@1.6.3:
|
||||
resolution: {integrity: sha512-yDJTmhydvl5lJzBmy/hyOAA0d+aqCBuwl818haVdYCRrWV84o7YyeVm4QlVHStqNrrJSTb6jKuFAVqAFsr+K3Q==}
|
||||
|
||||
undici-types@6.21.0:
|
||||
resolution: {integrity: sha512-iwDZqg0QAGrg9Rav5H4n0M64c3mkR59cJ6wQp+7C4nI0gsmExaedaYLNO44eT4AtBBwjbTiGPMlt2Md0T9H9JQ==}
|
||||
|
||||
undici-types@7.19.2:
|
||||
resolution: {integrity: sha512-qYVnV5OEm2AW8cJMCpdV20CDyaN3g0AjDlOGf1OW4iaDEx8MwdtChUp4zu4H0VP3nDRF/8RKWH+IPp9uW0YGZg==}
|
||||
|
||||
@@ -1527,7 +1550,7 @@ snapshots:
|
||||
|
||||
'@types/better-sqlite3@7.6.13':
|
||||
dependencies:
|
||||
'@types/node': 25.6.0
|
||||
'@types/node': 22.19.17
|
||||
|
||||
'@types/chai@5.2.3':
|
||||
dependencies:
|
||||
@@ -1538,9 +1561,14 @@ snapshots:
|
||||
|
||||
'@types/estree@1.0.8': {}
|
||||
|
||||
'@types/node@22.19.17':
|
||||
dependencies:
|
||||
undici-types: 6.21.0
|
||||
|
||||
'@types/node@25.6.0':
|
||||
dependencies:
|
||||
undici-types: 7.19.2
|
||||
optional: true
|
||||
|
||||
'@vitest/expect@4.1.5':
|
||||
dependencies:
|
||||
@@ -1626,6 +1654,10 @@ snapshots:
|
||||
|
||||
chownr@1.1.4: {}
|
||||
|
||||
citty@0.1.6:
|
||||
dependencies:
|
||||
consola: 3.4.2
|
||||
|
||||
commander@4.1.1: {}
|
||||
|
||||
confbox@0.1.8: {}
|
||||
@@ -2050,7 +2082,10 @@ snapshots:
|
||||
|
||||
ufo@1.6.3: {}
|
||||
|
||||
undici-types@7.19.2: {}
|
||||
undici-types@6.21.0: {}
|
||||
|
||||
undici-types@7.19.2:
|
||||
optional: true
|
||||
|
||||
util-deprecate@1.0.2: {}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user