fix(daemon): address all 6 PR review items

🔴 Must fix:
1. compute timeout — AbortController + configurable timeoutMs, soft timeout
   returns error Result (RFC §5.3)
2. migration journal — _migrations table tracks applied files, skips
   already-executed migrations
3. IPC validation — parseParentMessage() validates type field, rejects
   malformed messages with Result error

⚠️ Also fixed:
4. Self DB excluded from peers map
5. Per-sense compute serialization (mutex via Promise chain)
6. Unhandled rejection — .catch() on compute promise, errors sent via IPC

+10 new tests (25 total), biome check + vitest all green.

小橘 🍊(NEKO Team)
This commit is contained in:
2026-04-22 08:42:03 +00:00
parent a38986acdb
commit c80a6b9fa8
4 changed files with 292 additions and 32 deletions
@@ -7,6 +7,7 @@ import { drizzle } from "drizzle-orm/better-sqlite3";
import { integer, real, sqliteTable } from "drizzle-orm/sqlite-core";
import { describe, expect, it } from "vitest";
import { parseParentMessage } from "../ipc.js";
import { executeCompute, openPeerDb, openSenseDb, runMigrations } from "../sense-runtime.js";
import type { DrizzleDB, PeerMap, SenseRuntime } from "../sense-runtime.js";
@@ -299,4 +300,133 @@ describe("executeCompute", () => {
expect(rows[0].value).toBe(1.23);
dbSqlite.close();
});
it("returns err when compute exceeds timeoutMs", async () => {
const { runtime, sqlite } = makeRuntime(
(_db, _peers, options) =>
new Promise<null>((resolve, reject) => {
const t = setTimeout(() => resolve(null), 5_000);
options?.signal.addEventListener("abort", () => {
clearTimeout(t);
reject(new Error("aborted"));
});
}),
);
const result = await executeCompute(runtime, emptyPeers, 50);
expect(result.ok).toBe(false);
if (result.ok) return;
expect(result.error.message).toMatch(/timed out/i);
sqlite.close();
});
it("completes within timeout when compute is fast", async () => {
const { runtime, sqlite } = makeRuntime(async () => 42);
const result = await executeCompute(runtime, emptyPeers, 5_000);
expect(result.ok).toBe(true);
if (!result.ok) return;
expect(result.value).toBe(42);
sqlite.close();
});
it("passes AbortSignal to compute fn", async () => {
let capturedSignal: AbortSignal | undefined;
const { runtime, sqlite } = makeRuntime(async (_db, _peers, options) => {
capturedSignal = options?.signal;
return null;
});
await executeCompute(runtime, emptyPeers, 1_000);
expect(capturedSignal).toBeInstanceOf(AbortSignal);
sqlite.close();
});
});
// ---------------------------------------------------------------------------
// parseParentMessage (IPC validation)
// ---------------------------------------------------------------------------
describe("parseParentMessage", () => {
it("accepts a valid compute message", () => {
const result = parseParentMessage({ type: "compute", sense: "cpu" });
expect(result.ok).toBe(true);
if (!result.ok) return;
expect(result.value.type).toBe("compute");
});
it("accepts a valid shutdown message", () => {
const result = parseParentMessage({ type: "shutdown" });
expect(result.ok).toBe(true);
if (!result.ok) return;
expect(result.value.type).toBe("shutdown");
});
it("returns err for non-object input", () => {
expect(parseParentMessage(null).ok).toBe(false);
expect(parseParentMessage("string").ok).toBe(false);
expect(parseParentMessage(42).ok).toBe(false);
});
it("returns err when type field is missing", () => {
const result = parseParentMessage({ sense: "cpu" });
expect(result.ok).toBe(false);
if (result.ok) return;
expect(result.error.message).toMatch(/type/);
});
it("returns err for unknown type value", () => {
const result = parseParentMessage({ type: "unknown" });
expect(result.ok).toBe(false);
if (result.ok) return;
expect(result.error.message).toMatch(/unknown/i);
});
});
// ---------------------------------------------------------------------------
// runMigrations – journal (idempotency)
// ---------------------------------------------------------------------------
describe("runMigrations journal", () => {
it("does not re-run an already-applied migration", () => {
const sqlite = new Database(":memory:");
const dir = mkdtempSync(join(tmpdir(), "nerve-journal-"));
writeFileSync(join(dir, "0001_init.sql"), INIT_SQL);
const first = runMigrations(sqlite, dir);
expect(first.ok).toBe(true);
// Insert a row so we can verify second run doesn't fail on CREATE TABLE
sqlite.exec("INSERT INTO samples (ts, value) VALUES (1, 1.0)");
// Run again — migration must NOT re-run (would fail without IF NOT EXISTS but
// the journal prevents it even for non-idempotent SQL)
const nonIdempotentSql = "CREATE TABLE samples2 (id INTEGER PRIMARY KEY)";
writeFileSync(join(dir, "0002_samples2.sql"), nonIdempotentSql);
// First time: creates samples2
const second = runMigrations(sqlite, dir);
expect(second.ok).toBe(true);
// Second time: 0002 already in journal, must not re-run
const third = runMigrations(sqlite, dir);
expect(third.ok).toBe(true);
sqlite.close();
});
it("tracks migrations in _migrations table", () => {
const sqlite = new Database(":memory:");
const dir = mkdtempSync(join(tmpdir(), "nerve-journal2-"));
writeFileSync(join(dir, "0001_init.sql"), INIT_SQL);
runMigrations(sqlite, dir);
const rows = sqlite.prepare("SELECT name FROM _migrations ORDER BY name").all() as Array<{
name: string;
}>;
expect(rows).toHaveLength(1);
expect(rows[0].name).toBe("0001_init.sql");
sqlite.close();
});
});
+19
View File
@@ -3,6 +3,9 @@
* Protocol per RFC §5.2: hub-and-spoke, all messages through engine.
*/
import type { Result } from "@uncaged/nerve-core";
import { err, ok } from "@uncaged/nerve-core";
/** Parent → Worker: trigger one compute cycle for a sense */
export type ComputeMessage = {
type: "compute";
@@ -38,3 +41,19 @@ export type ReadyMessage = {
/** Union of all messages a worker sends to the parent */
export type WorkerToParentMessage = SignalMessage | ErrorMessage | ReadyMessage;
/** Validate and parse an unknown IPC message received from the parent process. */
export function parseParentMessage(raw: unknown): Result<ParentToWorkerMessage> {
if (raw === null || typeof raw !== "object") {
return err(new Error("IPC message is not an object"));
}
const obj = raw as Record<string, unknown>;
if (typeof obj.type !== "string") {
return err(new Error("IPC message missing string 'type' field"));
}
const type = obj.type;
if (type !== "compute" && type !== "shutdown") {
return err(new Error(`Unknown IPC message type: "${type}"`));
}
return ok(raw as ParentToWorkerMessage);
}
+103 -27
View File
@@ -14,12 +14,21 @@ export type DrizzleDB = BetterSQLite3Database<Record<string, never>>;
/** Read-only map of peer sense name → their Drizzle DB */
export type PeerMap = Readonly<Record<string, DrizzleDB>>;
/** Options passed to a compute function */
export type ComputeOptions = {
signal: AbortSignal;
};
/**
* The shape every sense's index.ts must export.
* Engine injects `db` (read-write) and `peers` (read-only).
* Engine injects `db` (read-write), `peers` (read-only), and `options`.
* Returns T when a signal should be emitted, null for silence.
*/
export type ComputeFn<T = unknown> = (db: DrizzleDB, peers: PeerMap) => Promise<T | null>;
export type ComputeFn<T = unknown> = (
db: DrizzleDB,
peers: PeerMap,
options?: ComputeOptions,
) => Promise<T | null>;
/** All state held for one sense inside a worker */
export type SenseRuntime = {
@@ -28,38 +37,81 @@ export type SenseRuntime = {
compute: ComputeFn;
};
/**
* Run all *.sql migration files in the given directory against a
* better-sqlite3 Database, in lexicographic order.
*/
export function runMigrations(sqlite: Database.Database, migrationsDir: string): Result<void> {
let files: string[];
function ensureMigrationsTable(sqlite: Database.Database): Result<void> {
try {
files = readdirSync(migrationsDir)
sqlite.exec(
`CREATE TABLE IF NOT EXISTS _migrations (
name TEXT PRIMARY KEY,
applied_at INTEGER NOT NULL
)`,
);
return ok(undefined);
} catch (e) {
const msg = e instanceof Error ? e.message : String(e);
return err(new Error(`Failed to create _migrations table: ${msg}`));
}
}
function listMigrationFiles(migrationsDir: string): Result<string[]> {
try {
const files = readdirSync(migrationsDir)
.filter((f) => f.endsWith(".sql"))
.sort();
return ok(files);
} catch (e) {
const msg = e instanceof Error ? e.message : String(e);
return err(new Error(`Failed to read migrations directory "${migrationsDir}": ${msg}`));
}
}
for (const file of files) {
const filePath = join(migrationsDir, file);
let sql: string;
try {
sql = readFileSync(filePath, "utf8");
} catch (e) {
const msg = e instanceof Error ? e.message : String(e);
return err(new Error(`Failed to read migration file "${filePath}": ${msg}`));
}
function applyMigrationFile(
sqlite: Database.Database,
file: string,
filePath: string,
): Result<void> {
let sql: string;
try {
sql = readFileSync(filePath, "utf8");
} catch (e) {
const msg = e instanceof Error ? e.message : String(e);
return err(new Error(`Failed to read migration file "${filePath}": ${msg}`));
}
try {
const insertJournal = sqlite.prepare("INSERT INTO _migrations (name, applied_at) VALUES (?, ?)");
try {
sqlite.transaction(() => {
sqlite.exec(sql);
} catch (e) {
const msg = e instanceof Error ? e.message : String(e);
return err(new Error(`Migration "${file}" failed: ${msg}`));
}
insertJournal.run(file, Date.now());
})();
return ok(undefined);
} catch (e) {
const msg = e instanceof Error ? e.message : String(e);
return err(new Error(`Migration "${file}" failed: ${msg}`));
}
}
/**
* Run all *.sql migration files in the given directory against a
* better-sqlite3 Database, in lexicographic order.
* Tracks applied migrations in _migrations table to avoid re-running.
*/
export function runMigrations(sqlite: Database.Database, migrationsDir: string): Result<void> {
const tableResult = ensureMigrationsTable(sqlite);
if (!tableResult.ok) return tableResult;
const filesResult = listMigrationFiles(migrationsDir);
if (!filesResult.ok) return filesResult;
const applied = new Set<string>(
(sqlite.prepare("SELECT name FROM _migrations").all() as Array<{ name: string }>).map(
(r) => r.name,
),
);
for (const file of filesResult.value) {
if (applied.has(file)) continue;
const result = applyMigrationFile(sqlite, file, join(migrationsDir, file));
if (!result.ok) return result;
}
return ok(undefined);
@@ -136,18 +188,42 @@ export async function loadComputeFn(senseIndexPath: string): Promise<Result<Comp
}
/**
* Execute a sense's compute function and return the raw result.
* Caller decides whether to emit a signal (non-null) or stay silent (null).
* Execute a sense's compute function with an optional soft timeout.
* If timeoutMs is provided and compute takes longer, the AbortSignal is
* triggered and an error Result is returned.
*/
export async function executeCompute(
runtime: SenseRuntime,
peers: PeerMap,
timeoutMs?: number,
): Promise<Result<unknown | null>> {
const controller = new AbortController();
const options: ComputeOptions = { signal: controller.signal };
let timer: ReturnType<typeof setTimeout> | undefined;
const timeoutPromise =
timeoutMs !== undefined
? new Promise<never>((_, reject) => {
timer = setTimeout(() => {
controller.abort();
reject(new Error(`compute("${runtime.name}") timed out after ${timeoutMs}ms`));
}, timeoutMs);
})
: null;
try {
const result = await runtime.compute(runtime.db, peers);
const computePromise = runtime.compute(runtime.db, peers, options);
const result = timeoutPromise
? await Promise.race([computePromise, timeoutPromise])
: await computePromise;
return ok(result);
} catch (e) {
const msg = e instanceof Error ? e.message : String(e);
if (controller.signal.aborted) {
return err(new Error(`compute("${runtime.name}") timed out after ${timeoutMs as number}ms`));
}
return err(new Error(`compute("${runtime.name}") threw: ${msg}`));
} finally {
if (timer !== undefined) clearTimeout(timer);
}
}
+40 -5
View File
@@ -19,7 +19,8 @@ import { join, resolve } from "node:path";
import { parseNerveConfig } from "@uncaged/nerve-core";
import type { NerveConfig } from "@uncaged/nerve-core";
import type { ParentToWorkerMessage, WorkerToParentMessage } from "./ipc.js";
import type { WorkerToParentMessage } from "./ipc.js";
import { parseParentMessage } from "./ipc.js";
import { executeCompute, loadComputeFn, openPeerDb, openSenseDb } from "./sense-runtime.js";
import type { DrizzleDB, PeerMap, SenseRuntime } from "./sense-runtime.js";
@@ -100,10 +101,14 @@ function buildPeers(
nerveRoot: string,
allSenseNames: string[],
ownDbs: Map<string, DrizzleDB>,
groupSenseNames: Set<string>,
): PeerMap {
const entries: [string, DrizzleDB][] = [];
for (const peerName of allSenseNames) {
// Exclude senses that belong to this worker's own group — they are not peers
if (groupSenseNames.has(peerName)) continue;
const own = ownDbs.get(peerName);
if (own !== undefined) {
entries.push([peerName, own]);
@@ -129,8 +134,16 @@ function handleMessage(
runtimes: Map<string, SenseRuntime>,
peers: PeerMap,
group: string,
timeoutMs: number,
inFlight: Map<string, Promise<void>>,
): void {
const msg = raw as ParentToWorkerMessage;
const parseResult = parseParentMessage(raw);
if (!parseResult.ok) {
process.stderr.write(`[sense-worker] Invalid IPC message: ${parseResult.error.message}\n`);
return;
}
const msg = parseResult.value;
if (msg.type === "shutdown") {
process.exit(0);
@@ -143,7 +156,10 @@ function handleMessage(
return;
}
executeCompute(runtime, peers).then((result) => {
// Serialize computes for the same sense
const previous = inFlight.get(msg.sense) ?? Promise.resolve();
const next = previous.then(async () => {
const result = await executeCompute(runtime, peers, timeoutMs);
if (!result.ok) {
sendError(msg.sense, result.error.message);
return;
@@ -152,6 +168,13 @@ function handleMessage(
sendSignal(msg.sense, result.value);
}
});
const tracked = next.catch((e: unknown) => {
const errMsg = e instanceof Error ? e.message : String(e);
sendError(msg.sense, errMsg);
});
inFlight.set(msg.sense, tracked);
}
}
@@ -159,6 +182,8 @@ function handleMessage(
// Bootstrap
// ---------------------------------------------------------------------------
const DEFAULT_TIMEOUT_MS = 30_000;
async function bootstrap(nerveRoot: string, group: string): Promise<void> {
const config = readConfig(nerveRoot);
@@ -180,12 +205,22 @@ async function bootstrap(nerveRoot: string, group: string): Promise<void> {
runtimes.set(senseName, runtime);
}
const peers = buildPeers(nerveRoot, Object.keys(config.senses), ownDbs);
const groupSenseNames = new Set(groupSenses);
const peers = buildPeers(nerveRoot, Object.keys(config.senses), ownDbs, groupSenseNames);
// Read timeout from config (uses first group sense's config, or default)
const firstSenseConfig = config.senses[groupSenses[0]];
const timeoutMs =
typeof (firstSenseConfig as Record<string, unknown>).timeoutMs === "number"
? ((firstSenseConfig as Record<string, unknown>).timeoutMs as number)
: DEFAULT_TIMEOUT_MS;
const inFlight = new Map<string, Promise<void>>();
sendReady();
process.on("message", (raw: unknown) => {
handleMessage(raw, runtimes, peers, group);
handleMessage(raw, runtimes, peers, group, timeoutMs, inFlight);
});
}