From c80a6b9fa881133bd9df2c6d43242a020ac314fe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B0=8F=E6=A9=98?= Date: Wed, 22 Apr 2026 08:42:03 +0000 Subject: [PATCH] fix(daemon): address all 6 PR review items MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 🔴 Must fix: 1. compute timeout — AbortController + configurable timeoutMs, soft timeout returns error Result (RFC §5.3) 2. migration journal — _migrations table tracks applied files, skips already-executed migrations 3. IPC validation — parseParentMessage() validates type field, rejects malformed messages with Result error ⚠️ Also fixed: 4. Self DB excluded from peers map 5. Per-sense compute serialization (mutex via Promise chain) 6. Unhandled rejection — .catch() on compute promise, errors sent via IPC +10 new tests (25 total), biome check + vitest all green. 小橘 🍊(NEKO Team) --- .../src/__tests__/sense-runtime.test.ts | 130 ++++++++++++++++++ packages/daemon/src/ipc.ts | 19 +++ packages/daemon/src/sense-runtime.ts | 130 ++++++++++++++---- packages/daemon/src/sense-worker.ts | 45 +++++- 4 files changed, 292 insertions(+), 32 deletions(-) diff --git a/packages/daemon/src/__tests__/sense-runtime.test.ts b/packages/daemon/src/__tests__/sense-runtime.test.ts index 71a6e48..fddf8be 100644 --- a/packages/daemon/src/__tests__/sense-runtime.test.ts +++ b/packages/daemon/src/__tests__/sense-runtime.test.ts @@ -7,6 +7,7 @@ import { drizzle } from "drizzle-orm/better-sqlite3"; import { integer, real, sqliteTable } from "drizzle-orm/sqlite-core"; import { describe, expect, it } from "vitest"; +import { parseParentMessage } from "../ipc.js"; import { executeCompute, openPeerDb, openSenseDb, runMigrations } from "../sense-runtime.js"; import type { DrizzleDB, PeerMap, SenseRuntime } from "../sense-runtime.js"; @@ -299,4 +300,133 @@ describe("executeCompute", () => { expect(rows[0].value).toBe(1.23); dbSqlite.close(); }); + + it("returns err when compute exceeds timeoutMs", async () => { + const { runtime, sqlite } = makeRuntime( + (_db, _peers, options) => + new Promise((resolve, reject) => { + const t = setTimeout(() => resolve(null), 5_000); + options?.signal.addEventListener("abort", () => { + clearTimeout(t); + reject(new Error("aborted")); + }); + }), + ); + + const result = await executeCompute(runtime, emptyPeers, 50); + expect(result.ok).toBe(false); + if (result.ok) return; + expect(result.error.message).toMatch(/timed out/i); + sqlite.close(); + }); + + it("completes within timeout when compute is fast", async () => { + const { runtime, sqlite } = makeRuntime(async () => 42); + const result = await executeCompute(runtime, emptyPeers, 5_000); + expect(result.ok).toBe(true); + if (!result.ok) return; + expect(result.value).toBe(42); + sqlite.close(); + }); + + it("passes AbortSignal to compute fn", async () => { + let capturedSignal: AbortSignal | undefined; + const { runtime, sqlite } = makeRuntime(async (_db, _peers, options) => { + capturedSignal = options?.signal; + return null; + }); + + await executeCompute(runtime, emptyPeers, 1_000); + expect(capturedSignal).toBeInstanceOf(AbortSignal); + sqlite.close(); + }); +}); + +// --------------------------------------------------------------------------- +// parseParentMessage (IPC validation) +// --------------------------------------------------------------------------- + +describe("parseParentMessage", () => { + it("accepts a valid compute message", () => { + const result = parseParentMessage({ type: "compute", sense: "cpu" }); + expect(result.ok).toBe(true); + if (!result.ok) return; + expect(result.value.type).toBe("compute"); + }); + + it("accepts a valid shutdown message", () => { + const result = parseParentMessage({ type: "shutdown" }); + expect(result.ok).toBe(true); + if (!result.ok) return; + expect(result.value.type).toBe("shutdown"); + }); + + it("returns err for non-object input", () => { + expect(parseParentMessage(null).ok).toBe(false); + expect(parseParentMessage("string").ok).toBe(false); + expect(parseParentMessage(42).ok).toBe(false); + }); + + it("returns err when type field is missing", () => { + const result = parseParentMessage({ sense: "cpu" }); + expect(result.ok).toBe(false); + if (result.ok) return; + expect(result.error.message).toMatch(/type/); + }); + + it("returns err for unknown type value", () => { + const result = parseParentMessage({ type: "unknown" }); + expect(result.ok).toBe(false); + if (result.ok) return; + expect(result.error.message).toMatch(/unknown/i); + }); +}); + +// --------------------------------------------------------------------------- +// runMigrations – journal (idempotency) +// --------------------------------------------------------------------------- + +describe("runMigrations journal", () => { + it("does not re-run an already-applied migration", () => { + const sqlite = new Database(":memory:"); + const dir = mkdtempSync(join(tmpdir(), "nerve-journal-")); + writeFileSync(join(dir, "0001_init.sql"), INIT_SQL); + + const first = runMigrations(sqlite, dir); + expect(first.ok).toBe(true); + + // Insert a row so we can verify second run doesn't fail on CREATE TABLE + sqlite.exec("INSERT INTO samples (ts, value) VALUES (1, 1.0)"); + + // Run again — migration must NOT re-run (would fail without IF NOT EXISTS but + // the journal prevents it even for non-idempotent SQL) + const nonIdempotentSql = "CREATE TABLE samples2 (id INTEGER PRIMARY KEY)"; + writeFileSync(join(dir, "0002_samples2.sql"), nonIdempotentSql); + + // First time: creates samples2 + const second = runMigrations(sqlite, dir); + expect(second.ok).toBe(true); + + // Second time: 0002 already in journal, must not re-run + const third = runMigrations(sqlite, dir); + expect(third.ok).toBe(true); + + sqlite.close(); + }); + + it("tracks migrations in _migrations table", () => { + const sqlite = new Database(":memory:"); + const dir = mkdtempSync(join(tmpdir(), "nerve-journal2-")); + writeFileSync(join(dir, "0001_init.sql"), INIT_SQL); + + runMigrations(sqlite, dir); + + const rows = sqlite.prepare("SELECT name FROM _migrations ORDER BY name").all() as Array<{ + name: string; + }>; + expect(rows).toHaveLength(1); + expect(rows[0].name).toBe("0001_init.sql"); + + sqlite.close(); + }); }); diff --git a/packages/daemon/src/ipc.ts b/packages/daemon/src/ipc.ts index 0aa7f85..cfd0a08 100644 --- a/packages/daemon/src/ipc.ts +++ b/packages/daemon/src/ipc.ts @@ -3,6 +3,9 @@ * Protocol per RFC §5.2: hub-and-spoke, all messages through engine. */ +import type { Result } from "@uncaged/nerve-core"; +import { err, ok } from "@uncaged/nerve-core"; + /** Parent → Worker: trigger one compute cycle for a sense */ export type ComputeMessage = { type: "compute"; @@ -38,3 +41,19 @@ export type ReadyMessage = { /** Union of all messages a worker sends to the parent */ export type WorkerToParentMessage = SignalMessage | ErrorMessage | ReadyMessage; + +/** Validate and parse an unknown IPC message received from the parent process. */ +export function parseParentMessage(raw: unknown): Result { + if (raw === null || typeof raw !== "object") { + return err(new Error("IPC message is not an object")); + } + const obj = raw as Record; + if (typeof obj.type !== "string") { + return err(new Error("IPC message missing string 'type' field")); + } + const type = obj.type; + if (type !== "compute" && type !== "shutdown") { + return err(new Error(`Unknown IPC message type: "${type}"`)); + } + return ok(raw as ParentToWorkerMessage); +} diff --git a/packages/daemon/src/sense-runtime.ts b/packages/daemon/src/sense-runtime.ts index e9eb441..7946f5d 100644 --- a/packages/daemon/src/sense-runtime.ts +++ b/packages/daemon/src/sense-runtime.ts @@ -14,12 +14,21 @@ export type DrizzleDB = BetterSQLite3Database>; /** Read-only map of peer sense name → their Drizzle DB */ export type PeerMap = Readonly>; +/** Options passed to a compute function */ +export type ComputeOptions = { + signal: AbortSignal; +}; + /** * The shape every sense's index.ts must export. - * Engine injects `db` (read-write) and `peers` (read-only). + * Engine injects `db` (read-write), `peers` (read-only), and `options`. * Returns T when a signal should be emitted, null for silence. */ -export type ComputeFn = (db: DrizzleDB, peers: PeerMap) => Promise; +export type ComputeFn = ( + db: DrizzleDB, + peers: PeerMap, + options?: ComputeOptions, +) => Promise; /** All state held for one sense inside a worker */ export type SenseRuntime = { @@ -28,38 +37,81 @@ export type SenseRuntime = { compute: ComputeFn; }; -/** - * Run all *.sql migration files in the given directory against a - * better-sqlite3 Database, in lexicographic order. - */ -export function runMigrations(sqlite: Database.Database, migrationsDir: string): Result { - let files: string[]; - +function ensureMigrationsTable(sqlite: Database.Database): Result { try { - files = readdirSync(migrationsDir) + sqlite.exec( + `CREATE TABLE IF NOT EXISTS _migrations ( + name TEXT PRIMARY KEY, + applied_at INTEGER NOT NULL + )`, + ); + return ok(undefined); + } catch (e) { + const msg = e instanceof Error ? e.message : String(e); + return err(new Error(`Failed to create _migrations table: ${msg}`)); + } +} + +function listMigrationFiles(migrationsDir: string): Result { + try { + const files = readdirSync(migrationsDir) .filter((f) => f.endsWith(".sql")) .sort(); + return ok(files); } catch (e) { const msg = e instanceof Error ? e.message : String(e); return err(new Error(`Failed to read migrations directory "${migrationsDir}": ${msg}`)); } +} - for (const file of files) { - const filePath = join(migrationsDir, file); - let sql: string; - try { - sql = readFileSync(filePath, "utf8"); - } catch (e) { - const msg = e instanceof Error ? e.message : String(e); - return err(new Error(`Failed to read migration file "${filePath}": ${msg}`)); - } +function applyMigrationFile( + sqlite: Database.Database, + file: string, + filePath: string, +): Result { + let sql: string; + try { + sql = readFileSync(filePath, "utf8"); + } catch (e) { + const msg = e instanceof Error ? e.message : String(e); + return err(new Error(`Failed to read migration file "${filePath}": ${msg}`)); + } - try { + const insertJournal = sqlite.prepare("INSERT INTO _migrations (name, applied_at) VALUES (?, ?)"); + try { + sqlite.transaction(() => { sqlite.exec(sql); - } catch (e) { - const msg = e instanceof Error ? e.message : String(e); - return err(new Error(`Migration "${file}" failed: ${msg}`)); - } + insertJournal.run(file, Date.now()); + })(); + return ok(undefined); + } catch (e) { + const msg = e instanceof Error ? e.message : String(e); + return err(new Error(`Migration "${file}" failed: ${msg}`)); + } +} + +/** + * Run all *.sql migration files in the given directory against a + * better-sqlite3 Database, in lexicographic order. + * Tracks applied migrations in _migrations table to avoid re-running. + */ +export function runMigrations(sqlite: Database.Database, migrationsDir: string): Result { + const tableResult = ensureMigrationsTable(sqlite); + if (!tableResult.ok) return tableResult; + + const filesResult = listMigrationFiles(migrationsDir); + if (!filesResult.ok) return filesResult; + + const applied = new Set( + (sqlite.prepare("SELECT name FROM _migrations").all() as Array<{ name: string }>).map( + (r) => r.name, + ), + ); + + for (const file of filesResult.value) { + if (applied.has(file)) continue; + const result = applyMigrationFile(sqlite, file, join(migrationsDir, file)); + if (!result.ok) return result; } return ok(undefined); @@ -136,18 +188,42 @@ export async function loadComputeFn(senseIndexPath: string): Promise> { + const controller = new AbortController(); + const options: ComputeOptions = { signal: controller.signal }; + + let timer: ReturnType | undefined; + const timeoutPromise = + timeoutMs !== undefined + ? new Promise((_, reject) => { + timer = setTimeout(() => { + controller.abort(); + reject(new Error(`compute("${runtime.name}") timed out after ${timeoutMs}ms`)); + }, timeoutMs); + }) + : null; + try { - const result = await runtime.compute(runtime.db, peers); + const computePromise = runtime.compute(runtime.db, peers, options); + const result = timeoutPromise + ? await Promise.race([computePromise, timeoutPromise]) + : await computePromise; return ok(result); } catch (e) { const msg = e instanceof Error ? e.message : String(e); + if (controller.signal.aborted) { + return err(new Error(`compute("${runtime.name}") timed out after ${timeoutMs as number}ms`)); + } return err(new Error(`compute("${runtime.name}") threw: ${msg}`)); + } finally { + if (timer !== undefined) clearTimeout(timer); } } diff --git a/packages/daemon/src/sense-worker.ts b/packages/daemon/src/sense-worker.ts index b5320db..f5f5dc0 100644 --- a/packages/daemon/src/sense-worker.ts +++ b/packages/daemon/src/sense-worker.ts @@ -19,7 +19,8 @@ import { join, resolve } from "node:path"; import { parseNerveConfig } from "@uncaged/nerve-core"; import type { NerveConfig } from "@uncaged/nerve-core"; -import type { ParentToWorkerMessage, WorkerToParentMessage } from "./ipc.js"; +import type { WorkerToParentMessage } from "./ipc.js"; +import { parseParentMessage } from "./ipc.js"; import { executeCompute, loadComputeFn, openPeerDb, openSenseDb } from "./sense-runtime.js"; import type { DrizzleDB, PeerMap, SenseRuntime } from "./sense-runtime.js"; @@ -100,10 +101,14 @@ function buildPeers( nerveRoot: string, allSenseNames: string[], ownDbs: Map, + groupSenseNames: Set, ): PeerMap { const entries: [string, DrizzleDB][] = []; for (const peerName of allSenseNames) { + // Exclude senses that belong to this worker's own group — they are not peers + if (groupSenseNames.has(peerName)) continue; + const own = ownDbs.get(peerName); if (own !== undefined) { entries.push([peerName, own]); @@ -129,8 +134,16 @@ function handleMessage( runtimes: Map, peers: PeerMap, group: string, + timeoutMs: number, + inFlight: Map>, ): void { - const msg = raw as ParentToWorkerMessage; + const parseResult = parseParentMessage(raw); + if (!parseResult.ok) { + process.stderr.write(`[sense-worker] Invalid IPC message: ${parseResult.error.message}\n`); + return; + } + + const msg = parseResult.value; if (msg.type === "shutdown") { process.exit(0); @@ -143,7 +156,10 @@ function handleMessage( return; } - executeCompute(runtime, peers).then((result) => { + // Serialize computes for the same sense + const previous = inFlight.get(msg.sense) ?? Promise.resolve(); + const next = previous.then(async () => { + const result = await executeCompute(runtime, peers, timeoutMs); if (!result.ok) { sendError(msg.sense, result.error.message); return; @@ -152,6 +168,13 @@ function handleMessage( sendSignal(msg.sense, result.value); } }); + + const tracked = next.catch((e: unknown) => { + const errMsg = e instanceof Error ? e.message : String(e); + sendError(msg.sense, errMsg); + }); + + inFlight.set(msg.sense, tracked); } } @@ -159,6 +182,8 @@ function handleMessage( // Bootstrap // --------------------------------------------------------------------------- +const DEFAULT_TIMEOUT_MS = 30_000; + async function bootstrap(nerveRoot: string, group: string): Promise { const config = readConfig(nerveRoot); @@ -180,12 +205,22 @@ async function bootstrap(nerveRoot: string, group: string): Promise { runtimes.set(senseName, runtime); } - const peers = buildPeers(nerveRoot, Object.keys(config.senses), ownDbs); + const groupSenseNames = new Set(groupSenses); + const peers = buildPeers(nerveRoot, Object.keys(config.senses), ownDbs, groupSenseNames); + + // Read timeout from config (uses first group sense's config, or default) + const firstSenseConfig = config.senses[groupSenses[0]]; + const timeoutMs = + typeof (firstSenseConfig as Record).timeoutMs === "number" + ? ((firstSenseConfig as Record).timeoutMs as number) + : DEFAULT_TIMEOUT_MS; + + const inFlight = new Map>(); sendReady(); process.on("message", (raw: unknown) => { - handleMessage(raw, runtimes, peers, group); + handleMessage(raw, runtimes, peers, group, timeoutMs, inFlight); }); }