This repository has been archived on 2026-06-01. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
nerve/packages/daemon/src/sense-worker.ts
T

333 lines
10 KiB
TypeScript

/**
* Sense Worker runtime bootstrap.
*
* Entry point for `nerve worker sense --group <name>`.
* Receives the group name (`--group`) and the nerve root directory (`--root`)
* via CLI args, reads `nerve.yaml` from that root, initialises one
* SenseRuntime per sense in the group, then signals ready and enters the
* IPC event loop.
*
* Layout assumptions (nerve user config at `~/.uncaged-nerve/`):
* dist/senses/<name>/index.js ← bundled compute (esbuild)
* data/senses/<name>.json ← persisted sense state
* nerve.yaml ← config
*/
import "./experimental-warning-suppression.js";
import { spawn } from "node:child_process";
import { readFileSync } from "node:fs";
import { join, resolve } from "node:path";
import { parseNerveConfig } from "@uncaged/nerve-core";
import type { NerveConfig, SenseTrigger } from "@uncaged/nerve-core";
import type { WorkerToParentMessage } from "./ipc.js";
import { parseParentMessage } from "./ipc.js";
import { executeCompute, loadSenseModule, readState } from "./sense-runtime.js";
import type { SenseRuntime } from "./sense-runtime.js";
import { ignoreSessionBroadcastSignals } from "./worker-signals.js";
// ---------------------------------------------------------------------------
// IPC helpers
// ---------------------------------------------------------------------------
/**
 * Forward a message to the parent process over the IPC channel.
 *
 * When `process.send` is not defined the message is silently dropped.
 *
 * @param msg - Message to deliver to the parent.
 */
function send(msg: WorkerToParentMessage): void {
  process.send?.(msg);
}
/** Send the `ready` message to the parent process. */
function sendReady(): void {
  const readyMessage: WorkerToParentMessage = { type: "ready" };
  send(readyMessage);
}
/**
 * Report a successful compute to the parent as a `compute-result` message.
 *
 * @param sense - Name of the sense that was computed.
 * @param value - The compute outcome: the new state and the trigger (or `null`).
 */
function sendComputeResult(
  sense: string,
  value: { state: unknown; trigger: SenseTrigger | null },
): void {
  const { state, trigger } = value;
  send({ type: "compute-result", sense, state, trigger });
}
/**
 * Run a sense's shell trigger as a detached, fire-and-forget child process.
 *
 * Does nothing unless `trigger` is non-null and has `kind === "shell"`.
 * The command runs through a shell with `nerveRoot` as its working directory.
 * stdin and stdout are ignored; stderr is captured so that a command which
 * exits non-zero *and* wrote to stderr is reported on this worker's stderr.
 *
 * @param nerveRoot - Directory used as the child's working directory.
 * @param trigger - Trigger produced by a compute, or `null` when there is none.
 */
function executeShellTriggerIfNeeded(nerveRoot: string, trigger: SenseTrigger | null): void {
  if (trigger === null || trigger.kind !== "shell") return;

  const child = spawn(trigger.command, {
    shell: true,
    cwd: nerveRoot,
    detached: true,
    stdio: ["ignore", "ignore", "pipe"],
  });

  // Spawn failures are delivered asynchronously through the "error" event.
  child.on("error", (err) => {
    process.stderr.write(`[sense-worker] shell trigger failed: ${err.message}\n`);
  });

  if (child.stderr) {
    // Decode at the stream level: a multi-byte UTF-8 character that straddles
    // two chunks is reassembled instead of being turned into replacement
    // characters by per-chunk `Buffer#toString()`.
    child.stderr.setEncoding("utf8");

    let stderrBuf = "";
    child.stderr.on("data", (chunk: string) => {
      stderrBuf += chunk;
    });

    child.on("close", (code) => {
      // Only report failures that actually produced diagnostics.
      if (code !== null && code !== 0 && stderrBuf.length > 0) {
        process.stderr.write(
          `[sense-worker] shell trigger exited with code ${code}: ${stderrBuf.trimEnd()}\n`,
        );
      }
    });
  }

  // Drop this process's reference to the child so it is not waited on.
  child.unref();
}
/**
 * Report a failure for one sense to the parent as an `error` message.
 *
 * @param sense - Name of the sense the error relates to.
 * @param error - Human-readable error text.
 */
