Compare commits

...

10 Commits

Author SHA1 Message Date
xiaoju 418d8ee0c8 refactor(cli): replace sql.js with node:sqlite
Drop the sql.js WASM dependency in favour of Node 22's built-in
node:sqlite (DatabaseSync). This eliminates the ~2 MB WASM binary,
removes the async init ceremony, and lets us open databases in
readonly mode directly on disk instead of loading them into memory.

Breaking: requires Node >= 22.5.0 (sqlite support).

- Remove sql.js from cli dependencies
- Rewrite sense-sqlite.ts to use DatabaseSync
- Update sense command (schema/query) — sync API, no more queryAsObjects
- Update tests to use node:sqlite directly
- Remove sql.js from tsup externals

小橘 🍊(NEKO Team)
2026-04-23 08:43:39 +00:00
xiaomo 719c4c1449 Merge pull request 'refactor(cli): replace better-sqlite3 with sql.js (pure WASM) — implements RFC #63' (#64) from refactor/sql-js-migration into main 2026-04-23 07:32:38 +00:00
xiaoju c8bf4bf547 refactor(cli): replace better-sqlite3 with sql.js (pure WASM)
- Remove native C++ addon dependency, no more pnpm approve-builds
- sql.js loads SQLite as WASM, zero compilation required
- WASM init is singleton (once per process)
- Add queryAsObjects() adapter for sql.js columnar → row format
- Tests migrated to sql.js (16 passing)

Implements RFC #63
2026-04-23 07:25:08 +00:00
xiaoju 9b93c4a4d9 chore(cli): bump version to 0.1.8 2026-04-23 07:10:28 +00:00
xiaomo ca14c5f51d Merge pull request 'feat(cli): add nerve sense schema and query commands (closes #60)' (#62) from feat/sense-query into main 2026-04-23 07:06:02 +00:00
xiaomo 1979e0e16c Merge pull request 'refactor: replace dynamic imports with static imports in CLI' (#61) from refactor/static-imports into main 2026-04-23 07:04:31 +00:00
xiaoju b15fc993f2 feat(cli): add nerve sense schema and query commands
Open each sense SQLite file read-only under data/senses. schema lists CREATE TABLE SQL from sqlite_master; query runs optional SQL or a default SELECT ordered by rowid. Human output uses aligned columns; --json for machine-readable output. Add better-sqlite3 to the CLI package and externalize it in tsup.

Tests cover sense-sqlite helpers and integration against a temp database.

Made-with: Cursor
2026-04-23 07:01:16 +00:00
xiaomo fc76b862ad Merge pull request 'refactor(cli): replace dynamic imports with static imports — closes #57' (#59) from refactor/static-imports into main 2026-04-23 06:55:46 +00:00
xiaomo 96188c8cda Merge pull request 'fix(daemon): foreground worker signals and crash diagnostics (closes #55, closes #56)' (#58) from fix/dev-worker-crash into main 2026-04-23 06:48:33 +00:00
xiaoju f1458f8353 fix(daemon): foreground worker signals and crash diagnostics
Ignore SIGINT/SIGTERM only when fork IPC is active (process.send) so terminal signals do not race the kernel shutdown in nerve dev, without breaking standalone worker CLIs (fixes #55).

Pipe worker stderr through the parent with a rolling capture buffer; log exit signal name and stderr tail on worker exit (fixes #56). Apply the same exit logging to workflow workers.

Made-with: Cursor
2026-04-23 06:41:32 +00:00
10 changed files with 522 additions and 17 deletions
+2 -2
View File
@@ -1,6 +1,6 @@
{ {
"name": "@uncaged/nerve-cli", "name": "@uncaged/nerve-cli",
"version": "0.1.7", "version": "0.1.8",
"type": "module", "type": "module",
"bin": { "bin": {
"nerve": "dist/cli.js" "nerve": "dist/cli.js"
@@ -23,9 +23,9 @@
"citty": "^0.1.6" "citty": "^0.1.6"
}, },
"devDependencies": { "devDependencies": {
"@uncaged/nerve-daemon": "workspace:*",
"@types/better-sqlite3": "^7.6.13", "@types/better-sqlite3": "^7.6.13",
"@types/node": "^22.0.0", "@types/node": "^22.0.0",
"@uncaged/nerve-daemon": "workspace:*",
"vitest": "^4.1.5" "vitest": "^4.1.5"
} }
} }
@@ -0,0 +1,159 @@
/**
* Tests for sense SQLite helpers used by `nerve sense schema` / `nerve sense query`.
*/
import { mkdirSync, rmSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { DatabaseSync } from "node:sqlite";
import { afterEach, beforeEach, describe, expect, it } from "vitest";
import {
assertSenseDbExists,
collectColumnKeys,
defaultPreviewSql,
formatRowsAsAlignedTable,
listTableSqlStatements,
parseSenseQueryArgs,
pickDefaultPreviewTable,
senseDbPath,
} from "../sense-sqlite.js";
let tmpDir: string;
beforeEach(() => {
tmpDir = join(
tmpdir(),
`nerve-sense-sqlite-${Date.now()}-${Math.random().toString(16).slice(2)}`,
);
mkdirSync(join(tmpDir, "data", "senses"), { recursive: true });
});
afterEach(() => {
rmSync(tmpDir, { recursive: true, force: true });
});
describe("senseDbPath", () => {
it("points at data/senses/<name>.db under the given root", () => {
expect(senseDbPath("/root", "cpu-usage")).toBe(join("/root", "data", "senses", "cpu-usage.db"));
});
});
describe("assertSenseDbExists", () => {
it("throws when the file is missing", () => {
expect(() => assertSenseDbExists(tmpDir, "nope")).toThrow(/No database at/);
});
it("returns the path when the file exists", () => {
const p = join(tmpDir, "data", "senses", "x.db");
new DatabaseSync(p).close();
expect(assertSenseDbExists(tmpDir, "x")).toBe(p);
});
});
describe("listTableSqlStatements", () => {
it("returns CREATE statements ordered by tbl_name", () => {
const p = join(tmpDir, "data", "senses", "t.db");
const db = new DatabaseSync(p);
db.exec("CREATE TABLE zebra (id INTEGER)");
db.exec("CREATE TABLE alpha (id INTEGER)");
const stmts = listTableSqlStatements(db);
db.close();
expect(stmts).toHaveLength(2);
expect(stmts[0]).toMatch(/^CREATE TABLE alpha/i);
expect(stmts[1]).toMatch(/^CREATE TABLE zebra/i);
});
});
describe("pickDefaultPreviewTable", () => {
it("prefers non-_migrations tables when both exist", () => {
const p = join(tmpDir, "data", "senses", "t.db");
const db = new DatabaseSync(p);
db.exec("CREATE TABLE _migrations (name TEXT PRIMARY KEY)");
db.exec("CREATE TABLE readings (id INTEGER)");
expect(pickDefaultPreviewTable(db)).toBe("readings");
db.close();
});
it("uses _migrations when it is the only table", () => {
const p = join(tmpDir, "data", "senses", "t.db");
const db = new DatabaseSync(p);
db.exec("CREATE TABLE _migrations (name TEXT PRIMARY KEY)");
expect(pickDefaultPreviewTable(db)).toBe("_migrations");
db.close();
});
});
describe("defaultPreviewSql", () => {
it("quotes identifiers for SQL safety", () => {
expect(defaultPreviewSql(`weird"name`)).toContain(`weird""name`);
});
});
describe("parseSenseQueryArgs", () => {
it("parses sense name only", () => {
expect(parseSenseQueryArgs(["cpu"])).toEqual({ name: "cpu", sql: undefined });
});
it("strips --json", () => {
expect(parseSenseQueryArgs(["cpu", "--json"])).toEqual({ name: "cpu", sql: undefined });
expect(parseSenseQueryArgs(["--json", "cpu"])).toEqual({ name: "cpu", sql: undefined });
});
it("joins remaining tokens into SQL", () => {
expect(parseSenseQueryArgs(["cpu", "SELECT", "1"])).toEqual({ name: "cpu", sql: "SELECT 1" });
});
it("throws when name is missing", () => {
expect(() => parseSenseQueryArgs(["--json"])).toThrow(/Missing sense name/);
});
});
describe("formatRowsAsAlignedTable", () => {
it("shows empty marker for no rows", () => {
expect(formatRowsAsAlignedTable([])).toContain("(0 rows)");
});
it("aligns columns from row data", () => {
const out = formatRowsAsAlignedTable([
{ a: 1, b: "x" },
{ a: 22, b: "yy" },
]);
expect(out).toContain("a");
expect(out).toContain("b");
expect(out).toContain("22");
});
});
describe("collectColumnKeys", () => {
it("preserves key order from first row then appends new keys", () => {
expect(
collectColumnKeys([
{ z: 1, a: 2 },
{ a: 3, b: 4 },
]),
).toEqual(["z", "a", "b"]);
});
});
describe("readonly query integration", () => {
it("runs default preview SQL on a real db", () => {
const p = join(tmpDir, "data", "senses", "demo.db");
const rw = new DatabaseSync(p);
rw.exec("CREATE TABLE items (id INTEGER PRIMARY KEY, v TEXT)");
rw.exec("INSERT INTO items (v) VALUES ('a'), ('b')");
rw.close();
const db = new DatabaseSync(p, { readOnly: true });
const table = pickDefaultPreviewTable(db);
expect(table).toBe("items");
if (table === null) {
throw new Error("expected items table");
}
const sql = defaultPreviewSql(table);
const rows = db.prepare(sql).all() as Record<string, unknown>[];
db.close();
expect(rows.length).toBeGreaterThanOrEqual(1);
});
});
+121 -1
View File
@@ -1,10 +1,20 @@
import { readFileSync } from "node:fs"; import { readFileSync } from "node:fs";
import { join } from "node:path"; import { join } from "node:path";
import { DatabaseSync } from "node:sqlite";
import { type SenseInfo, parseNerveConfig } from "@uncaged/nerve-core"; import { type SenseInfo, parseNerveConfig } from "@uncaged/nerve-core";
import { defineCommand } from "citty"; import { defineCommand } from "citty";
import { listSensesViaDaemon, triggerSenseViaDaemon } from "../daemon-client.js"; import { listSensesViaDaemon, triggerSenseViaDaemon } from "../daemon-client.js";
import {
assertSenseDbExists,
defaultPreviewSql,
formatRowsAsAlignedTable,
listTableSqlStatements,
openSenseDb,
parseSenseQueryArgs,
pickDefaultPreviewTable,
} from "../sense-sqlite.js";
import { getNerveRoot, getSocketPath, isRunning } from "../workspace.js"; import { getNerveRoot, getSocketPath, isRunning } from "../workspace.js";
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@@ -70,7 +80,6 @@ const senseListCommand = defineCommand({
}, },
async run() { async run() {
if (!isRunning()) { if (!isRunning()) {
// Daemon not running — show static info from nerve.yaml
process.stderr.write( process.stderr.write(
"⚠️ Daemon is not running — showing static config only (no last signal time).\n\n", "⚠️ Daemon is not running — showing static config only (no last signal time).\n\n",
); );
@@ -139,6 +148,115 @@ const senseTriggerCommand = defineCommand({
}, },
}); });
// ---------------------------------------------------------------------------
// nerve sense schema <name>
// ---------------------------------------------------------------------------
const senseSchemaCommand = defineCommand({
meta: {
name: "schema",
description: "Print CREATE TABLE statements from a sense SQLite database",
},
args: {
name: {
type: "positional",
description: "Sense name (data/senses/<name>.db under the nerve workspace)",
},
json: {
type: "boolean",
description: "Print JSON array of CREATE TABLE SQL strings",
default: false,
},
},
async run({ args }) {
const nerveRoot = getNerveRoot();
let db: DatabaseSync | undefined;
try {
db = openSenseDb(nerveRoot, args.name);
const statements = listTableSqlStatements(db);
if (args.json) {
process.stdout.write(`${JSON.stringify(statements, null, 2)}\n`);
} else if (statements.length === 0) {
process.stdout.write("(no tables)\n");
} else {
for (const sql of statements) {
process.stdout.write(`${sql};\n\n`);
}
}
} catch (e) {
const msg = e instanceof Error ? e.message : String(e);
process.stderr.write(`${msg}\n`);
process.exit(1);
} finally {
db?.close();
}
},
});
// ---------------------------------------------------------------------------
// nerve sense query <name> [sql...]
// ---------------------------------------------------------------------------
const senseQueryCommand = defineCommand({
meta: {
name: "query",
description:
"Run a read-only SQL query against a sense database (default: last 10 rows of the first data table). Pass optional SQL after the sense name; multiple words are joined.",
},
args: {
name: {
type: "positional",
description: "Sense name (data/senses/<name>.db under the nerve workspace)",
},
json: {
type: "boolean",
description: "Print result rows as JSON",
default: false,
},
},
async run({ args, rawArgs }) {
const nerveRoot = getNerveRoot();
let db: DatabaseSync | undefined;
try {
let parsed: { name: string; sql: string | undefined };
try {
parsed = parseSenseQueryArgs(rawArgs);
} catch (e) {
const msg = e instanceof Error ? e.message : String(e);
process.stderr.write(`${msg}\n`);
process.exit(1);
}
db = openSenseDb(nerveRoot, args.name);
let sql = parsed.sql?.trim();
if (!sql) {
const table = pickDefaultPreviewTable(db);
if (table === null) {
process.stderr.write("❌ No tables found in database.\n");
process.exit(1);
} else {
sql = defaultPreviewSql(table);
}
}
const rows = db.prepare(sql).all() as Record<string, unknown>[];
if (args.json) {
process.stdout.write(`${JSON.stringify(rows, null, 2)}\n`);
} else {
process.stdout.write(formatRowsAsAlignedTable(rows));
}
} catch (e) {
const msg = e instanceof Error ? e.message : String(e);
process.stderr.write(`${msg}\n`);
process.exit(1);
} finally {
db?.close();
}
},
});
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// nerve sense (parent command) // nerve sense (parent command)
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@@ -151,5 +269,7 @@ export const senseCommand = defineCommand({
subCommands: { subCommands: {
list: senseListCommand, list: senseListCommand,
trigger: senseTriggerCommand, trigger: senseTriggerCommand,
schema: senseSchemaCommand,
query: senseQueryCommand,
}, },
}); });
+127
View File
@@ -0,0 +1,127 @@
import { existsSync } from "node:fs";
import { join } from "node:path";
import { DatabaseSync } from "node:sqlite";
/** SQLite path for a sense under the nerve workspace root. */
export function senseDbPath(nerveRoot: string, senseName: string): string {
return join(nerveRoot, "data", "senses", `${senseName}.db`);
}
export function assertSenseDbExists(nerveRoot: string, senseName: string): string {
const path = senseDbPath(nerveRoot, senseName);
if (!existsSync(path)) {
throw new Error(`No database at ${path}`);
}
return path;
}
/** Open a sense SQLite database in readonly mode using node:sqlite. */
export function openSenseDb(nerveRoot: string, senseName: string): DatabaseSync {
const path = assertSenseDbExists(nerveRoot, senseName);
return new DatabaseSync(path, { readOnly: true });
}
/** `SELECT sql FROM sqlite_master WHERE type='table'` (non-null sql only). */
export function listTableSqlStatements(db: DatabaseSync): string[] {
const rows = db
.prepare(
`SELECT sql FROM sqlite_master WHERE type = 'table' AND sql IS NOT NULL ORDER BY tbl_name`,
)
.all() as { sql: string }[];
return rows.map((r) => r.sql);
}
/**
* Table used for `nerve sense query <name>` with no SQL.
* Prefers real data tables over `_migrations`, then lexicographic by name.
*/
export function pickDefaultPreviewTable(db: DatabaseSync): string | null {
const row = db
.prepare(
`SELECT name FROM sqlite_master
WHERE type = 'table' AND sql IS NOT NULL
AND name NOT LIKE 'sqlite\\_%' ESCAPE '\\'
ORDER BY
CASE WHEN name = '_migrations' THEN 1 ELSE 0 END,
name
LIMIT 1`,
)
.get() as { name: string } | undefined;
return row?.name ?? null;
}
export function defaultPreviewSql(table: string): string {
return `SELECT * FROM "${table.replace(/"/g, '""')}" ORDER BY rowid DESC LIMIT 10`;
}
/** Parse sense name and optional SQL from subcommand raw argv (flags stripped). */
export function parseSenseQueryArgs(rawArgs: string[]): { name: string; sql: string | undefined } {
const pos: string[] = [];
for (let i = 0; i < rawArgs.length; i++) {
const a = rawArgs[i];
if (a === "--json" || a === "--no-json") continue;
if (a.startsWith("-")) {
const eq = a.indexOf("=");
if (eq === -1 && i + 1 < rawArgs.length && !rawArgs[i + 1].startsWith("-")) {
i += 1;
}
continue;
}
pos.push(a);
}
if (pos.length < 1) {
throw new Error("Missing sense name");
}
const name = pos[0];
const sql = pos.length > 1 ? pos.slice(1).join(" ") : undefined;
return { name, sql };
}
function stringifyCell(value: unknown): string {
if (value === null || value === undefined) return "";
if (typeof value === "bigint") return value.toString();
if (typeof value === "number" || typeof value === "boolean") return String(value);
if (typeof value === "string") return value;
if (Buffer.isBuffer(value)) return value.toString("hex");
try {
return JSON.stringify(value);
} catch {
return String(value);
}
}
/** Collect column keys in stable order (first row keys, then any extras). */
export function collectColumnKeys(rows: Record<string, unknown>[]): string[] {
const keys: string[] = [];
const seen = new Set<string>();
for (const row of rows) {
for (const k of Object.keys(row)) {
if (!seen.has(k)) {
seen.add(k);
keys.push(k);
}
}
}
return keys;
}
const MAX_CELL = 64;
function truncate(s: string): string {
if (s.length <= MAX_CELL) return s;
return `${s.slice(0, MAX_CELL - 1)}`;
}
/** Plain aligned table for terminal output. */
export function formatRowsAsAlignedTable(rows: Record<string, unknown>[]): string {
if (rows.length === 0) {
return "(0 rows)\n";
}
const cols = collectColumnKeys(rows);
const cells = rows.map((row) => cols.map((c) => truncate(stringifyCell(row[c]))));
const widths = cols.map((c, j) => Math.max(c.length, ...cells.map((r) => r[j].length)));
const sep = widths.map((w) => "-".repeat(w)).join("-+-");
const header = cols.map((c, j) => c.padEnd(widths[j])).join(" | ");
const body = cells.map((r) => r.map((cell, j) => cell.padEnd(widths[j])).join(" | ")).join("\n");
return `${header}\n${sep}\n${body}\n`;
}
+18 -5
View File
@@ -33,6 +33,11 @@ import { createReflexScheduler } from "./reflex-scheduler.js";
import type { ReflexScheduler } from "./reflex-scheduler.js"; import type { ReflexScheduler } from "./reflex-scheduler.js";
import { createSignalBus } from "./signal-bus.js"; import { createSignalBus } from "./signal-bus.js";
import type { SignalBus } from "./signal-bus.js"; import type { SignalBus } from "./signal-bus.js";
import {
formatCapturedStderrTail,
formatChildExitSummary,
teeCapturedStderr,
} from "./worker-fork-support.js";
import { createWorkflowManager } from "./workflow-manager.js"; import { createWorkflowManager } from "./workflow-manager.js";
import type { WorkflowManager } from "./workflow-manager.js"; import type { WorkflowManager } from "./workflow-manager.js";
@@ -84,10 +89,16 @@ function resolveWorkerScript(): string {
return join(__dir, "sense-worker.js"); return join(__dir, "sense-worker.js");
} }
function spawnWorker(nerveRoot: string, group: string, workerScript: string): ChildProcess { function spawnWorker(
nerveRoot: string,
group: string,
workerScript: string,
stderrTail: { value: string },
): ChildProcess {
const child = fork(workerScript, ["--group", group, "--root", nerveRoot], { const child = fork(workerScript, ["--group", group, "--root", nerveRoot], {
stdio: ["ignore", "inherit", "inherit", "ipc"], stdio: ["ignore", "inherit", "pipe", "ipc"],
}); });
teeCapturedStderr(child, stderrTail);
// Prevent unhandled EPIPE when writing to a child whose IPC channel closed // Prevent unhandled EPIPE when writing to a child whose IPC channel closed
child.on("error", (err) => { child.on("error", (err) => {
if ((err as NodeJS.ErrnoException).code !== "EPIPE") { if ((err as NodeJS.ErrnoException).code !== "EPIPE") {
@@ -240,7 +251,8 @@ export function createKernel(
} }
function startWorker(group: string): Promise<void> { function startWorker(group: string): Promise<void> {
const child = spawnWorker(nerveRoot, group, workerScript); const stderrTail = { value: "" };
const child = spawnWorker(nerveRoot, group, workerScript, stderrTail);
let workerReadyResolve: (() => void) | undefined; let workerReadyResolve: (() => void) | undefined;
const workerReady = new Promise<void>((resolve) => { const workerReady = new Promise<void>((resolve) => {
@@ -255,9 +267,10 @@ export function createKernel(
handleWorkerMessage(raw); handleWorkerMessage(raw);
}); });
child.on("exit", (code) => { child.on("exit", (code, signal) => {
const summary = formatChildExitSummary(code, signal ?? null);
process.stderr.write( process.stderr.write(
`[kernel] worker for group "${group}" exited with code ${code ?? "null"}\n`, `[kernel] worker for group "${group}" exited (${summary})${formatCapturedStderrTail(stderrTail.value)}\n`,
); );
// Resolve ready in case the worker exits before sending ready (prevents hangs) // Resolve ready in case the worker exits before sending ready (prevents hangs)
workerReadyResolve?.(); workerReadyResolve?.();
+5
View File
@@ -25,6 +25,7 @@ import type { WorkerToParentMessage } from "./ipc.js";
import { parseParentMessage } from "./ipc.js"; import { parseParentMessage } from "./ipc.js";
import { executeCompute, loadComputeFn, openPeerDb, openSenseDb } from "./sense-runtime.js"; import { executeCompute, loadComputeFn, openPeerDb, openSenseDb } from "./sense-runtime.js";
import type { DrizzleDB, PeerMap, SenseRuntime } from "./sense-runtime.js"; import type { DrizzleDB, PeerMap, SenseRuntime } from "./sense-runtime.js";
import { ignoreSessionBroadcastSignals } from "./worker-fork-support.js";
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// IPC helpers // IPC helpers
@@ -336,6 +337,10 @@ if (!parsed) {
process.exit(1); process.exit(1);
} }
if (typeof process.send === "function") {
ignoreSessionBroadcastSignals();
}
bootstrap(parsed.nerveRoot, parsed.group).catch((e) => { bootstrap(parsed.nerveRoot, parsed.group).catch((e) => {
const msg = e instanceof Error ? e.message : String(e); const msg = e instanceof Error ? e.message : String(e);
process.stderr.write(`[sense-worker] Unhandled bootstrap error: ${msg}\n`); process.stderr.write(`[sense-worker] Unhandled bootstrap error: ${msg}\n`);
@@ -0,0 +1,48 @@
import type { ChildProcess } from "node:child_process";
const STDERR_TAIL_MAX_CHARS = 16_384;
/**
* Forked workers inherit the parent's process group. In foreground `nerve dev`,
* terminal-driven SIGINT/SIGTERM is delivered to the whole group, so workers can exit
* on the default handler before the kernel sends `{ type: "shutdown" }` over IPC.
* Swallow these in worker processes so the parent coordinates shutdown (issue #55).
* Only call when `process.send` is defined (fork IPC); standalone `node …-worker.js` keeps default Ctrl+C behaviour.
*/
export function ignoreSessionBroadcastSignals(): void {
const swallow = (): void => {};
process.on("SIGINT", swallow);
process.on("SIGTERM", swallow);
}
export function teeCapturedStderr(child: ChildProcess, tail: { value: string }): void {
const stream = child.stderr;
if (stream === null || stream === undefined) return;
stream.setEncoding("utf8");
stream.on("data", (chunk: string | Buffer) => {
const text = typeof chunk === "string" ? chunk : chunk.toString("utf8");
process.stderr.write(text);
tail.value = (tail.value + text).slice(-STDERR_TAIL_MAX_CHARS);
});
}
export function formatChildExitSummary(
code: number | null,
signal: NodeJS.Signals | null,
): string {
const codeStr = code === null || code === undefined ? "null" : String(code);
if (signal) {
return `code=${codeStr} signal=${signal}`;
}
return `code=${codeStr}`;
}
export function formatCapturedStderrTail(tail: string, maxChars = 800): string {
const trimmed = tail.trim();
if (trimmed.length === 0) return "";
const normalized = trimmed.replace(/\r?\n/g, "\\n");
if (normalized.length <= maxChars) {
return ` worker_stderr=${normalized}`;
}
return ` worker_stderr=…${normalized.slice(-maxChars)}`;
}
+28 -7
View File
@@ -22,6 +22,11 @@ import type {
import { parseWorkerMessage } from "./ipc.js"; import { parseWorkerMessage } from "./ipc.js";
import type { LogStore } from "./log-store.js"; import type { LogStore } from "./log-store.js";
import type { WorkflowRunStatus } from "./log-store.js"; import type { WorkflowRunStatus } from "./log-store.js";
import {
formatCapturedStderrTail,
formatChildExitSummary,
teeCapturedStderr,
} from "./worker-fork-support.js";
export type WorkflowManager = { export type WorkflowManager = {
/** Trigger a new workflow thread (called by Reflex scheduler). */ /** Trigger a new workflow thread (called by Reflex scheduler). */
@@ -60,6 +65,7 @@ type WorkerEntry = {
stopping: boolean; stopping: boolean;
/** When set, the worker is draining before a hot-reload respawn. */ /** When set, the worker is draining before a hot-reload respawn. */
draining: boolean; draining: boolean;
stderrTail: { value: string };
}; };
// Crash respawn backoff: track crash timestamps per workflow. // Crash respawn backoff: track crash timestamps per workflow.
@@ -85,10 +91,12 @@ function spawnWorkflowWorker(
nerveRoot: string, nerveRoot: string,
workflowName: string, workflowName: string,
workerScript: string, workerScript: string,
stderrTail: { value: string },
): ChildProcess { ): ChildProcess {
const child = fork(workerScript, ["--workflow", workflowName, "--root", nerveRoot], { const child = fork(workerScript, ["--workflow", workflowName, "--root", nerveRoot], {
stdio: ["ignore", "inherit", "inherit", "ipc"], stdio: ["ignore", "inherit", "pipe", "ipc"],
}); });
teeCapturedStderr(child, stderrTail);
// Prevent unhandled EPIPE when writing to a child whose IPC channel closed // Prevent unhandled EPIPE when writing to a child whose IPC channel closed
child.on("error", (err) => { child.on("error", (err) => {
if ((err as NodeJS.ErrnoException).code !== "EPIPE") { if ((err as NodeJS.ErrnoException).code !== "EPIPE") {
@@ -395,7 +403,11 @@ export function createWorkflowManager(
state.active.clear(); state.active.clear();
} }
function handleWorkerExit(workflowName: string, code: number | null): void { function handleWorkerExit(
workflowName: string,
code: number | null,
signal: NodeJS.Signals | null,
): void {
const entry = workers.get(workflowName); const entry = workers.get(workflowName);
if (entry?.draining) { if (entry?.draining) {
workers.delete(workflowName); workers.delete(workflowName);
@@ -416,8 +428,10 @@ export function createWorkflowManager(
} }
return; return;
} }
const summary = formatChildExitSummary(code, signal);
const stderrExtra = entry !== undefined ? formatCapturedStderrTail(entry.stderrTail.value) : "";
process.stderr.write( process.stderr.write(
`[workflow-manager] worker for "${workflowName}" exited with code ${code ?? "null"}\n`, `[workflow-manager] worker for "${workflowName}" exited (${summary})${stderrExtra}\n`,
); );
handleWorkerCrash(workflowName); handleWorkerCrash(workflowName);
} }
@@ -428,17 +442,24 @@ export function createWorkflowManager(
return existing; return existing;
} }
const child = spawnWorkflowWorker(nerveRoot, workflowName, workerScript); const stderrTail = { value: "" };
const child = spawnWorkflowWorker(nerveRoot, workflowName, workerScript, stderrTail);
child.on("message", (raw: unknown) => { child.on("message", (raw: unknown) => {
handleWorkerMessage(workflowName, raw); handleWorkerMessage(workflowName, raw);
}); });
child.on("exit", (code) => { child.on("exit", (code, signal) => {
handleWorkerExit(workflowName, code); handleWorkerExit(workflowName, code, signal ?? null);
}); });
const entry: WorkerEntry = { workflowName, process: child, stopping: false, draining: false }; const entry: WorkerEntry = {
workflowName,
process: child,
stopping: false,
draining: false,
stderrTail,
};
workers.set(workflowName, entry); workers.set(workflowName, entry);
return entry; return entry;
} }
+5
View File
@@ -21,6 +21,7 @@ import type {
import type { ThreadCommandEventMessage, ThreadEventType, WorkerToParentMessage } from "./ipc.js"; import type { ThreadCommandEventMessage, ThreadEventType, WorkerToParentMessage } from "./ipc.js";
import { parseParentMessage } from "./ipc.js"; import { parseParentMessage } from "./ipc.js";
import { ignoreSessionBroadcastSignals } from "./worker-fork-support.js";
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// IPC helpers // IPC helpers
@@ -335,6 +336,10 @@ if (!parsed) {
process.exit(1); process.exit(1);
} }
if (typeof process.send === "function") {
ignoreSessionBroadcastSignals();
}
bootstrap(parsed.nerveRoot, parsed.workflow).catch((e) => { bootstrap(parsed.nerveRoot, parsed.workflow).catch((e) => {
const msg = e instanceof Error ? e.message : String(e); const msg = e instanceof Error ? e.message : String(e);
process.stderr.write(`[workflow-worker] Unhandled bootstrap error: ${msg}\n`); process.stderr.write(`[workflow-worker] Unhandled bootstrap error: ${msg}\n`);
+9 -2
View File
@@ -60,7 +60,7 @@ importers:
version: 11.10.0 version: 11.10.0
drizzle-orm: drizzle-orm:
specifier: ^0.43.1 specifier: ^0.43.1
version: 0.43.1(@types/better-sqlite3@7.6.13)(better-sqlite3@11.10.0) version: 0.43.1(@types/better-sqlite3@7.6.13)(better-sqlite3@11.10.0)(sql.js@1.14.1)
yaml: yaml:
specifier: ^2.8.3 specifier: ^2.8.3
version: 2.8.3 version: 2.8.3
@@ -1071,6 +1071,9 @@ packages:
resolution: {integrity: sha512-i5uvt8C3ikiWeNZSVZNWcfZPItFQOsYTUAOkcUPGd8DqDy1uOUikjt5dG+uRlwyvR108Fb9DOd4GvXfT0N2/uQ==} resolution: {integrity: sha512-i5uvt8C3ikiWeNZSVZNWcfZPItFQOsYTUAOkcUPGd8DqDy1uOUikjt5dG+uRlwyvR108Fb9DOd4GvXfT0N2/uQ==}
engines: {node: '>= 12'} engines: {node: '>= 12'}
sql.js@1.14.1:
resolution: {integrity: sha512-gcj8zBWU5cFsi9WUP+4bFNXAyF1iRpA3LLyS/DP5xlrNzGmPIizUeBggKa8DbDwdqaKwUcTEnChtd2grWo/x/A==}
stackback@0.0.2: stackback@0.0.2:
resolution: {integrity: sha512-1XMJE5fQo1jGH6Y/7ebnwPOBEkIEnT4QF32d5R1+VXdXveM0IBMJt8zfaxX1P3QhVwrYe+576+jkANtSS2mBbw==} resolution: {integrity: sha512-1XMJE5fQo1jGH6Y/7ebnwPOBEkIEnT4QF32d5R1+VXdXveM0IBMJt8zfaxX1P3QhVwrYe+576+jkANtSS2mBbw==}
@@ -1692,10 +1695,11 @@ snapshots:
detect-libc@2.1.2: {} detect-libc@2.1.2: {}
drizzle-orm@0.43.1(@types/better-sqlite3@7.6.13)(better-sqlite3@11.10.0): drizzle-orm@0.43.1(@types/better-sqlite3@7.6.13)(better-sqlite3@11.10.0)(sql.js@1.14.1):
optionalDependencies: optionalDependencies:
'@types/better-sqlite3': 7.6.13 '@types/better-sqlite3': 7.6.13
better-sqlite3: 11.10.0 better-sqlite3: 11.10.0
sql.js: 1.14.1
end-of-stream@1.4.5: end-of-stream@1.4.5:
dependencies: dependencies:
@@ -1997,6 +2001,9 @@ snapshots:
source-map@0.7.6: {} source-map@0.7.6: {}
sql.js@1.14.1:
optional: true
stackback@0.0.2: {} stackback@0.0.2: {}
std-env@4.1.0: {} std-env@4.1.0: {}