Compare commits

..

2 Commits

Author SHA1 Message Date
xiaoju 8f495cf92e fix: address all 8 PR #17 review issues
Critical fixes:
1. triggerPayload spread → safe field assignment with validation
2. Graceful shutdown stopping flag → no false crash warnings
3. Shutdown awaits in-flight work (10s timeout) before exit
4. Thread loop safety valve (MAX_STEPS=1000)
5. active counter: bare number → Set<string> by runId

Should-fix:
6. ThreadEventMessage.eventType → union type with validation
7. SQLite status cast → runtime validateWorkflowRunStatus()
8. getActiveWorkflowRuns → pre-compiled prepared statements

小橘 <xiaoju@shazhou.work>
2026-04-22 12:13:29 +00:00
xiaoju a68338c4e9 feat: implement Workflow Engine Phase 1 (#16)
- Core types: WorkflowDefinition, ThreadState, CommandEvent, ModerateResult, etc.
- IPC: StartThreadMessage, ResumeThreadMessage, ThreadEventMessage, WorkflowErrorMessage
- WorkflowManager: concurrency control (drop/queue overflow), worker lifecycle
- Workflow worker: moderate→execute loop, user code loading
- LogStore: workflow_runs materialized table, appendWithWorkflowUpdate
- 131 tests passing

RFC-002 Phase 1 complete.

小橘 <xiaoju@shazhou.work>
2026-04-22 11:49:50 +00:00
9 changed files with 1484 additions and 6 deletions
+8
View File
@@ -8,6 +8,14 @@ export type {
QueueOverflowConfig,
WorkflowConfig,
NerveConfig,
CommandEvent,
ThreadState,
ModerateResult,
WorkflowContext,
RoleExecuteFn,
Role,
ModerateFn,
WorkflowDefinition,
} from "./types.js";
export type { Result } from "./result.js";
export { ok, err } from "./result.js";
+55
View File
@@ -45,3 +45,58 @@ export type NerveConfig = {
reflexes: ReflexConfig[];
workflows: Record<string, WorkflowConfig> | null;
};
// ---------------------------------------------------------------------------
// Workflow Engine types (RFC-002)
// ---------------------------------------------------------------------------
/**
 * A single event in the command event stream that drives a workflow thread.
 *
 * Only `type` is required. The index signature admits any further fields, all
 * typed as `unknown`, so consumers must narrow them before use.
 */
export type CommandEvent = {
/** Name of the event kind. */
type: string;
/** Arbitrary event-specific data. */
[key: string]: unknown;
};
/**
 * Accumulated state of a running thread — the event history for moderate().
 * This is the first argument of ModerateFn.
 */
export type ThreadState = {
/** Identifier of the workflow run this thread belongs to. */
runId: string;
/** All events so far, including the initial thread_start event. */
events: CommandEvent[];
};
/** The result of moderate() — which role to hand to next, and what prompt to pass. */
export type ModerateResult = {
/** Name of the next role — presumably a key of WorkflowDefinition.roles (TODO confirm against the worker loop). */
role: string;
/** Opaque prompt for that role; presumably forwarded as the `prompt` argument of its execute function. */
prompt: unknown;
};
/** Context injected into every role execute() call (second argument of RoleExecuteFn). */
export type WorkflowContext = {
/** Identifier of the run the role is executing for. */
runId: string;
/** Name of the workflow the run belongs to. */
workflowName: string;
/** Emit a log message back to the parent process. */
log: (message: string) => void;
};
/**
 * A role's execute function. Has side effects (API calls, file I/O, etc.).
 *
 * @param prompt - Opaque input for the role; typed `unknown`, so implementations must narrow it.
 * @param ctx - Run identity and logging facilities for this invocation.
 * @returns A promise of the CommandEvent that is fed back into moderate().
 */
export type RoleExecuteFn = (prompt: unknown, ctx: WorkflowContext) => Promise<CommandEvent>;
/** A role in a workflow — a named unit of execution with side effects. */
export type Role = {
/** The role's implementation; see RoleExecuteFn for the contract. */
execute: RoleExecuteFn;
};
/**
 * The moderator function — pure, no side effects.
 * Decides which role to pass control to next.
 *
 * @param thread - The run's accumulated event history.
 * @param event - The event being moderated (NOTE(review): whether it is already part of `thread.events` is not visible here).
 * @returns The next role and its prompt, or null to signal thread completion.
 */
export type ModerateFn = (thread: ThreadState, event: CommandEvent) => ModerateResult | null;
/** The complete definition of a workflow, as authored by users. */
export type WorkflowDefinition = {
/** Roles available to the workflow, keyed by role name. */
roles: Record<string, Role>;
/** Routing function that picks the next role or ends the thread. */
moderate: ModerateFn;
};
@@ -0,0 +1,186 @@
/**
* Tests for workflow-related LogStore functionality (RFC-002 §6.2).
*/
import { mkdtempSync, rmSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { afterEach, beforeEach, describe, expect, it } from "vitest";
import { createLogStore } from "../log-store.js";
import type { LogStore, WorkflowRun } from "../log-store.js";
describe("LogStore — workflow_runs", () => {
let tmpDir: string;
let store: LogStore;
beforeEach(() => {
tmpDir = mkdtempSync(join(tmpdir(), "nerve-wf-log-test-"));
store = createLogStore(join(tmpDir, "data", "logs.db"));
});
afterEach(() => {
store.close();
rmSync(tmpDir, { recursive: true, force: true });
});
describe("upsertWorkflowRun", () => {
it("writes a log entry and creates a workflow_runs row atomically", () => {
const run: WorkflowRun = {
runId: "run-1",
workflow: "cleanup",
status: "started",
ts: 1000,
};
const entry = store.upsertWorkflowRun(
{ source: "workflow", type: "started", refId: "run-1", payload: null, ts: 1000 },
run,
);
expect(entry.id).toBeGreaterThan(0);
expect(entry.source).toBe("workflow");
expect(entry.type).toBe("started");
const stored = store.getWorkflowRun("run-1");
expect(stored).not.toBeNull();
expect(stored?.runId).toBe("run-1");
expect(stored?.workflow).toBe("cleanup");
expect(stored?.status).toBe("started");
expect(stored?.ts).toBe(1000);
});
it("updates existing workflow_runs row on upsert (status transition)", () => {
store.upsertWorkflowRun(
{ source: "workflow", type: "started", refId: "run-2", payload: null, ts: 1000 },
{ runId: "run-2", workflow: "cleanup", status: "started", ts: 1000 },
);
store.upsertWorkflowRun(
{ source: "workflow", type: "completed", refId: "run-2", payload: null, ts: 2000 },
{ runId: "run-2", workflow: "cleanup", status: "completed", ts: 2000 },
);
const stored = store.getWorkflowRun("run-2");
expect(stored?.status).toBe("completed");
expect(stored?.ts).toBe(2000);
// Both log entries should be present (event sourcing)
const logs = store.query({ refId: "run-2" });
expect(logs).toHaveLength(2);
});
it("the log entries act as source of truth for event history", () => {
for (const [type, status, ts] of [
["queued", "queued", 1000],
["started", "started", 1001],
["step_complete", "started", 1002],
["completed", "completed", 1005],
] as const) {
store.upsertWorkflowRun(
{ source: "workflow", type, refId: "run-3", payload: null, ts },
{ runId: "run-3", workflow: "cleanup", status, ts },
);
}
const logs = store.query({ source: "workflow", refId: "run-3" });
expect(logs).toHaveLength(4);
expect(logs[0].type).toBe("queued");
expect(logs[3].type).toBe("completed");
});
});
describe("getWorkflowRun", () => {
it("returns null for a non-existent runId", () => {
expect(store.getWorkflowRun("no-such-run")).toBeNull();
});
it("returns the latest state after multiple upserts", () => {
store.upsertWorkflowRun(
{ source: "workflow", type: "queued", refId: "run-4", payload: null, ts: 100 },
{ runId: "run-4", workflow: "code-review", status: "queued", ts: 100 },
);
store.upsertWorkflowRun(
{ source: "workflow", type: "started", refId: "run-4", payload: null, ts: 200 },
{ runId: "run-4", workflow: "code-review", status: "started", ts: 200 },
);
const run = store.getWorkflowRun("run-4");
expect(run?.status).toBe("started");
expect(run?.ts).toBe(200);
});
});
describe("getActiveWorkflowRuns", () => {
  // Fixture: r1, r2 and r4 are active (queued/started); r3 (completed) and
  // r5 (failed) are terminal and must never be returned. Each log entry's
  // `type` matches the status written to workflow_runs.
  beforeEach(() => {
    store.upsertWorkflowRun(
      { source: "workflow", type: "queued", refId: "r1", payload: null, ts: 100 },
      { runId: "r1", workflow: "cleanup", status: "queued", ts: 100 },
    );
    store.upsertWorkflowRun(
      { source: "workflow", type: "started", refId: "r2", payload: null, ts: 200 },
      { runId: "r2", workflow: "cleanup", status: "started", ts: 200 },
    );
    store.upsertWorkflowRun(
      { source: "workflow", type: "completed", refId: "r3", payload: null, ts: 300 },
      { runId: "r3", workflow: "cleanup", status: "completed", ts: 300 },
    );
    store.upsertWorkflowRun(
      { source: "workflow", type: "queued", refId: "r4", payload: null, ts: 400 },
      { runId: "r4", workflow: "deploy", status: "queued", ts: 400 },
    );
    store.upsertWorkflowRun(
      { source: "workflow", type: "failed", refId: "r5", payload: null, ts: 500 },
      { runId: "r5", workflow: "deploy", status: "failed", ts: 500 },
    );
  });

  it("returns queued and started runs by default", () => {
    const active = store.getActiveWorkflowRuns();
    expect(active).toHaveLength(3);
    const ids = active.map((r) => r.runId);
    expect(ids).toContain("r1");
    expect(ids).toContain("r2");
    expect(ids).toContain("r4");
  });

  it("excludes completed and failed runs", () => {
    const ids = store.getActiveWorkflowRuns().map((r) => r.runId);
    expect(ids).not.toContain("r3");
    expect(ids).not.toContain("r5");
  });

  it("filters by workflow name", () => {
    const cleanupRuns = store.getActiveWorkflowRuns("cleanup");
    expect(cleanupRuns).toHaveLength(2);
    const ids = cleanupRuns.map((r) => r.runId);
    expect(ids).toContain("r1");
    expect(ids).toContain("r2");
  });

  it("returns only matching workflow when name given", () => {
    // The failed deploy run (r5) must not leak into the filtered result.
    const deployRuns = store.getActiveWorkflowRuns("deploy");
    expect(deployRuns.map((r) => r.runId)).toEqual(["r4"]);
  });

  it("returns empty array when workflow name matches no active runs", () => {
    expect(store.getActiveWorkflowRuns("nonexistent")).toHaveLength(0);
  });

  it("returns runs ordered by ts ascending", () => {
    // Compare the whole sequence, not just the first pair.
    const active = store.getActiveWorkflowRuns();
    expect(active.map((r) => r.ts)).toEqual([100, 200, 400]);
  });
});
describe("all statuses are storable", () => {
it.each(["queued", "started", "completed", "failed", "crashed", "dropped"] as const)(
"stores status=%s",
(status) => {
const runId = `run-${status}`;
store.upsertWorkflowRun(
{ source: "workflow", type: status, refId: runId, payload: null, ts: 1 },
{ runId, workflow: "test", status, ts: 1 },
);
expect(store.getWorkflowRun(runId)?.status).toBe(status);
},
);
});
});
@@ -0,0 +1,362 @@
/**
* Tests for WorkflowManager — concurrency, overflow, queue, and lifecycle.
*/
import { EventEmitter } from "node:events";
import type { NerveConfig } from "@uncaged/nerve-core";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
// ---------------------------------------------------------------------------
// Mock child_process.fork before importing workflow-manager
// ---------------------------------------------------------------------------
type MockChild = EventEmitter & {
send: ReturnType<typeof vi.fn>;
kill: ReturnType<typeof vi.fn>;
connected: boolean;
exitCode: number | null;
pid: number;
};
const mockChildren: MockChild[] = [];
/**
 * Build a fake child process backed by an EventEmitter.
 *
 * - `send` is a spy; when it receives a `{ type: "shutdown" }` message it
 *   schedules (via setImmediate) a clean exit: exitCode 0, disconnected, and
 *   an "exit" event with `(0, null)`.
 * - `kill` is a spy that exits synchronously: exitCode 1, disconnected, and an
 *   "exit" event with `(null, signal)` where the signal defaults to "SIGKILL".
 */
function makeMockChild(pid = 1): MockChild {
  const child = new EventEmitter() as MockChild;

  const isShutdownMessage = (candidate: unknown): boolean =>
    candidate !== null &&
    typeof candidate === "object" &&
    (candidate as Record<string, unknown>).type === "shutdown";

  // Record the exit on the fake before the "exit" event is emitted.
  const markExited = (code: number): void => {
    child.exitCode = code;
    child.connected = false;
  };

  child.connected = true;
  child.exitCode = null;
  child.pid = pid;

  child.send = vi.fn((msg: unknown) => {
    if (!isShutdownMessage(msg)) return;
    setImmediate(() => {
      markExited(0);
      child.emit("exit", 0, null);
    });
  });

  child.kill = vi.fn((_signal?: string) => {
    markExited(1);
    child.emit("exit", null, _signal ?? "SIGKILL");
  });

  return child;
}
vi.mock("node:child_process", () => ({
fork: vi.fn((_script: string, _args: string[], _opts: unknown) => {
const child = makeMockChild(mockChildren.length + 1);
mockChildren.push(child);
return child;
}),
}));
// Import after mock is set up
const { createWorkflowManager } = await import("../workflow-manager.js");
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
/**
 * Create a LogStore test double in which every method is a vitest spy.
 * Lookups return "nothing found": `null` for single-value getters and an
 * empty array for list getters.
 */
function makeLogStore() {
  const returningNull = () => vi.fn(() => null);
  const returningEmptyList = () => vi.fn(() => []);

  return {
    append: vi.fn(),
    query: returningEmptyList(),
    getMeta: returningNull(),
    setMeta: vi.fn(),
    upsertWorkflowRun: vi.fn(),
    appendWithWorkflowUpdate: vi.fn(),
    getWorkflowRun: returningNull(),
    getActiveWorkflowRuns: returningEmptyList(),
    close: vi.fn(),
  };
}
/**
 * Build a minimal NerveConfig for tests: no senses, no reflexes, and the
 * given map as the `workflows` section (an empty map when omitted).
 */
function makeConfig(overrides: Partial<NerveConfig["workflows"]> = {}): NerveConfig {
  const workflows = overrides as NerveConfig["workflows"];
  return { senses: {}, reflexes: [], workflows };
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
describe("WorkflowManager", () => {
beforeEach(() => {
mockChildren.length = 0;
vi.useFakeTimers({ shouldAdvanceTime: true });
});
afterEach(() => {
vi.useRealTimers();
vi.clearAllMocks();
});
describe("startWorkflow under concurrency limit dispatches thread", () => {
it("forks a worker and sends start-thread when active < concurrency", () => {
const logStore = makeLogStore();
const config = makeConfig({
"my-workflow": { concurrency: 2, overflow: "drop" },
});
const mgr = createWorkflowManager("/nerve-root", config, logStore);
mgr.startWorkflow("my-workflow", { event: "test" });
expect(mockChildren).toHaveLength(1);
expect(mockChildren[0].send).toHaveBeenCalledWith(
expect.objectContaining({ type: "start-thread", workflow: "my-workflow" }),
);
expect(mgr.activeCount("my-workflow")).toBe(1);
});
it("reuses the same worker for a second thread under the limit", () => {
const logStore = makeLogStore();
const config = makeConfig({
"my-workflow": { concurrency: 3, overflow: "drop" },
});
const mgr = createWorkflowManager("/nerve-root", config, logStore);
mgr.startWorkflow("my-workflow", { n: 1 });
mgr.startWorkflow("my-workflow", { n: 2 });
// Only one forked child — worker is reused
expect(mockChildren).toHaveLength(1);
expect(mockChildren[0].send).toHaveBeenCalledTimes(2);
expect(mgr.activeCount("my-workflow")).toBe(2);
});
it("logs a 'started' event for each dispatched thread", () => {
const logStore = makeLogStore();
const config = makeConfig({
"my-workflow": { concurrency: 2, overflow: "drop" },
});
const mgr = createWorkflowManager("/nerve-root", config, logStore);
mgr.startWorkflow("my-workflow", { x: 1 });
expect(logStore.upsertWorkflowRun).toHaveBeenCalledWith(
expect.objectContaining({ source: "workflow", type: "started" }),
expect.objectContaining({ workflow: "my-workflow", status: "started" }),
);
});
});
describe("startWorkflow at limit with drop overflow drops the request", () => {
it("does NOT send start-thread when at concurrency limit with overflow=drop", () => {
const logStore = makeLogStore();
const config = makeConfig({
"drop-wf": { concurrency: 1, overflow: "drop" },
});
const mgr = createWorkflowManager("/nerve-root", config, logStore);
mgr.startWorkflow("drop-wf", { first: true });
// now at limit — second call should be dropped
mgr.startWorkflow("drop-wf", { second: true });
expect(mgr.activeCount("drop-wf")).toBe(1);
expect(mgr.queueLength("drop-wf")).toBe(0);
// Only one start-thread sent
expect(mockChildren[0].send).toHaveBeenCalledTimes(1);
});
it("logs a 'dropped' event when overflow=drop", () => {
const logStore = makeLogStore();
const config = makeConfig({
"drop-wf": { concurrency: 1, overflow: "drop" },
});
const mgr = createWorkflowManager("/nerve-root", config, logStore);
mgr.startWorkflow("drop-wf", {});
mgr.startWorkflow("drop-wf", {});
const droppedCall = logStore.upsertWorkflowRun.mock.calls.find(
([entry]) => entry.type === "dropped",
);
expect(droppedCall).toBeDefined();
});
});
describe("startWorkflow at limit with queue overflow queues the request", () => {
it("queues the thread when at concurrency limit with overflow=queue", () => {
const logStore = makeLogStore();
const config = makeConfig({
"queue-wf": { concurrency: 1, overflow: "queue", maxQueue: 5 },
});
const mgr = createWorkflowManager("/nerve-root", config, logStore);
mgr.startWorkflow("queue-wf", { first: true });
mgr.startWorkflow("queue-wf", { second: true });
expect(mgr.activeCount("queue-wf")).toBe(1);
expect(mgr.queueLength("queue-wf")).toBe(1);
});
it("logs a 'queued' event for the waiting thread", () => {
const logStore = makeLogStore();
const config = makeConfig({
"queue-wf": { concurrency: 1, overflow: "queue", maxQueue: 5 },
});
const mgr = createWorkflowManager("/nerve-root", config, logStore);
mgr.startWorkflow("queue-wf", {});
mgr.startWorkflow("queue-wf", {});
const queuedCall = logStore.upsertWorkflowRun.mock.calls.find(
([entry]) => entry.type === "queued",
);
expect(queuedCall).toBeDefined();
expect(queuedCall?.[1]).toMatchObject({ workflow: "queue-wf", status: "queued" });
});
});
describe("queue overflow with maxQueue drops oldest when full", () => {
it("drops oldest queued item when queue reaches maxQueue", () => {
const logStore = makeLogStore();
const config = makeConfig({
"queue-wf": { concurrency: 1, overflow: "queue", maxQueue: 2 },
});
const mgr = createWorkflowManager("/nerve-root", config, logStore);
// Fill the concurrency slot
mgr.startWorkflow("queue-wf", { n: 0 });
// Fill the queue to maxQueue
mgr.startWorkflow("queue-wf", { n: 1 });
mgr.startWorkflow("queue-wf", { n: 2 });
// This one should push out { n: 1 }
mgr.startWorkflow("queue-wf", { n: 3 });
// Queue should still be at maxQueue (2)
expect(mgr.queueLength("queue-wf")).toBe(2);
// There should be a dropped event (for the oldest evicted item)
const droppedCalls = logStore.upsertWorkflowRun.mock.calls.filter(
([entry]) => entry.type === "dropped",
);
expect(droppedCalls).toHaveLength(1);
});
});
describe("completing a thread dequeues the next one", () => {
it("dispatches the next queued thread when the active thread sends completed", () => {
const logStore = makeLogStore();
const config = makeConfig({
"queue-wf": { concurrency: 1, overflow: "queue", maxQueue: 5 },
});
const mgr = createWorkflowManager("/nerve-root", config, logStore);
mgr.startWorkflow("queue-wf", { first: true });
mgr.startWorkflow("queue-wf", { second: true });
expect(mgr.activeCount("queue-wf")).toBe(1);
expect(mgr.queueLength("queue-wf")).toBe(1);
// Simulate the worker sending a "thread-event: completed"
const child = mockChildren[0];
const startCalls = (child.send as ReturnType<typeof vi.fn>).mock.calls;
const firstRunId = (startCalls[0][0] as { runId: string }).runId;
child.emit("message", {
type: "thread-event",
runId: firstRunId,
eventType: "completed",
payload: null,
});
// The queued thread should now be active
expect(mgr.activeCount("queue-wf")).toBe(1);
expect(mgr.queueLength("queue-wf")).toBe(0);
// A second start-thread message should have been sent
expect(child.send).toHaveBeenCalledTimes(2);
expect(child.send).toHaveBeenLastCalledWith(
expect.objectContaining({ type: "start-thread", workflow: "queue-wf" }),
);
});
it("dispatches next queued thread when active thread sends failed", () => {
const logStore = makeLogStore();
const config = makeConfig({
"queue-wf": { concurrency: 1, overflow: "queue", maxQueue: 5 },
});
const mgr = createWorkflowManager("/nerve-root", config, logStore);
mgr.startWorkflow("queue-wf", { first: true });
mgr.startWorkflow("queue-wf", { second: true });
const child = mockChildren[0];
const firstRunId = (child.send as ReturnType<typeof vi.fn>).mock.calls[0][0].runId as string;
child.emit("message", {
type: "thread-event",
runId: firstRunId,
eventType: "failed",
payload: { error: "boom" },
});
expect(mgr.activeCount("queue-wf")).toBe(1);
expect(mgr.queueLength("queue-wf")).toBe(0);
});
});
describe("stop() shuts down all workers", () => {
it("sends shutdown to every worker and resolves when they exit", async () => {
const logStore = makeLogStore();
const config = makeConfig({
"wf-a": { concurrency: 2, overflow: "drop" },
"wf-b": { concurrency: 1, overflow: "drop" },
});
const mgr = createWorkflowManager("/nerve-root", config, logStore);
mgr.startWorkflow("wf-a", {});
mgr.startWorkflow("wf-b", {});
// Two distinct workers should have been forked
expect(mockChildren).toHaveLength(2);
const stopPromise = mgr.stop();
// Let setImmediate callbacks run (the mock exits on shutdown)
await vi.runAllTimersAsync();
await stopPromise;
for (const child of mockChildren) {
expect(child.send).toHaveBeenCalledWith(expect.objectContaining({ type: "shutdown" }));
}
});
it("ignores startWorkflow calls after stop()", async () => {
const logStore = makeLogStore();
const config = makeConfig({
"wf-a": { concurrency: 2, overflow: "drop" },
});
const mgr = createWorkflowManager("/nerve-root", config, logStore);
const stopPromise = mgr.stop();
await vi.runAllTimersAsync();
await stopPromise;
mgr.startWorkflow("wf-a", {});
// No worker should have been spawned
expect(mockChildren).toHaveLength(0);
});
});
describe("unknown workflow name", () => {
it("does nothing for an unknown workflow name", () => {
const logStore = makeLogStore();
const config = makeConfig({});
const mgr = createWorkflowManager("/nerve-root", config, logStore);
mgr.startWorkflow("no-such-workflow", {});
expect(mockChildren).toHaveLength(0);
expect(logStore.upsertWorkflowRun).not.toHaveBeenCalled();
});
});
});
+8 -1
View File
@@ -8,6 +8,10 @@ export type {
ErrorMessage,
ReadyMessage,
WorkerToParentMessage,
StartThreadMessage,
ResumeThreadMessage,
ThreadEventMessage,
WorkflowErrorMessage,
} from "./ipc.js";
export type { SignalBus, SignalHandler, Unsubscribe } from "./signal-bus.js";
@@ -29,4 +33,7 @@ export { createFileWatcher } from "./file-watcher.js";
export type { FileWatcher, FileChange, FileChangeHandler } from "./file-watcher.js";
export { createLogStore } from "./log-store.js";
export type { LogStore, LogEntry, LogQuery } from "./log-store.js";
export type { LogStore, LogEntry, LogQuery, WorkflowRun, WorkflowRunStatus } from "./log-store.js";
export { createWorkflowManager } from "./workflow-manager.js";
export type { WorkflowManager } from "./workflow-manager.js";
+118 -4
View File
@@ -22,8 +22,32 @@ export type HealthRequestMessage = {
type: "health-request";
};
// ---------------------------------------------------------------------------
// Workflow IPC messages (RFC-002 §5.2)
// ---------------------------------------------------------------------------
/**
 * Parent → Workflow Worker: start a new thread.
 *
 * parseParentMessage rejects the message unless `runId` and `workflow` are
 * strings and the `triggerPayload` key is present (its value is not checked).
 */
export type StartThreadMessage = {
type: "start-thread";
/** Identifier of the run to start. */
runId: string;
/** Name of the workflow to run. */
workflow: string;
/** The trigger payload from the Reflex that initiated this thread. */
triggerPayload: unknown;
};
/**
 * Parent → Workflow Worker: resume an existing thread after crash recovery.
 *
 * parseParentMessage requires `runId` to be a string.
 */
export type ResumeThreadMessage = {
type: "resume-thread";
/** Identifier of the run to resume. */
runId: string;
};
/** Union of all messages the parent sends to a worker */
export type ParentToWorkerMessage = ComputeMessage | ShutdownMessage | HealthRequestMessage;
export type ParentToWorkerMessage =
| ComputeMessage
| ShutdownMessage
| HealthRequestMessage
| StartThreadMessage
| ResumeThreadMessage;
/** Worker → Parent: compute produced a signal */
export type SignalMessage = {
@@ -51,14 +75,46 @@ export type HealthResponseMessage = {
inFlightCount: number;
};
// ---------------------------------------------------------------------------
// Workflow Worker → Parent messages (RFC-002 §5.2)
// ---------------------------------------------------------------------------
/**
 * Valid lifecycle event types for a workflow thread.
 *
 * The runtime set THREAD_EVENT_TYPES checked by parseThreadEventMsg lists the
 * same five values; keep the two in sync when adding or removing a member.
 */
export type ThreadEventType = "queued" | "started" | "step_complete" | "completed" | "failed";
/**
 * Workflow Worker → Parent: a thread lifecycle event.
 *
 * parseThreadEventMsg requires a string `runId`, an `eventType` from the
 * allowed set, and the presence of the `payload` key (any value, including null).
 */
export type ThreadEventMessage = {
type: "thread-event";
/** Identifier of the run the event belongs to. */
runId: string;
/** Which lifecycle transition occurred. */
eventType: ThreadEventType;
/** Event-specific data; not validated beyond being present. */
payload: unknown;
};
/**
 * Workflow Worker → Parent: a thread encountered an unrecoverable error.
 *
 * parseWorkflowErrorMsg requires both `runId` and `error` to be strings.
 */
export type WorkflowErrorMessage = {
type: "workflow-error";
/** Identifier of the run that errored. */
runId: string;
/** Error description as a plain string. */
error: string;
};
/** Union of all messages a worker sends to the parent */
export type WorkerToParentMessage =
| SignalMessage
| ErrorMessage
| ReadyMessage
| HealthResponseMessage;
| HealthResponseMessage
| ThreadEventMessage
| WorkflowErrorMessage;
const PARENT_MSG_TYPES = new Set(["compute", "shutdown", "health-request"]);
const PARENT_MSG_TYPES = new Set([
"compute",
"shutdown",
"health-request",
"start-thread",
"resume-thread",
]);
/** Validate and parse an unknown IPC message received from the parent process. */
export function parseParentMessage(raw: unknown): Result<ParentToWorkerMessage> {
@@ -72,6 +128,18 @@ export function parseParentMessage(raw: unknown): Result<ParentToWorkerMessage>
if (!PARENT_MSG_TYPES.has(obj.type)) {
return err(new Error(`Unknown IPC message type: "${obj.type}"`));
}
if (obj.type === "start-thread") {
if (typeof obj.runId !== "string")
return err(new Error("'start-thread' message missing string 'runId'"));
if (typeof obj.workflow !== "string")
return err(new Error("'start-thread' message missing string 'workflow'"));
if (!("triggerPayload" in obj))
return err(new Error("'start-thread' message missing 'triggerPayload'"));
}
if (obj.type === "resume-thread") {
if (typeof obj.runId !== "string")
return err(new Error("'resume-thread' message missing string 'runId'"));
}
return ok(raw as ParentToWorkerMessage);
}
@@ -108,7 +176,51 @@ function parseHealthResponseMsg(
return ok(raw as HealthResponseMessage);
}
const WORKER_MSG_TYPES = new Set(["signal", "error", "ready", "health-response"]);
const THREAD_EVENT_TYPES = new Set<string>([
"queued",
"started",
"step_complete",
"completed",
"failed",
]);
/**
 * Validate a worker message whose `type` is "thread-event".
 *
 * @param obj - The raw message viewed as a property bag, used for field checks.
 * @param raw - The same message, returned unchanged on success.
 * @returns `ok(raw)` when `runId` is a string, `eventType` is a member of
 *   THREAD_EVENT_TYPES and the `payload` key exists; otherwise `err` with an
 *   Error naming the first failed check.
 */
function parseThreadEventMsg(
  obj: Record<string, unknown>,
  raw: unknown,
): Result<WorkerToParentMessage> {
  const { runId, eventType } = obj;

  if (typeof runId !== "string") {
    return err(new Error("Worker 'thread-event' message missing string 'runId' field"));
  }

  const eventTypeIsKnown = typeof eventType === "string" && THREAD_EVENT_TYPES.has(eventType);
  if (!eventTypeIsKnown) {
    return err(new Error(`Worker 'thread-event' message has invalid 'eventType': "${eventType}"`));
  }

  // Only the key's presence matters: a null payload is valid.
  const hasPayload = "payload" in obj;
  if (!hasPayload) {
    return err(new Error("Worker 'thread-event' message missing 'payload' field"));
  }

  return ok(raw as ThreadEventMessage);
}
/**
 * Validate a worker message whose `type` is "workflow-error".
 *
 * @param obj - The raw message viewed as a property bag, used for field checks.
 * @param raw - The same message, returned unchanged on success.
 * @returns `ok(raw)` when both `runId` and `error` are strings; otherwise
 *   `err` describing the first offending field (`runId` is checked first).
 */
function parseWorkflowErrorMsg(
  obj: Record<string, unknown>,
  raw: unknown,
): Result<WorkerToParentMessage> {
  let problem: string | null = null;
  if (typeof obj.runId !== "string") {
    problem = "Worker 'workflow-error' message missing string 'runId' field";
  } else if (typeof obj.error !== "string") {
    problem = "Worker 'workflow-error' message missing string 'error' field";
  }

  return problem === null ? ok(raw as WorkflowErrorMessage) : err(new Error(problem));
}
const WORKER_MSG_TYPES = new Set([
"signal",
"error",
"ready",
"health-response",
"thread-event",
"workflow-error",
]);
/** Validate and parse an unknown IPC message received from a worker process. */
export function parseWorkerMessage(raw: unknown): Result<WorkerToParentMessage> {
@@ -125,5 +237,7 @@ export function parseWorkerMessage(raw: unknown): Result<WorkerToParentMessage>
if (obj.type === "signal") return parseSignalMsg(obj, raw);
if (obj.type === "error") return parseErrorMsg(obj, raw);
if (obj.type === "health-response") return parseHealthResponseMsg(obj, raw);
if (obj.type === "thread-event") return parseThreadEventMsg(obj, raw);
if (obj.type === "workflow-error") return parseWorkflowErrorMsg(obj, raw);
return ok({ type: "ready" });
}
+140 -1
View File
@@ -30,11 +30,70 @@ export type LogQuery = {
limit?: number;
};
// ---------------------------------------------------------------------------
// Workflow runs materialized view (RFC-002 §6.2)
// ---------------------------------------------------------------------------
/**
 * Every status a workflow run can have. This tuple is the single source of
 * truth: the WorkflowRunStatus union and the runtime lookup set are both
 * derived from it, so the compile-time and runtime views cannot drift apart.
 */
const WORKFLOW_RUN_STATUSES = [
  "queued",
  "started",
  "completed",
  "failed",
  "crashed",
  "dropped",
] as const;

/** Status of a workflow run as stored in the workflow_runs table. */
export type WorkflowRunStatus = (typeof WORKFLOW_RUN_STATUSES)[number];

/** Runtime membership set used to validate status strings read from the DB. */
const VALID_WORKFLOW_STATUSES = new Set<string>(WORKFLOW_RUN_STATUSES);

/**
 * Narrow a status string read from the database to WorkflowRunStatus.
 *
 * @param status - Raw value of the `status` column.
 * @returns The same string, typed as WorkflowRunStatus.
 * @throws Error when the value is not one of the known statuses (the
 *   comparison is exact and case-sensitive).
 */
function validateWorkflowRunStatus(status: string): WorkflowRunStatus {
  if (!VALID_WORKFLOW_STATUSES.has(status)) {
    throw new Error(`Invalid workflow run status from DB: "${status}"`);
  }
  return status as WorkflowRunStatus;
}
/** One row in the workflow_runs materialized table. */
export type WorkflowRun = {
/** Primary key of the row (column `run_id`). */
runId: string;
/** Name of the workflow this run belongs to. */
workflow: string;
/** Current status of the run. */
status: WorkflowRunStatus;
/** Timestamp stored with the row; active-run queries sort by it ascending. NOTE(review): unit is not visible here — presumably epoch milliseconds, confirm. */
ts: number;
};
/** Handle returned by createLogStore: the append-only log plus its derived tables. */
export type LogStore = {
append: (entry: Omit<LogEntry, "id">) => LogEntry;
query: (filter?: LogQuery) => LogEntry[];
getMeta: (key: string) => string | null;
/** Insert the key, or overwrite its value if the key already exists. */
setMeta: (key: string, value: string) => void;
/**
 * Append a workflow log event and atomically upsert the workflow_runs
 * materialized table — both in a single SQLite transaction (RFC-002 §6.2).
 * Returns the entry with the id of the newly inserted log row.
 */
upsertWorkflowRun: (
entry: Omit<LogEntry, "id">,
run: WorkflowRun,
) => LogEntry;
/**
 * Alias for upsertWorkflowRun — append a log entry and update workflow_runs
 * in one atomic transaction.
 */
appendWithWorkflowUpdate: (
entry: Omit<LogEntry, "id">,
run: WorkflowRun,
) => LogEntry;
/**
 * Get the current materialized state of a specific workflow run.
 * Returns null when no row exists for `runId`; throws if the stored status
 * is not a known WorkflowRunStatus.
 */
getWorkflowRun: (runId: string) => WorkflowRun | null;
/**
 * Get all workflow runs with status 'queued' or 'started', ordered by `ts`
 * ascending. Optionally filter by workflow name. Throws if any returned row
 * carries an unknown status.
 */
getActiveWorkflowRuns: (workflowName?: string) => WorkflowRun[];
/** Close the underlying SQLite database. */
close: () => void;
};
@@ -56,6 +115,16 @@ CREATE TABLE IF NOT EXISTS meta (
key TEXT PRIMARY KEY,
value TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS workflow_runs (
run_id TEXT PRIMARY KEY,
workflow TEXT NOT NULL,
status TEXT NOT NULL,
ts INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_workflow_runs_status ON workflow_runs(status);
CREATE INDEX IF NOT EXISTS idx_workflow_runs_workflow ON workflow_runs(workflow);
`;
export function createLogStore(dbPath: string): LogStore {
@@ -74,6 +143,41 @@ export function createLogStore(dbPath: string): LogStore {
"INSERT INTO meta (key, value) VALUES (@key, @value) ON CONFLICT(key) DO UPDATE SET value = @value",
);
// True UPSERT, matching the meta statement's ON CONFLICT form. The existing
// row is updated in place rather than deleted and re-inserted, which is what
// INSERT OR REPLACE does; that keeps the rowid stable and would not silently
// reset columns added to the table later.
const upsertWorkflowRunStmt = sqlite.prepare(
  "INSERT INTO workflow_runs (run_id, workflow, status, ts) VALUES (@runId, @workflow, @status, @ts) " +
    "ON CONFLICT(run_id) DO UPDATE SET workflow = excluded.workflow, status = excluded.status, ts = excluded.ts",
);
const getWorkflowRunStmt = sqlite.prepare(
"SELECT run_id, workflow, status, ts FROM workflow_runs WHERE run_id = ?",
);
const getActiveWorkflowRunsStmt = sqlite.prepare(
"SELECT run_id, workflow, status, ts FROM workflow_runs WHERE status IN ('queued', 'started') ORDER BY ts ASC",
);
const getActiveWorkflowRunsByNameStmt = sqlite.prepare(
"SELECT run_id, workflow, status, ts FROM workflow_runs WHERE status IN ('queued', 'started') AND workflow = ? ORDER BY ts ASC",
);
// One transaction covering both writes: the log row is inserted first, then
// the workflow_runs row is upserted. Yields the log entry with its new id.
const upsertWorkflowRunTx = sqlite.transaction(
  (entry: Omit<LogEntry, "id">, run: WorkflowRun) => {
    const { source, type, refId, payload, ts } = entry;
    const { lastInsertRowid } = insertStmt.run({ source, type, refId, payload, ts });

    const { runId, workflow, status } = run;
    upsertWorkflowRunStmt.run({ runId, workflow, status, ts: run.ts });

    return { ...entry, id: Number(lastInsertRowid) };
  },
);
function append(entry: Omit<LogEntry, "id">): LogEntry {
const info = insertStmt.run({
source: entry.source,
@@ -142,9 +246,44 @@ export function createLogStore(dbPath: string): LogStore {
setMetaStmt.run({ key, value });
}
/**
 * Append `entry` to the log and upsert `run` into workflow_runs inside one
 * transaction, returning the stored log entry (including its id).
 */
function upsertWorkflowRun(entry: Omit<LogEntry, "id">, run: WorkflowRun): LogEntry {
  const stored = upsertWorkflowRunTx(entry, run);
  return stored as LogEntry;
}
/** Alias of upsertWorkflowRun: same transaction, same return value. */
function appendWithWorkflowUpdate(entry: Omit<LogEntry, "id">, run: WorkflowRun): LogEntry {
  return upsertWorkflowRun(entry, run);
}
/**
 * Look up one workflow run by id.
 *
 * @returns The materialized row, or null when no row has that `runId`.
 * @throws Error if the stored status is not a known WorkflowRunStatus.
 */
function getWorkflowRun(runId: string): WorkflowRun | null {
  type Row = { run_id: string; workflow: string; status: string; ts: number };

  const found = getWorkflowRunStmt.get(runId) as Row | undefined;
  if (found === undefined) return null;

  const { run_id, workflow, status, ts } = found;
  return { runId: run_id, workflow, status: validateWorkflowRunStatus(status), ts };
}
/**
 * List runs whose status is 'queued' or 'started', oldest `ts` first.
 *
 * @param workflowName - When given, only runs of that workflow are returned.
 * @throws Error if any returned row carries an unknown status.
 */
function getActiveWorkflowRuns(workflowName?: string): WorkflowRun[] {
  type Row = { run_id: string; workflow: string; status: string; ts: number };

  const fetched =
    workflowName === undefined
      ? getActiveWorkflowRunsStmt.all()
      : getActiveWorkflowRunsByNameStmt.all(workflowName);

  const runs: WorkflowRun[] = [];
  for (const row of fetched as Row[]) {
    runs.push({
      runId: row.run_id,
      workflow: row.workflow,
      status: validateWorkflowRunStatus(row.status),
      ts: row.ts,
    });
  }
  return runs;
}
/** Close the underlying SQLite database handle. */
function close(): void {
sqlite.close();
}
return { append, query, getMeta, setMeta, close };
return { append, query, getMeta, setMeta, upsertWorkflowRun, appendWithWorkflowUpdate, getWorkflowRun, getActiveWorkflowRuns, close };
}
+363
View File
@@ -0,0 +1,363 @@
/**
* WorkflowManager — manages per-workflow worker processes, concurrency control,
* and thread lifecycle tracking.
*
* RFC-002 §7.1: one worker process per workflow name, reused while alive.
* Concurrency and overflow (drop/queue) are enforced here in the parent process.
*/
import { fork } from "node:child_process";
import type { ChildProcess } from "node:child_process";
import { dirname, join } from "node:path";
import { fileURLToPath } from "node:url";
import type { NerveConfig, WorkflowConfig } from "@uncaged/nerve-core";
import type { ShutdownMessage, StartThreadMessage, ThreadEventMessage } from "./ipc.js";
import { parseWorkerMessage } from "./ipc.js";
import type { LogStore } from "./log-store.js";
/**
 * Public handle returned by `createWorkflowManager`: starts workflow threads,
 * exposes per-workflow counters, and shuts the workers down.
 */
export type WorkflowManager = {
/** Trigger a new workflow thread (called by Reflex scheduler). */
startWorkflow: (workflowName: string, payload: unknown) => void;
/** Number of currently active (running) threads for a workflow. */
activeCount: (workflowName: string) => number;
/** Number of pending queued threads waiting to run for a workflow. */
queueLength: (workflowName: string) => number;
/** Gracefully shut down all workflow workers. */
stop: () => Promise<void>;
};
/** A thread that has been assigned a run id but is still waiting in a workflow's queue. */
type PendingThread = {
// Run id generated when the thread was queued.
runId: string;
// Trigger payload to forward to the worker once the thread is dispatched.
payload: unknown;
};
/** In-memory bookkeeping kept per workflow name. */
type WorkflowState = {
// Run ids of threads that have been dispatched and not yet finished.
active: Set<string>;
// Threads waiting for a free concurrency slot, oldest first.
queue: PendingThread[];
};
/** A forked worker process together with the workflow it serves. */
type WorkerEntry = {
workflowName: string;
process: ChildProcess;
// Set by `sendShutdown`; lets the exit handler tell a requested shutdown from a crash.
stopping: boolean;
};
/** Queue capacity used when a workflow's config does not provide `maxQueue`. */
const DEFAULT_MAX_QUEUE = 100;
/**
 * Returns the path of `workflow-worker.js` located in the same directory as
 * this module.
 */
function resolveWorkerScript(): string {
  const moduleDir = dirname(fileURLToPath(import.meta.url));
  return join(moduleDir, "workflow-worker.js");
}
/**
 * Forks the worker script for one workflow.
 *
 * The workflow name and nerve root are passed as `--workflow` / `--root`
 * CLI arguments. stdin is ignored, stdout/stderr are inherited, and an IPC
 * channel is opened.
 *
 * @param nerveRoot - Value passed to the worker as `--root`.
 * @param workflowName - Value passed to the worker as `--workflow`.
 * @param workerScript - Path of the script to fork.
 * @returns The forked child process.
 */
function spawnWorkflowWorker(
  nerveRoot: string,
  workflowName: string,
  workerScript: string,
): ChildProcess {
  const cliArgs = ["--workflow", workflowName, "--root", nerveRoot];
  return fork(workerScript, cliArgs, {
    stdio: ["ignore", "inherit", "inherit", "ipc"],
  });
}
/**
 * Best-effort delivery of a start-thread message to a worker.
 *
 * Nothing is sent when the worker reports `connected === false`, and an
 * exception thrown by `send` is swallowed.
 *
 * @param worker - Child process to message.
 * @param msg - Message to deliver.
 */
function sendStartThread(worker: ChildProcess, msg: StartThreadMessage): void {
  if (worker.connected !== false) {
    try {
      worker.send(msg);
    } catch {
      // The IPC channel closed between the connected check and the send.
    }
  }
}
/**
 * Marks a worker entry as stopping and asks the worker to shut down.
 *
 * `entry.stopping` is set even when no message can be sent. The send is
 * skipped when the worker reports `connected === false`, and an exception
 * thrown by `send` is swallowed.
 *
 * @param worker - Child process to message.
 * @param entry - Bookkeeping entry whose `stopping` flag is raised.
 */
function sendShutdown(worker: ChildProcess, entry: WorkerEntry): void {
  entry.stopping = true;
  if (worker.connected !== false) {
    const shutdown: ShutdownMessage = { type: "shutdown" };
    try {
      worker.send(shutdown);
    } catch {
      // The IPC channel closed between the connected check and the send.
    }
  }
}
/**
 * Resolves once `child` has exited, force-killing it if it takes too long.
 *
 * If the child has already exited (its `exitCode` or `signalCode` is set),
 * the promise resolves immediately: no further `exit` event will be emitted,
 * so waiting would only burn the full timeout and then signal a dead process.
 *
 * @param child - Process to wait for.
 * @param timeoutMs - Grace period before `SIGKILL` is sent.
 * @returns A promise that resolves on exit or right after the kill is issued;
 *   it never rejects.
 */
function waitForExit(child: ChildProcess, timeoutMs: number): Promise<void> {
  return new Promise((resolve) => {
    if (child.exitCode !== null || child.signalCode !== null) {
      resolve();
      return;
    }
    const timer = setTimeout(() => {
      child.kill("SIGKILL");
      resolve();
    }, timeoutMs);
    child.once("exit", () => {
      clearTimeout(timer);
      resolve();
    });
  });
}
/**
 * Creates the WorkflowManager for one nerve instance.
 *
 * The manager keeps one worker process per workflow name (spawned lazily and
 * reused while alive), enforces each workflow's concurrency limit, applies
 * the configured overflow policy (drop or queue), and records lifecycle
 * events through `logStore`.
 *
 * @param nerveRoot - Passed to each worker as its `--root` argument.
 * @param config - Nerve configuration; `config.workflows` supplies the
 *   per-workflow settings.
 * @param logStore - Store that receives workflow log entries and run-status
 *   updates.
 * @returns The manager handle.
 */
export function createWorkflowManager(
  nerveRoot: string,
  config: NerveConfig,
  logStore: LogStore,
): WorkflowManager {
  /** Event types that also update the stored status of a workflow run. */
  type RunStatus = "started" | "queued" | "completed" | "failed" | "crashed" | "dropped";

  const RUN_STATUSES: ReadonlySet<string> = new Set<RunStatus>([
    "started",
    "queued",
    "completed",
    "failed",
    "crashed",
    "dropped",
  ]);

  // The worker gives in-flight threads up to 10 s after a shutdown message
  // before exiting, so the manager must wait longer than that before it
  // resorts to SIGKILL.
  const WORKER_EXIT_TIMEOUT_MS = 15_000;

  const workerScript = resolveWorkerScript();
  const states = new Map<string, WorkflowState>();
  const workers = new Map<string, WorkerEntry>();
  let stopped = false;

  function isRunStatus(eventType: string): eventType is RunStatus {
    return RUN_STATUSES.has(eventType);
  }

  function getOrCreateState(workflowName: string): WorkflowState {
    let state = states.get(workflowName);
    if (state === undefined) {
      state = { active: new Set<string>(), queue: [] };
      states.set(workflowName, state);
    }
    return state;
  }

  function workflowConfig(workflowName: string): WorkflowConfig | null {
    return config.workflows?.[workflowName] ?? null;
  }

  /** Concurrency limit for a workflow; 1 when none is configured. */
  function concurrencyLimit(workflowName: string): number {
    return workflowConfig(workflowName)?.concurrency ?? 1;
  }

  /**
   * Writes one workflow log entry. Event types that are run statuses also
   * update the run's stored status; any other event type is appended as a
   * plain log entry.
   */
  function logWorkflowEvent(
    workflowName: string,
    runId: string,
    eventType: string,
    payload?: unknown,
  ): void {
    const ts = Date.now();
    const serialized = payload !== undefined ? JSON.stringify(payload) : null;
    if (isRunStatus(eventType)) {
      logStore.upsertWorkflowRun(
        {
          source: "workflow",
          type: eventType,
          refId: runId,
          payload: serialized,
          ts,
        },
        { runId, workflow: workflowName, status: eventType, ts },
      );
      return;
    }
    logStore.append({
      source: "workflow",
      type: eventType,
      refId: runId,
      payload: serialized,
      ts,
    });
  }

  /** Marks a thread active, hands it to the workflow's worker, and logs "started". */
  function dispatchThread(workflowName: string, runId: string, payload: unknown): void {
    const state = getOrCreateState(workflowName);
    state.active.add(runId);
    const worker = getOrSpawnWorker(workflowName);
    const msg: StartThreadMessage = {
      type: "start-thread",
      runId,
      workflow: workflowName,
      triggerPayload: payload,
    };
    sendStartThread(worker.process, msg);
    logWorkflowEvent(workflowName, runId, "started");
  }

  /**
   * Dispatches queued threads, oldest first, until the queue is empty or the
   * concurrency limit is reached. A single completion frees one slot; a
   * worker crash can free several at once.
   */
  function drainQueue(workflowName: string): void {
    const state = states.get(workflowName);
    if (state === undefined) return;
    const limit = concurrencyLimit(workflowName);
    while (state.active.size < limit) {
      const next = state.queue.shift();
      if (next === undefined) return;
      dispatchThread(workflowName, next.runId, next.payload);
    }
  }

  /** Handles a terminal ("completed" / "failed") event reported by a worker thread. */
  function handleThreadEvent(workflowName: string, msg: ThreadEventMessage): void {
    const state = states.get(workflowName);
    if (state === undefined) return;
    if (msg.eventType !== "completed" && msg.eventType !== "failed") return;
    state.active.delete(msg.runId);
    drainQueue(workflowName);
    logWorkflowEvent(workflowName, msg.runId, msg.eventType, msg.payload);
  }

  /**
   * Handles an unexpected worker exit: every thread that was active on the
   * worker is recorded as "crashed", then queued threads are dispatched
   * (which spawns a replacement worker) so they are not left waiting
   * indefinitely.
   */
  function handleWorkerCrash(workflowName: string): void {
    const state = states.get(workflowName);
    if (state === undefined) return;
    const crashedRunIds = [...state.active];
    state.active.clear();
    if (crashedRunIds.length > 0) {
      process.stderr.write(
        `[workflow-manager] worker for "${workflowName}" crashed with ${crashedRunIds.length} active thread(s)\n`,
      );
    }
    for (const runId of crashedRunIds) {
      logWorkflowEvent(workflowName, runId, "crashed");
    }
    // Each respawn consumes queued threads, so repeated crashes are bounded
    // by the queue length.
    if (!stopped) {
      drainQueue(workflowName);
    }
  }

  /** Routes one raw IPC message received from a workflow's worker. */
  function handleWorkerMessage(workflowName: string, raw: unknown): void {
    const result = parseWorkerMessage(raw);
    if (!result.ok) {
      process.stderr.write(
        `[workflow-manager] invalid message from "${workflowName}" worker: ${result.error.message}\n`,
      );
      return;
    }
    const msg = result.value;
    if (msg.type === "thread-event") {
      handleThreadEvent(workflowName, msg);
      return;
    }
    if (msg.type === "workflow-error") {
      process.stderr.write(
        `[workflow-manager] workflow-error for runId "${msg.runId}" in "${workflowName}": ${msg.error}\n`,
      );
      const state = states.get(workflowName);
      if (state !== undefined) {
        state.active.delete(msg.runId);
        drainQueue(workflowName);
      }
      logWorkflowEvent(workflowName, msg.runId, "failed", { error: msg.error });
      return;
    }
    if (msg.type === "error") {
      process.stderr.write(
        `[workflow-manager] error from "${workflowName}" worker: ${msg.error}\n`,
      );
    }
  }

  /** Returns the live worker for a workflow, forking a new one when none is running. */
  function getOrSpawnWorker(workflowName: string): WorkerEntry {
    const existing = workers.get(workflowName);
    if (existing !== undefined && existing.process.exitCode === null) {
      return existing;
    }
    const child = spawnWorkflowWorker(nerveRoot, workflowName, workerScript);
    const entry: WorkerEntry = { workflowName, process: child, stopping: false };

    child.on("message", (raw: unknown) => {
      handleWorkerMessage(workflowName, raw);
    });

    // A ChildProcess emits 'error' when it cannot be spawned or a send fails;
    // with no listener that event would be thrown as an uncaught exception.
    child.on("error", (e: Error) => {
      process.stderr.write(
        `[workflow-manager] worker for "${workflowName}" emitted error: ${e.message}\n`,
      );
    });

    child.on("exit", (code) => {
      // Use this worker's own entry rather than a map lookup: the map may
      // already hold a replacement, or have been cleared by stop().
      const isCurrent = workers.get(workflowName) === entry;
      if (isCurrent) {
        workers.delete(workflowName);
      }
      if (entry.stopping) {
        states.get(workflowName)?.active.clear();
        return;
      }
      if (!isCurrent) return;
      process.stderr.write(
        `[workflow-manager] worker for "${workflowName}" exited with code ${code ?? "null"}\n`,
      );
      handleWorkerCrash(workflowName);
    });

    workers.set(workflowName, entry);
    return entry;
  }

  function startWorkflow(workflowName: string, payload: unknown): void {
    if (stopped) return;
    const wfConfig = workflowConfig(workflowName);
    if (wfConfig === null) {
      process.stderr.write(
        `[workflow-manager] startWorkflow: unknown workflow "${workflowName}"\n`,
      );
      return;
    }
    const state = getOrCreateState(workflowName);
    const runId = crypto.randomUUID();
    if (state.active.size < concurrencyLimit(workflowName)) {
      dispatchThread(workflowName, runId, payload);
      return;
    }
    // At concurrency limit — apply overflow policy
    if (wfConfig.overflow === "drop") {
      logWorkflowEvent(workflowName, runId, "dropped");
      process.stderr.write(
        `[workflow-manager] dropped thread for "${workflowName}" (overflow=drop)\n`,
      );
      return;
    }
    // overflow === "queue": when the queue is full the oldest entry is dropped
    const maxQueue = (wfConfig as { maxQueue?: number }).maxQueue ?? DEFAULT_MAX_QUEUE;
    if (state.queue.length >= maxQueue) {
      const oldest = state.queue.shift();
      if (oldest !== undefined) {
        process.stderr.write(
          `[workflow-manager] queue overflow for "${workflowName}", dropping oldest runId "${oldest.runId}"\n`,
        );
        logWorkflowEvent(workflowName, oldest.runId, "dropped");
      }
    }
    state.queue.push({ runId, payload });
    logWorkflowEvent(workflowName, runId, "queued");
    process.stderr.write(
      `[workflow-manager] queued thread for "${workflowName}" runId "${runId}" (queue length: ${state.queue.length})\n`,
    );
  }

  function activeCount(workflowName: string): number {
    return states.get(workflowName)?.active.size ?? 0;
  }

  function queueLength(workflowName: string): number {
    return states.get(workflowName)?.queue.length ?? 0;
  }

  /**
   * Stops accepting new threads, asks every worker to shut down, and waits
   * for each to exit (force-killing after `WORKER_EXIT_TIMEOUT_MS`).
   */
  async function stop(): Promise<void> {
    stopped = true;
    const exits = [...workers.values()].map((entry) => {
      sendShutdown(entry.process, entry);
      return waitForExit(entry.process, WORKER_EXIT_TIMEOUT_MS);
    });
    await Promise.all(exits);
    workers.clear();
  }

  return { startWorkflow, activeCount, queueLength, stop };
}
+244
View File
@@ -0,0 +1,244 @@
/**
* Workflow Worker runtime bootstrap.
*
* Entry point for a forked workflow worker process.
* Receives the workflow name and nerve root via CLI args, dynamically imports
* the user's WorkflowDefinition, then signals ready and enters the IPC event loop.
*
* Layout assumptions (nerve user config at `<nerveRoot>/`):
* workflows/<name>/index.ts (or .js) ← user workflow definition
*/
import { resolve, join } from "node:path";
import { existsSync } from "node:fs";
import type {
CommandEvent,
ThreadState,
WorkflowContext,
WorkflowDefinition,
} from "@uncaged/nerve-core";
import type { WorkerToParentMessage, ThreadEventType } from "./ipc.js";
import { parseParentMessage } from "./ipc.js";
// ---------------------------------------------------------------------------
// IPC helpers
// ---------------------------------------------------------------------------
/**
 * Sends a message to the parent process over IPC.
 * Does nothing when `process.send` is not available.
 */
function send(msg: WorkerToParentMessage): void {
  process.send?.(msg);
}
/** Sends the `ready` message to the parent process. */
function sendReady(): void {
  const ready: WorkerToParentMessage = { type: "ready" };
  send(ready);
}
/**
 * Sends a `thread-event` message for one run to the parent process.
 *
 * @param runId - Run the event belongs to.
 * @param eventType - Kind of thread event.
 * @param payload - Event payload, forwarded as-is.
 */
function sendThreadEvent(runId: string, eventType: ThreadEventType, payload: unknown): void {
  const message: WorkerToParentMessage = { type: "thread-event", runId, eventType, payload };
  send(message);
}
/**
 * Sends a `workflow-error` message for one run to the parent process.
 *
 * @param runId - Run the error belongs to.
 * @param error - Human-readable error text.
 */
function sendWorkflowError(runId: string, error: string): void {
  const message: WorkerToParentMessage = { type: "workflow-error", runId, error };
  send(message);
}
// ---------------------------------------------------------------------------
// Thread loop (RFC-002 §5.4)
// ---------------------------------------------------------------------------
/**
 * Runs one workflow thread: a moderate → execute loop.
 *
 * Each iteration records the current event in the thread state and asks
 * `def.moderate` what to do next. `null` completes the thread; otherwise the
 * named role is executed and its result becomes the next event. The thread
 * is reported as failed when a role throws, and as a workflow error when the
 * role is unknown or the step limit is exhausted.
 *
 * @param def - Workflow definition providing `moderate` and `roles`.
 * @param workflowName - Name exposed to roles through the context.
 * @param runId - Id of this thread; attached to every message sent.
 * @param triggerPayload - Payload for the initial `thread_start` event;
 *   anything that is not a non-null object is replaced by `{}`.
 */
async function runThread(
  def: WorkflowDefinition,
  workflowName: string,
  runId: string,
  triggerPayload: unknown,
): Promise<void> {
  // Safety valve against a moderator that never returns null.
  const MAX_STEPS = 1000;
  const state: ThreadState = { runId, events: [] };
  const ctx: WorkflowContext = {
    runId,
    workflowName,
    log: (msg) =>
      sendThreadEvent(runId, "step_complete", { message: msg }),
  };
  let current: CommandEvent = {
    type: "thread_start",
    triggerPayload:
      typeof triggerPayload === "object" && triggerPayload !== null ? triggerPayload : {},
  };

  for (let step = 0; step < MAX_STEPS; step += 1) {
    state.events.push(current);
    const decision = def.moderate(state, current);
    if (decision === null) {
      sendThreadEvent(runId, "completed", null);
      return;
    }
    const role = def.roles[decision.role];
    if (!role) {
      sendWorkflowError(runId, `Unknown role: ${decision.role}`);
      return;
    }
    try {
      current = await role.execute(decision.prompt, ctx);
    } catch (e: unknown) {
      const reason = e instanceof Error ? e.message : String(e);
      sendThreadEvent(runId, "failed", { error: reason });
      return;
    }
  }

  // Only reached when the loop ran out of steps; every other path returns above.
  sendWorkflowError(runId, `Thread exceeded maximum steps (${MAX_STEPS})`);
}
// ---------------------------------------------------------------------------
// Workflow definition loader
// ---------------------------------------------------------------------------
/**
 * Locates and imports the user's workflow definition.
 *
 * Looks for `workflows/<workflowName>/index.ts`, then `index.js`, under
 * `nerveRoot` and dynamically imports the first one that exists. The
 * module's default export is used when present, otherwise the module itself.
 *
 * @param nerveRoot - Directory containing the `workflows/` folder.
 * @param workflowName - Name of the workflow sub-directory.
 * @returns The imported definition.
 * @throws Error when no candidate file exists, or when the export does not
 *   have a `moderate` function and a non-null `roles` object.
 */
async function loadWorkflowDefinition(
  nerveRoot: string,
  workflowName: string,
): Promise<WorkflowDefinition> {
  const candidates = [
    resolve(join(nerveRoot, "workflows", workflowName, "index.ts")),
    resolve(join(nerveRoot, "workflows", workflowName, "index.js")),
  ];
  const indexPath = candidates.find((p) => existsSync(p));
  if (!indexPath) {
    throw new Error(
      `Workflow definition not found for "${workflowName}". Tried:\n` +
        candidates.map((p) => `  ${p}`).join("\n"),
    );
  }
  const mod = await import(indexPath);
  const def: unknown = mod.default ?? mod;
  const shape =
    def !== null && typeof def === "object"
      ? (def as { moderate?: unknown; roles?: unknown })
      : null;
  // `typeof null` is "object", so `roles` needs an explicit null check;
  // otherwise `roles: null` passes here and fails later on `def.roles[...]`.
  if (
    shape === null ||
    typeof shape.moderate !== "function" ||
    shape.roles === null ||
    typeof shape.roles !== "object"
  ) {
    throw new Error(
      `Workflow "${workflowName}" must export a WorkflowDefinition with "roles" and "moderate".`,
    );
  }
  return def as WorkflowDefinition;
}
// ---------------------------------------------------------------------------
// IPC message dispatch
// ---------------------------------------------------------------------------
/**
 * Dispatches one IPC message received from the parent process.
 *
 * - `shutdown`: stops accepting new threads, waits for all in-flight threads
 *   (at most 10 seconds), then exits the process.
 * - `start-thread`: runs the thread, chained after any still-running thread
 *   with the same run id, and tracks it in `inFlight` until it settles.
 *
 * Messages that fail to parse are reported on stderr and ignored.
 *
 * @param raw - Unvalidated message from the parent.
 * @param def - Loaded workflow definition.
 * @param workflowName - Name of the workflow this worker serves.
 * @param inFlight - Running threads keyed by run id; mutated here.
 * @param shuttingDown - Shared flag, set once a shutdown is requested.
 */
function handleMessage(
  raw: unknown,
  def: WorkflowDefinition,
  workflowName: string,
  inFlight: Map<string, Promise<void>>,
  shuttingDown: { value: boolean },
): void {
  const parseResult = parseParentMessage(raw);
  if (!parseResult.ok) {
    process.stderr.write(`[workflow-worker] Invalid IPC message: ${parseResult.error.message}\n`);
    return;
  }
  const msg = parseResult.value;
  if (msg.type === "shutdown") {
    shuttingDown.value = true;
    const timeout = new Promise<void>((r) => setTimeout(r, 10_000));
    Promise.race([Promise.all(inFlight.values()), timeout])
      .then(() => process.exit(0))
      .catch(() => process.exit(1));
    return;
  }
  if (msg.type === "start-thread") {
    if (shuttingDown.value) return;
    const { runId, triggerPayload } = msg;
    const previous = inFlight.get(runId) ?? Promise.resolve();
    const next: Promise<void> = previous
      .then(() => runThread(def, workflowName, runId, triggerPayload))
      .catch((e: unknown) => {
        const errMsg = e instanceof Error ? e.message : String(e);
        sendWorkflowError(runId, errMsg);
      })
      .finally(() => {
        // Only remove the entry if it is still this chain. A later
        // start-thread for the same runId replaces it, and that newer chain
        // must stay tracked so shutdown waits for it.
        if (inFlight.get(runId) === next) {
          inFlight.delete(runId);
        }
      });
    inFlight.set(runId, next);
    return;
  }
}
// ---------------------------------------------------------------------------
// Bootstrap
// ---------------------------------------------------------------------------
/**
 * Starts the worker: loads the workflow definition, signals `ready` to the
 * parent, and begins handling IPC messages.
 *
 * If the definition cannot be loaded, the reason is written to stderr and
 * the process exits with code 1.
 *
 * @param nerveRoot - Directory containing the `workflows/` folder.
 * @param workflowName - Name of the workflow this worker serves.
 */
async function bootstrap(nerveRoot: string, workflowName: string): Promise<void> {
  let def: WorkflowDefinition;
  try {
    def = await loadWorkflowDefinition(nerveRoot, workflowName);
  } catch (e: unknown) {
    const reason = e instanceof Error ? e.message : String(e);
    process.stderr.write(`[workflow-worker] Failed to load workflow "${workflowName}": ${reason}\n`);
    process.exit(1);
  }

  // State shared by every message handled during this worker's lifetime.
  const inFlight = new Map<string, Promise<void>>();
  const shuttingDown = { value: false };
  const onParentMessage = (raw: unknown): void => {
    handleMessage(raw, def, workflowName, inFlight, shuttingDown);
  };

  sendReady();
  process.on("message", onParentMessage);
}
// ---------------------------------------------------------------------------
// CLI entrypoint
// ---------------------------------------------------------------------------
/**
 * Reads `--workflow <name>` and `--root <dir>` from `process.argv`.
 *
 * A flag only counts when a value follows it, and that value is consumed
 * (never itself interpreted as a flag). When a flag is repeated, the last
 * occurrence wins.
 *
 * @returns The parsed values, or `null` when either one is missing or empty.
 */
function parseArgs(): { nerveRoot: string; workflow: string } | null {
  const argv = process.argv.slice(2);
  let workflow: string | null = null;
  let nerveRoot: string | null = null;

  let idx = 0;
  while (idx < argv.length) {
    const flag = argv[idx];
    const hasValue = idx + 1 < argv.length;
    if (hasValue && flag === "--workflow") {
      workflow = argv[idx + 1];
      idx += 2;
      continue;
    }
    if (hasValue && flag === "--root") {
      nerveRoot = argv[idx + 1];
      idx += 2;
      continue;
    }
    idx += 1;
  }

  if (!workflow || !nerveRoot) {
    return null;
  }
  return { nerveRoot, workflow };
}
// Parse the CLI arguments; without both values the worker cannot start.
const parsed = parseArgs();
if (parsed === null) {
  process.stderr.write("Usage: workflow-worker --workflow <name> --root <nerve-root-dir>\n");
  process.exit(1);
}

// Start the worker; any error escaping bootstrap is fatal.
bootstrap(parsed.nerveRoot, parsed.workflow).catch((cause: unknown) => {
  const detail = cause instanceof Error ? cause.message : String(cause);
  process.stderr.write(`[workflow-worker] Unhandled bootstrap error: ${detail}\n`);
  process.exit(1);
});