function sendError(sense: string, error: string): void {
  const failure: WorkerToParentMessage = { type: "error", sense, error };
  send(failure);
}
// ---------------------------------------------------------------------------
// Initialisation helpers
// ---------------------------------------------------------------------------
/**
 * Load and parse `<nerveRoot>/nerve.yaml`.
 *
 * Any failure (file unreadable, or the parser reporting an error) is written
 * to stderr and terminates the process with exit code 1, so callers only ever
 * receive a successfully parsed config.
 *
 * @param nerveRoot - Directory that contains `nerve.yaml`.
 * @returns The parsed configuration.
 */
function readConfig(nerveRoot: string): NerveConfig {
  const configPath = join(nerveRoot, "nerve.yaml");

  let yamlText: string;
  try {
    yamlText = readFileSync(configPath, "utf8");
  } catch (e) {
    const reason = e instanceof Error ? e.message : String(e);
    process.stderr.write(`[sense-worker] Cannot read ${configPath}: ${reason}\n`);
    process.exit(1);
  }

  const outcome = parseNerveConfig(yamlText);
  if (outcome.ok) {
    return outcome.value;
  }

  process.stderr.write(`[sense-worker] Config parse error: ${outcome.error.message}\n`);
  process.exit(1);
}
/**
 * Build the runtime for a single sense.
 *
 * Loads the sense module from `<nerveRoot>/dist/senses/<senseName>/index.js`
 * and reads its state from `<nerveRoot>/data/senses/<senseName>.json`, passing
 * the module's `initialState` to `readState`.
 *
 * @param nerveRoot - Nerve root directory.
 * @param senseName - Name of the sense to initialise.
 * @returns The runtime holding the sense's name, compute function, state and state path.
 * @throws Error when the sense module cannot be loaded.
 */
async function initSense(nerveRoot: string, senseName: string): Promise<SenseRuntime> {
  const bundlePath = resolve(join(nerveRoot, "dist", "senses", senseName, "index.js"));

  const loaded = await loadSenseModule(bundlePath);
  if (!loaded.ok) {
    throw new Error(`Failed to load module for "${senseName}": ${loaded.error.message}`);
  }

  const statePath = join(nerveRoot, "data", "senses", `${senseName}.json`);
  const runtime: SenseRuntime = {
    name: senseName,
    compute: loaded.value.compute,
    state: readState(statePath, loaded.value.initialState),
    statePath,
  };
  return runtime;
}
// ---------------------------------------------------------------------------
// Grace period: hard kill after soft timeout
//
// Trade-off: grace period kills the entire worker process (process.exit), which
// terminates all senses in the group — not just the one that timed out. This is
// intentional: a hung compute may hold locks or corrupt shared state. Restarting
// the full worker ensures a clean slate, but other senses in the same group will
// lose any in-flight work until the kernel respawns the process.
// ---------------------------------------------------------------------------
/** Pending hard-kill timers, keyed by sense name (at most one per sense). */
const gracePeriodTimers = new Map<string, ReturnType<typeof setTimeout>>();

/**
 * Arm a timer that terminates the whole worker process (exit code 1) once
 * `gracePeriodMs` has elapsed. If a timer is already pending for `senseName`
 * the call is a no-op, so the original deadline is kept.
 *
 * @param senseName - Sense whose compute timed out.
 * @param gracePeriodMs - Delay before the process is killed, in milliseconds.
 */
function scheduleGracePeriodKill(senseName: string, gracePeriodMs: number): void {
  const alreadyArmed = gracePeriodTimers.has(senseName);
  if (!alreadyArmed) {
    process.stderr.write(`[sense-worker] grace period for "${senseName}" (${gracePeriodMs}ms)\n`);

    const onExpiry = (): void => {
      process.stderr.write(`[sense-worker] grace period expired for "${senseName}", hard kill\n`);
      process.exit(1);
    };
    gracePeriodTimers.set(senseName, setTimeout(onExpiry, gracePeriodMs));
  }
}
/**
 * Cancel the pending hard-kill timer for `senseName`, if there is one, and
 * forget it.
 *
 * @param senseName - Sense whose grace-period timer should be cancelled.
 */
function clearGracePeriodTimer(senseName: string): void {
  const pending = gracePeriodTimers.get(senseName);
  if (pending !== undefined) {
    clearTimeout(pending);
    gracePeriodTimers.delete(senseName);
  }
}
// ---------------------------------------------------------------------------
// Compute execution with error isolation
// ---------------------------------------------------------------------------
/**
 * Run one compute for a sense and report the outcome to the parent.
 *
 * On success: cancels any pending grace-period kill for the sense, fires the
 * shell trigger (if any), then sends the compute result.
 * On failure: sends an error; if the error message contains "timed out" and a
 * grace period is configured, arms the grace-period hard kill.
 * Anything thrown along the way is caught and reported as an error message,
 * so the returned promise does not reject because of a compute failure.
 *
 * @param senseName - Name of the sense being computed.
 * @param runtime - Runtime of that sense.
 * @param timeoutMs - Timeout passed to `executeCompute`.
 * @param gracePeriodMs - Grace period before a hard kill, or `null` to disable it.
 * @param nerveRoot - Nerve root directory (working directory for shell triggers).
 */
async function runCompute(
  senseName: string,
  runtime: SenseRuntime,
  timeoutMs: number,
  gracePeriodMs: number | null,
  nerveRoot: string,
): Promise<void> {
  try {
    const outcome = await executeCompute(runtime, timeoutMs);

    if (outcome.ok) {
      clearGracePeriodTimer(senseName);
      executeShellTriggerIfNeeded(nerveRoot, outcome.value.trigger);
      sendComputeResult(senseName, outcome.value);
      return;
    }

    const reason = outcome.error.message;
    sendError(senseName, reason);

    const timedOut = reason.includes("timed out");
    if (timedOut && gracePeriodMs !== null) {
      scheduleGracePeriodKill(senseName, gracePeriodMs);
    }
  } catch (e: unknown) {
    sendError(senseName, e instanceof Error ? e.message : String(e));
  }
}
// ---------------------------------------------------------------------------
// IPC message dispatch
// ---------------------------------------------------------------------------
/**
 * Dispatch one raw IPC message received from the parent process.
 *
 * - Messages that fail `parseParentMessage` are logged to stderr and dropped.
 * - `shutdown` exits the process with code 0.
 * - `health-request` is answered with the loaded sense names and the number
 *   of senses that currently have a compute chain pending.
 * - `compute` queues a compute for the named sense. Computes for the same
 *   sense are serialised by chaining onto that sense's previous promise.
 * - Any other message type is ignored.
 *
 * @param raw - Unvalidated message as received on the IPC channel.
 * @param runtimes - Loaded sense runtimes, keyed by sense name.
 * @param group - Name of the group this worker serves (used in error text).
 * @param senseConfigs - Per-sense timeout / grace-period settings.
 * @param inFlight - Tail promise of each sense's pending compute chain. An
 *   entry is removed once its chain has settled, so `inFlight.size` reflects
 *   work that is actually outstanding.
 * @param nerveRoot - Nerve root directory, forwarded to `runCompute`.
 */
function handleMessage(
  raw: unknown,
  runtimes: Map<string, SenseRuntime>,
  group: string,
  senseConfigs: Map<string, { timeout: number | null; gracePeriod: number | null }>,
  inFlight: Map<string, Promise<void>>,
  nerveRoot: string,
): void {
  const parseResult = parseParentMessage(raw);
  if (!parseResult.ok) {
    process.stderr.write(`[sense-worker] Invalid IPC message: ${parseResult.error.message}\n`);
    return;
  }

  const msg = parseResult.value;

  if (msg.type === "shutdown") {
    process.exit(0);
  }

  if (msg.type === "health-request") {
    send({
      type: "health-response",
      senses: [...runtimes.keys()],
      inFlightCount: inFlight.size,
    });
    return;
  }

  if (msg.type !== "compute") return;

  const senseName = msg.sense;
  const runtime = runtimes.get(senseName);
  if (!runtime) {
    sendError(senseName, `Unknown sense "${senseName}" in group "${group}"`);
    return;
  }

  const sc = senseConfigs.get(senseName);
  const timeoutMs = sc?.timeout ?? DEFAULT_TIMEOUT_MS;
  const gracePeriodMs = sc?.gracePeriod ?? null;

  // Chain onto the previous compute of the same sense so they never overlap.
  const previous = inFlight.get(senseName) ?? Promise.resolve();
  const next: Promise<void> = previous
    .then(() => runCompute(senseName, runtime, timeoutMs, gracePeriodMs, nerveRoot))
    .catch((e: unknown) => {
      const errMsg = e instanceof Error ? e.message : String(e);
      sendError(senseName, errMsg);
    })
    .finally(() => {
      // Drop the entry once the chain has drained. If a newer compute was
      // queued in the meantime the map already points at its promise, which
      // must be left in place.
      if (inFlight.get(senseName) === next) {
        inFlight.delete(senseName);
      }
    });
  inFlight.set(senseName, next);
}
// ---------------------------------------------------------------------------
// Bootstrap
// ---------------------------------------------------------------------------
/** Compute timeout in milliseconds used when a sense's config sets no `timeout`. */
const DEFAULT_TIMEOUT_MS = 30_000;

/**
 * Initialise the worker for one sense group and start serving IPC messages.
 *
 * Reads the config, selects the senses whose `group` equals `group`, loads a
 * runtime for each of them (a sense that fails to load is logged and skipped),
 * collects their timeout / grace-period settings, sends `ready` to the parent
 * and finally installs the `message` listener.
 *
 * Exits the process with code 1 when the group has no senses or when none of
 * them could be loaded.
 *
 * @param nerveRoot - Nerve root directory containing `nerve.yaml`.
 * @param group - Name of the sense group this worker serves.
 */
async function bootstrap(nerveRoot: string, group: string): Promise<void> {
  const config = readConfig(nerveRoot);

  const groupSenses = Object.keys(config.senses).filter(
    (name) => config.senses[name]?.group === group,
  );
  if (groupSenses.length === 0) {
    process.stderr.write(`[sense-worker] No senses found for group "${group}"\n`);
    process.exit(1);
  }

  // Load senses one at a time; a failing sense must not take the others down.
  const runtimes = new Map<string, SenseRuntime>();
  for (const senseName of groupSenses) {
    try {
      const runtime = await initSense(nerveRoot, senseName);
      runtimes.set(senseName, runtime);
    } catch (e: unknown) {
      const eMsg = e instanceof Error ? e.message : String(e);
      process.stderr.write(
        `[sense-worker] Failed to load sense "${senseName}", skipping: ${eMsg}\n`,
      );
    }
  }
  if (runtimes.size === 0) {
    process.stderr.write(`[sense-worker] All senses in group "${group}" failed to load, exiting\n`);
    process.exit(1);
  }

  const senseConfigs = new Map<string, { timeout: number | null; gracePeriod: number | null }>();
  for (const senseName of groupSenses) {
    const sc = config.senses[senseName];
    senseConfigs.set(senseName, {
      timeout: sc?.timeout ?? null,
      gracePeriod: sc?.gracePeriod ?? null,
    });
  }

  const inFlight = new Map<string, Promise<void>>();

  sendReady();
  process.on("message", (raw: unknown) => {
    handleMessage(raw, runtimes, group, senseConfigs, inFlight, nerveRoot);
  });
}
// ---------------------------------------------------------------------------
// CLI entrypoint
// ---------------------------------------------------------------------------
/**
 * Extract `--group <name>` and `--root <dir>` from `process.argv`.
 *
 * A flag is only honoured when a value follows it; that value is consumed and
 * never interpreted as a flag itself. When a flag is repeated the last
 * occurrence wins. Unrecognised arguments are skipped.
 *
 * @returns The parsed options, or `null` when either option is missing or empty.
 */
function parseArgs(): { nerveRoot: string; group: string } | null {
  const argv = process.argv.slice(2);
  let group: string | null = null;
  let nerveRoot: string | null = null;

  let idx = 0;
  while (idx < argv.length) {
    const flag = argv[idx];
    const hasValue = idx + 1 < argv.length;

    if (hasValue && flag === "--group") {
      group = argv[idx + 1];
      idx += 2;
    } else if (hasValue && flag === "--root") {
      nerveRoot = argv[idx + 1];
      idx += 2;
    } else {
      idx += 1;
    }
  }

  return group && nerveRoot ? { nerveRoot, group } : null;
}
// Script entry: validate CLI arguments, then hand over to bootstrap().
const parsed = parseArgs();
if (parsed === null) {
  process.stderr.write("Usage: sense-worker --group <name> --root <nerve-root-dir>\n");
  process.exit(1);
}

// Only applied when an IPC channel to a parent exists (process.send is set).
// NOTE(review): presumably makes the worker ignore signals broadcast to the
// whole session so that shutdown is driven by the parent — confirm in
// worker-signals.
if (typeof process.send === "function") {
  ignoreSessionBroadcastSignals();
}

// Last-resort handler: a rejected bootstrap is logged and ends the process.
bootstrap(parsed.nerveRoot, parsed.group).catch((err) => {
  const reason = err instanceof Error ? err.message : String(err);
  process.stderr.write(`[sense-worker] Unhandled bootstrap error: ${reason}\n`);
  process.exit(1);
});