diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index 2972b7c..55b6b16 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -8,6 +8,14 @@ export type { QueueOverflowConfig, WorkflowConfig, NerveConfig, + CommandEvent, + ThreadState, + ModerateResult, + WorkflowContext, + RoleExecuteFn, + Role, + ModerateFn, + WorkflowDefinition, } from "./types.js"; export type { Result } from "./result.js"; export { ok, err } from "./result.js"; diff --git a/packages/core/src/types.ts b/packages/core/src/types.ts index 67993eb..ff6c57a 100644 --- a/packages/core/src/types.ts +++ b/packages/core/src/types.ts @@ -45,3 +45,58 @@ export type NerveConfig = { reflexes: ReflexConfig[]; workflows: Record | null; }; + +// --------------------------------------------------------------------------- +// Workflow Engine types (RFC-002) +// --------------------------------------------------------------------------- + +/** A single event in the command event stream that drives a workflow thread. */ +export type CommandEvent = { + type: string; + [key: string]: unknown; +}; + +/** Accumulated state of a running thread — the event history for moderate(). */ +export type ThreadState = { + runId: string; + /** All events so far, including the initial thread_start event. */ + events: CommandEvent[]; +}; + +/** The result of moderate() — which role to hand to next, and what prompt to pass. */ +export type ModerateResult = { + role: string; + prompt: unknown; +}; + +/** Context injected into every role execute() call. */ +export type WorkflowContext = { + runId: string; + workflowName: string; + /** Emit a log message back to the parent process. */ + log: (message: string) => void; +}; + +/** + * A role's execute function. Has side effects (API calls, file I/O, etc.). + * Returns a CommandEvent that is fed back into moderate(). + */ +export type RoleExecuteFn = (prompt: unknown, ctx: WorkflowContext) => Promise; + +/** A role in a workflow — a named unit of execution with side effects. */ +export type Role = { + execute: RoleExecuteFn; +}; + +/** + * The moderator function — pure, no side effects. + * Decides which role to pass control to next. + * Returns null to signal thread completion. + */ +export type ModerateFn = (thread: ThreadState, event: CommandEvent) => ModerateResult | null; + +/** The complete definition of a workflow, as authored by users. */ +export type WorkflowDefinition = { + roles: Record; + moderate: ModerateFn; +}; diff --git a/packages/daemon/src/__tests__/log-store-workflow.test.ts b/packages/daemon/src/__tests__/log-store-workflow.test.ts new file mode 100644 index 0000000..41bdcc6 --- /dev/null +++ b/packages/daemon/src/__tests__/log-store-workflow.test.ts @@ -0,0 +1,186 @@ +/** + * Tests for workflow-related LogStore functionality (RFC-002 §6.2). + */ + +import { mkdtempSync, rmSync } from "node:fs"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import { afterEach, beforeEach, describe, expect, it } from "vitest"; + +import { createLogStore } from "../log-store.js"; +import type { LogStore, WorkflowRun } from "../log-store.js"; + +describe("LogStore — workflow_runs", () => { + let tmpDir: string; + let store: LogStore; + + beforeEach(() => { + tmpDir = mkdtempSync(join(tmpdir(), "nerve-wf-log-test-")); + store = createLogStore(join(tmpDir, "data", "logs.db")); + }); + + afterEach(() => { + store.close(); + rmSync(tmpDir, { recursive: true, force: true }); + }); + + describe("upsertWorkflowRun", () => { + it("writes a log entry and creates a workflow_runs row atomically", () => { + const run: WorkflowRun = { + runId: "run-1", + workflow: "cleanup", + status: "started", + ts: 1000, + }; + + const entry = store.upsertWorkflowRun( + { source: "workflow", type: "started", refId: "run-1", payload: null, ts: 1000 }, + run, + ); + + expect(entry.id).toBeGreaterThan(0); + expect(entry.source).toBe("workflow"); + expect(entry.type).toBe("started"); + + const stored = store.getWorkflowRun("run-1"); + expect(stored).not.toBeNull(); + expect(stored?.runId).toBe("run-1"); + expect(stored?.workflow).toBe("cleanup"); + expect(stored?.status).toBe("started"); + expect(stored?.ts).toBe(1000); + }); + + it("updates existing workflow_runs row on upsert (status transition)", () => { + store.upsertWorkflowRun( + { source: "workflow", type: "started", refId: "run-2", payload: null, ts: 1000 }, + { runId: "run-2", workflow: "cleanup", status: "started", ts: 1000 }, + ); + + store.upsertWorkflowRun( + { source: "workflow", type: "completed", refId: "run-2", payload: null, ts: 2000 }, + { runId: "run-2", workflow: "cleanup", status: "completed", ts: 2000 }, + ); + + const stored = store.getWorkflowRun("run-2"); + expect(stored?.status).toBe("completed"); + expect(stored?.ts).toBe(2000); + + // Both log entries should be present (event sourcing) + const logs = store.query({ refId: "run-2" }); + expect(logs).toHaveLength(2); + }); + + it("the log entries act as source of truth for event history", () => { + for (const [type, status, ts] of [ + ["queued", "queued", 1000], + ["started", "started", 1001], + ["step_complete", "started", 1002], + ["completed", "completed", 1005], + ] as const) { + store.upsertWorkflowRun( + { source: "workflow", type, refId: "run-3", payload: null, ts }, + { runId: "run-3", workflow: "cleanup", status, ts }, + ); + } + + const logs = store.query({ source: "workflow", refId: "run-3" }); + expect(logs).toHaveLength(4); + expect(logs[0].type).toBe("queued"); + expect(logs[3].type).toBe("completed"); + }); + }); + + describe("getWorkflowRun", () => { + it("returns null for a non-existent runId", () => { + expect(store.getWorkflowRun("no-such-run")).toBeNull(); + }); + + it("returns the latest state after multiple upserts", () => { + store.upsertWorkflowRun( + { source: "workflow", type: "queued", refId: "run-4", payload: null, ts: 100 }, + { runId: "run-4", workflow: "code-review", status: "queued", ts: 100 }, + ); + store.upsertWorkflowRun( + { source: "workflow", type: "started", refId: "run-4", payload: null, ts: 200 }, + { runId: "run-4", workflow: "code-review", status: "started", ts: 200 }, + ); + + const run = store.getWorkflowRun("run-4"); + expect(run?.status).toBe("started"); + expect(run?.ts).toBe(200); + }); + }); + + describe("getActiveWorkflowRuns", () => { + beforeEach(() => { + store.upsertWorkflowRun( + { source: "workflow", type: "queued", refId: "r1", payload: null, ts: 100 }, + { runId: "r1", workflow: "cleanup", status: "queued", ts: 100 }, + ); + store.upsertWorkflowRun( + { source: "workflow", type: "started", refId: "r2", payload: null, ts: 200 }, + { runId: "r2", workflow: "cleanup", status: "started", ts: 200 }, + ); + store.upsertWorkflowRun( + { source: "workflow", type: "completed", refId: "r3", payload: null, ts: 300 }, + { runId: "r3", workflow: "cleanup", status: "completed", ts: 300 }, + ); + store.upsertWorkflowRun( + { source: "workflow", type: "failed", refId: "r4", payload: null, ts: 400 }, + { runId: "r4", workflow: "deploy", status: "queued", ts: 400 }, + ); + }); + + it("returns queued and started runs by default", () => { + const active = store.getActiveWorkflowRuns(); + expect(active).toHaveLength(3); + const ids = active.map((r) => r.runId); + expect(ids).toContain("r1"); + expect(ids).toContain("r2"); + expect(ids).toContain("r4"); + }); + + it("excludes completed and failed runs", () => { + const active = store.getActiveWorkflowRuns(); + const ids = active.map((r) => r.runId); + expect(ids).not.toContain("r3"); + }); + + it("filters by workflow name", () => { + const cleanupRuns = store.getActiveWorkflowRuns("cleanup"); + expect(cleanupRuns).toHaveLength(2); + const ids = cleanupRuns.map((r) => r.runId); + expect(ids).toContain("r1"); + expect(ids).toContain("r2"); + }); + + it("returns only matching workflow when name given", () => { + const deployRuns = store.getActiveWorkflowRuns("deploy"); + expect(deployRuns).toHaveLength(1); + expect(deployRuns[0].runId).toBe("r4"); + }); + + it("returns empty array when workflow name matches no active runs", () => { + expect(store.getActiveWorkflowRuns("nonexistent")).toHaveLength(0); + }); + + it("returns runs ordered by ts ascending", () => { + const active = store.getActiveWorkflowRuns(); + expect(active[0].ts).toBeLessThan(active[1].ts); + }); + }); + + describe("all statuses are storable", () => { + it.each(["queued", "started", "completed", "failed", "crashed", "dropped"] as const)( + "stores status=%s", + (status) => { + const runId = `run-${status}`; + store.upsertWorkflowRun( + { source: "workflow", type: status, refId: runId, payload: null, ts: 1 }, + { runId, workflow: "test", status, ts: 1 }, + ); + expect(store.getWorkflowRun(runId)?.status).toBe(status); + }, + ); + }); +}); diff --git a/packages/daemon/src/__tests__/workflow-manager.test.ts b/packages/daemon/src/__tests__/workflow-manager.test.ts new file mode 100644 index 0000000..fc39e43 --- /dev/null +++ b/packages/daemon/src/__tests__/workflow-manager.test.ts @@ -0,0 +1,362 @@ +/** + * Tests for WorkflowManager — concurrency, overflow, queue, and lifecycle. + */ + +import { EventEmitter } from "node:events"; + +import type { NerveConfig } from "@uncaged/nerve-core"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; + +// --------------------------------------------------------------------------- +// Mock child_process.fork before importing workflow-manager +// --------------------------------------------------------------------------- + +type MockChild = EventEmitter & { + send: ReturnType; + kill: ReturnType; + connected: boolean; + exitCode: number | null; + pid: number; +}; + +const mockChildren: MockChild[] = []; + +function makeMockChild(pid = 1): MockChild { + const child = new EventEmitter() as MockChild; + child.connected = true; + child.exitCode = null; + child.pid = pid; + child.send = vi.fn((msg: unknown) => { + if ( + msg !== null && + typeof msg === "object" && + (msg as Record).type === "shutdown" + ) { + setImmediate(() => { + child.exitCode = 0; + child.connected = false; + child.emit("exit", 0, null); + }); + } + }); + child.kill = vi.fn((_signal?: string) => { + child.exitCode = 1; + child.connected = false; + child.emit("exit", null, _signal ?? "SIGKILL"); + }); + return child; +} + +vi.mock("node:child_process", () => ({ + fork: vi.fn((_script: string, _args: string[], _opts: unknown) => { + const child = makeMockChild(mockChildren.length + 1); + mockChildren.push(child); + return child; + }), +})); + +// Import after mock is set up +const { createWorkflowManager } = await import("../workflow-manager.js"); + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +function makeLogStore() { + return { + append: vi.fn(), + query: vi.fn(() => []), + getMeta: vi.fn(() => null), + setMeta: vi.fn(), + upsertWorkflowRun: vi.fn(), + appendWithWorkflowUpdate: vi.fn(), + getWorkflowRun: vi.fn(() => null), + getActiveWorkflowRuns: vi.fn(() => []), + close: vi.fn(), + }; +} + +function makeConfig(overrides: Partial = {}): NerveConfig { + return { + senses: {}, + reflexes: [], + workflows: overrides as NerveConfig["workflows"], + }; +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +describe("WorkflowManager", () => { + beforeEach(() => { + mockChildren.length = 0; + vi.useFakeTimers({ shouldAdvanceTime: true }); + }); + + afterEach(() => { + vi.useRealTimers(); + vi.clearAllMocks(); + }); + + describe("startWorkflow under concurrency limit dispatches thread", () => { + it("forks a worker and sends start-thread when active < concurrency", () => { + const logStore = makeLogStore(); + const config = makeConfig({ + "my-workflow": { concurrency: 2, overflow: "drop" }, + }); + const mgr = createWorkflowManager("/nerve-root", config, logStore); + + mgr.startWorkflow("my-workflow", { event: "test" }); + + expect(mockChildren).toHaveLength(1); + expect(mockChildren[0].send).toHaveBeenCalledWith( + expect.objectContaining({ type: "start-thread", workflow: "my-workflow" }), + ); + expect(mgr.activeCount("my-workflow")).toBe(1); + }); + + it("reuses the same worker for a second thread under the limit", () => { + const logStore = makeLogStore(); + const config = makeConfig({ + "my-workflow": { concurrency: 3, overflow: "drop" }, + }); + const mgr = createWorkflowManager("/nerve-root", config, logStore); + + mgr.startWorkflow("my-workflow", { n: 1 }); + mgr.startWorkflow("my-workflow", { n: 2 }); + + // Only one forked child — worker is reused + expect(mockChildren).toHaveLength(1); + expect(mockChildren[0].send).toHaveBeenCalledTimes(2); + expect(mgr.activeCount("my-workflow")).toBe(2); + }); + + it("logs a 'started' event for each dispatched thread", () => { + const logStore = makeLogStore(); + const config = makeConfig({ + "my-workflow": { concurrency: 2, overflow: "drop" }, + }); + const mgr = createWorkflowManager("/nerve-root", config, logStore); + + mgr.startWorkflow("my-workflow", { x: 1 }); + + expect(logStore.upsertWorkflowRun).toHaveBeenCalledWith( + expect.objectContaining({ source: "workflow", type: "started" }), + expect.objectContaining({ workflow: "my-workflow", status: "started" }), + ); + }); + }); + + describe("startWorkflow at limit with drop overflow drops the request", () => { + it("does NOT send start-thread when at concurrency limit with overflow=drop", () => { + const logStore = makeLogStore(); + const config = makeConfig({ + "drop-wf": { concurrency: 1, overflow: "drop" }, + }); + const mgr = createWorkflowManager("/nerve-root", config, logStore); + + mgr.startWorkflow("drop-wf", { first: true }); + // now at limit — second call should be dropped + mgr.startWorkflow("drop-wf", { second: true }); + + expect(mgr.activeCount("drop-wf")).toBe(1); + expect(mgr.queueLength("drop-wf")).toBe(0); + // Only one start-thread sent + expect(mockChildren[0].send).toHaveBeenCalledTimes(1); + }); + + it("logs a 'dropped' event when overflow=drop", () => { + const logStore = makeLogStore(); + const config = makeConfig({ + "drop-wf": { concurrency: 1, overflow: "drop" }, + }); + const mgr = createWorkflowManager("/nerve-root", config, logStore); + + mgr.startWorkflow("drop-wf", {}); + mgr.startWorkflow("drop-wf", {}); + + const droppedCall = logStore.upsertWorkflowRun.mock.calls.find( + ([entry]) => entry.type === "dropped", + ); + expect(droppedCall).toBeDefined(); + }); + }); + + describe("startWorkflow at limit with queue overflow queues the request", () => { + it("queues the thread when at concurrency limit with overflow=queue", () => { + const logStore = makeLogStore(); + const config = makeConfig({ + "queue-wf": { concurrency: 1, overflow: "queue", maxQueue: 5 }, + }); + const mgr = createWorkflowManager("/nerve-root", config, logStore); + + mgr.startWorkflow("queue-wf", { first: true }); + mgr.startWorkflow("queue-wf", { second: true }); + + expect(mgr.activeCount("queue-wf")).toBe(1); + expect(mgr.queueLength("queue-wf")).toBe(1); + }); + + it("logs a 'queued' event for the waiting thread", () => { + const logStore = makeLogStore(); + const config = makeConfig({ + "queue-wf": { concurrency: 1, overflow: "queue", maxQueue: 5 }, + }); + const mgr = createWorkflowManager("/nerve-root", config, logStore); + + mgr.startWorkflow("queue-wf", {}); + mgr.startWorkflow("queue-wf", {}); + + const queuedCall = logStore.upsertWorkflowRun.mock.calls.find( + ([entry]) => entry.type === "queued", + ); + expect(queuedCall).toBeDefined(); + expect(queuedCall?.[1]).toMatchObject({ workflow: "queue-wf", status: "queued" }); + }); + }); + + describe("queue overflow with maxQueue drops oldest when full", () => { + it("drops oldest queued item when queue reaches maxQueue", () => { + const logStore = makeLogStore(); + const config = makeConfig({ + "queue-wf": { concurrency: 1, overflow: "queue", maxQueue: 2 }, + }); + const mgr = createWorkflowManager("/nerve-root", config, logStore); + + // Fill the concurrency slot + mgr.startWorkflow("queue-wf", { n: 0 }); + // Fill the queue to maxQueue + mgr.startWorkflow("queue-wf", { n: 1 }); + mgr.startWorkflow("queue-wf", { n: 2 }); + // This one should push out { n: 1 } + mgr.startWorkflow("queue-wf", { n: 3 }); + + // Queue should still be at maxQueue (2) + expect(mgr.queueLength("queue-wf")).toBe(2); + + // There should be a dropped event (for the oldest evicted item) + const droppedCalls = logStore.upsertWorkflowRun.mock.calls.filter( + ([entry]) => entry.type === "dropped", + ); + expect(droppedCalls).toHaveLength(1); + }); + }); + + describe("completing a thread dequeues the next one", () => { + it("dispatches the next queued thread when the active thread sends completed", () => { + const logStore = makeLogStore(); + const config = makeConfig({ + "queue-wf": { concurrency: 1, overflow: "queue", maxQueue: 5 }, + }); + const mgr = createWorkflowManager("/nerve-root", config, logStore); + + mgr.startWorkflow("queue-wf", { first: true }); + mgr.startWorkflow("queue-wf", { second: true }); + + expect(mgr.activeCount("queue-wf")).toBe(1); + expect(mgr.queueLength("queue-wf")).toBe(1); + + // Simulate the worker sending a "thread-event: completed" + const child = mockChildren[0]; + const startCalls = (child.send as ReturnType).mock.calls; + const firstRunId = (startCalls[0][0] as { runId: string }).runId; + + child.emit("message", { + type: "thread-event", + runId: firstRunId, + eventType: "completed", + payload: null, + }); + + // The queued thread should now be active + expect(mgr.activeCount("queue-wf")).toBe(1); + expect(mgr.queueLength("queue-wf")).toBe(0); + // A second start-thread message should have been sent + expect(child.send).toHaveBeenCalledTimes(2); + expect(child.send).toHaveBeenLastCalledWith( + expect.objectContaining({ type: "start-thread", workflow: "queue-wf" }), + ); + }); + + it("dispatches next queued thread when active thread sends failed", () => { + const logStore = makeLogStore(); + const config = makeConfig({ + "queue-wf": { concurrency: 1, overflow: "queue", maxQueue: 5 }, + }); + const mgr = createWorkflowManager("/nerve-root", config, logStore); + + mgr.startWorkflow("queue-wf", { first: true }); + mgr.startWorkflow("queue-wf", { second: true }); + + const child = mockChildren[0]; + const firstRunId = (child.send as ReturnType).mock.calls[0][0].runId as string; + + child.emit("message", { + type: "thread-event", + runId: firstRunId, + eventType: "failed", + payload: { error: "boom" }, + }); + + expect(mgr.activeCount("queue-wf")).toBe(1); + expect(mgr.queueLength("queue-wf")).toBe(0); + }); + }); + + describe("stop() shuts down all workers", () => { + it("sends shutdown to every worker and resolves when they exit", async () => { + const logStore = makeLogStore(); + const config = makeConfig({ + "wf-a": { concurrency: 2, overflow: "drop" }, + "wf-b": { concurrency: 1, overflow: "drop" }, + }); + const mgr = createWorkflowManager("/nerve-root", config, logStore); + + mgr.startWorkflow("wf-a", {}); + mgr.startWorkflow("wf-b", {}); + + // Two distinct workers should have been forked + expect(mockChildren).toHaveLength(2); + + const stopPromise = mgr.stop(); + // Let setImmediate callbacks run (the mock exits on shutdown) + await vi.runAllTimersAsync(); + await stopPromise; + + for (const child of mockChildren) { + expect(child.send).toHaveBeenCalledWith(expect.objectContaining({ type: "shutdown" })); + } + }); + + it("ignores startWorkflow calls after stop()", async () => { + const logStore = makeLogStore(); + const config = makeConfig({ + "wf-a": { concurrency: 2, overflow: "drop" }, + }); + const mgr = createWorkflowManager("/nerve-root", config, logStore); + + const stopPromise = mgr.stop(); + await vi.runAllTimersAsync(); + await stopPromise; + + mgr.startWorkflow("wf-a", {}); + + // No worker should have been spawned + expect(mockChildren).toHaveLength(0); + }); + }); + + describe("unknown workflow name", () => { + it("does nothing for an unknown workflow name", () => { + const logStore = makeLogStore(); + const config = makeConfig({}); + const mgr = createWorkflowManager("/nerve-root", config, logStore); + + mgr.startWorkflow("no-such-workflow", {}); + + expect(mockChildren).toHaveLength(0); + expect(logStore.upsertWorkflowRun).not.toHaveBeenCalled(); + }); + }); +}); diff --git a/packages/daemon/src/index.ts b/packages/daemon/src/index.ts index dba5af2..1437c07 100644 --- a/packages/daemon/src/index.ts +++ b/packages/daemon/src/index.ts @@ -8,6 +8,10 @@ export type { ErrorMessage, ReadyMessage, WorkerToParentMessage, + StartThreadMessage, + ResumeThreadMessage, + ThreadEventMessage, + WorkflowErrorMessage, } from "./ipc.js"; export type { SignalBus, SignalHandler, Unsubscribe } from "./signal-bus.js"; @@ -29,4 +33,7 @@ export { createFileWatcher } from "./file-watcher.js"; export type { FileWatcher, FileChange, FileChangeHandler } from "./file-watcher.js"; export { createLogStore } from "./log-store.js"; -export type { LogStore, LogEntry, LogQuery } from "./log-store.js"; +export type { LogStore, LogEntry, LogQuery, WorkflowRun, WorkflowRunStatus } from "./log-store.js"; + +export { createWorkflowManager } from "./workflow-manager.js"; +export type { WorkflowManager } from "./workflow-manager.js"; diff --git a/packages/daemon/src/ipc.ts b/packages/daemon/src/ipc.ts index 4a5c2b5..0d45ebc 100644 --- a/packages/daemon/src/ipc.ts +++ b/packages/daemon/src/ipc.ts @@ -22,8 +22,32 @@ export type HealthRequestMessage = { type: "health-request"; }; +// --------------------------------------------------------------------------- +// Workflow IPC messages (RFC-002 §5.2) +// --------------------------------------------------------------------------- + +/** Parent → Workflow Worker: start a new thread */ +export type StartThreadMessage = { + type: "start-thread"; + runId: string; + workflow: string; + /** The trigger payload from the Reflex that initiated this thread. */ + triggerPayload: unknown; +}; + +/** Parent → Workflow Worker: resume an existing thread after crash recovery */ +export type ResumeThreadMessage = { + type: "resume-thread"; + runId: string; +}; + /** Union of all messages the parent sends to a worker */ -export type ParentToWorkerMessage = ComputeMessage | ShutdownMessage | HealthRequestMessage; +export type ParentToWorkerMessage = + | ComputeMessage + | ShutdownMessage + | HealthRequestMessage + | StartThreadMessage + | ResumeThreadMessage; /** Worker → Parent: compute produced a signal */ export type SignalMessage = { @@ -51,14 +75,46 @@ export type HealthResponseMessage = { inFlightCount: number; }; +// --------------------------------------------------------------------------- +// Workflow Worker → Parent messages (RFC-002 §5.2) +// --------------------------------------------------------------------------- + +/** Valid lifecycle event types for a workflow thread. */ +export type ThreadEventType = "queued" | "started" | "step_complete" | "completed" | "failed"; + +/** + * Workflow Worker → Parent: a thread lifecycle event. + */ +export type ThreadEventMessage = { + type: "thread-event"; + runId: string; + eventType: ThreadEventType; + payload: unknown; +}; + +/** Workflow Worker → Parent: a thread encountered an unrecoverable error. */ +export type WorkflowErrorMessage = { + type: "workflow-error"; + runId: string; + error: string; +}; + /** Union of all messages a worker sends to the parent */ export type WorkerToParentMessage = | SignalMessage | ErrorMessage | ReadyMessage - | HealthResponseMessage; + | HealthResponseMessage + | ThreadEventMessage + | WorkflowErrorMessage; -const PARENT_MSG_TYPES = new Set(["compute", "shutdown", "health-request"]); +const PARENT_MSG_TYPES = new Set([ + "compute", + "shutdown", + "health-request", + "start-thread", + "resume-thread", +]); /** Validate and parse an unknown IPC message received from the parent process. */ export function parseParentMessage(raw: unknown): Result { @@ -72,6 +128,18 @@ export function parseParentMessage(raw: unknown): Result if (!PARENT_MSG_TYPES.has(obj.type)) { return err(new Error(`Unknown IPC message type: "${obj.type}"`)); } + if (obj.type === "start-thread") { + if (typeof obj.runId !== "string") + return err(new Error("'start-thread' message missing string 'runId'")); + if (typeof obj.workflow !== "string") + return err(new Error("'start-thread' message missing string 'workflow'")); + if (!("triggerPayload" in obj)) + return err(new Error("'start-thread' message missing 'triggerPayload'")); + } + if (obj.type === "resume-thread") { + if (typeof obj.runId !== "string") + return err(new Error("'resume-thread' message missing string 'runId'")); + } return ok(raw as ParentToWorkerMessage); } @@ -108,7 +176,51 @@ function parseHealthResponseMsg( return ok(raw as HealthResponseMessage); } -const WORKER_MSG_TYPES = new Set(["signal", "error", "ready", "health-response"]); +const THREAD_EVENT_TYPES = new Set([ + "queued", + "started", + "step_complete", + "completed", + "failed", +]); + +function parseThreadEventMsg( + obj: Record, + raw: unknown, +): Result { + if (typeof obj.runId !== "string") { + return err(new Error("Worker 'thread-event' message missing string 'runId' field")); + } + if (typeof obj.eventType !== "string" || !THREAD_EVENT_TYPES.has(obj.eventType)) { + return err(new Error(`Worker 'thread-event' message has invalid 'eventType': "${obj.eventType}"`)); + } + if (!("payload" in obj)) { + return err(new Error("Worker 'thread-event' message missing 'payload' field")); + } + return ok(raw as ThreadEventMessage); +} + +function parseWorkflowErrorMsg( + obj: Record, + raw: unknown, +): Result { + if (typeof obj.runId !== "string") { + return err(new Error("Worker 'workflow-error' message missing string 'runId' field")); + } + if (typeof obj.error !== "string") { + return err(new Error("Worker 'workflow-error' message missing string 'error' field")); + } + return ok(raw as WorkflowErrorMessage); +} + +const WORKER_MSG_TYPES = new Set([ + "signal", + "error", + "ready", + "health-response", + "thread-event", + "workflow-error", +]); /** Validate and parse an unknown IPC message received from a worker process. */ export function parseWorkerMessage(raw: unknown): Result { @@ -125,5 +237,7 @@ export function parseWorkerMessage(raw: unknown): Result if (obj.type === "signal") return parseSignalMsg(obj, raw); if (obj.type === "error") return parseErrorMsg(obj, raw); if (obj.type === "health-response") return parseHealthResponseMsg(obj, raw); + if (obj.type === "thread-event") return parseThreadEventMsg(obj, raw); + if (obj.type === "workflow-error") return parseWorkflowErrorMsg(obj, raw); return ok({ type: "ready" }); } diff --git a/packages/daemon/src/log-store.ts b/packages/daemon/src/log-store.ts index 9ed3c3a..6759335 100644 --- a/packages/daemon/src/log-store.ts +++ b/packages/daemon/src/log-store.ts @@ -30,11 +30,70 @@ export type LogQuery = { limit?: number; }; +// --------------------------------------------------------------------------- +// Workflow runs materialized view (RFC-002 §6.2) +// --------------------------------------------------------------------------- + +export type WorkflowRunStatus = + | "queued" + | "started" + | "completed" + | "failed" + | "crashed" + | "dropped"; + +const VALID_WORKFLOW_STATUSES = new Set([ + "queued", + "started", + "completed", + "failed", + "crashed", + "dropped", +]); + +function validateWorkflowRunStatus(status: string): WorkflowRunStatus { + if (!VALID_WORKFLOW_STATUSES.has(status)) { + throw new Error(`Invalid workflow run status from DB: "${status}"`); + } + return status as WorkflowRunStatus; +} + +/** One row in the workflow_runs materialized table. */ +export type WorkflowRun = { + runId: string; + workflow: string; + status: WorkflowRunStatus; + ts: number; +}; + export type LogStore = { append: (entry: Omit) => LogEntry; query: (filter?: LogQuery) => LogEntry[]; getMeta: (key: string) => string | null; setMeta: (key: string, value: string) => void; + /** + * Append a workflow log event and atomically upsert the workflow_runs + * materialized table — both in a single SQLite transaction (RFC-002 §6.2). + */ + upsertWorkflowRun: ( + entry: Omit, + run: WorkflowRun, + ) => LogEntry; + /** + * Alias for upsertWorkflowRun — append a log entry and update workflow_runs + * in one atomic transaction. + */ + appendWithWorkflowUpdate: ( + entry: Omit, + run: WorkflowRun, + ) => LogEntry; + /** Get the current materialized state of a specific workflow run. */ + getWorkflowRun: (runId: string) => WorkflowRun | null; + /** + * Get all workflow runs with status 'queued' or 'started'. + * Optionally filter by workflow name. + */ + getActiveWorkflowRuns: (workflowName?: string) => WorkflowRun[]; close: () => void; }; @@ -56,6 +115,16 @@ CREATE TABLE IF NOT EXISTS meta ( key TEXT PRIMARY KEY, value TEXT NOT NULL ); + +CREATE TABLE IF NOT EXISTS workflow_runs ( + run_id TEXT PRIMARY KEY, + workflow TEXT NOT NULL, + status TEXT NOT NULL, + ts INTEGER NOT NULL +); + +CREATE INDEX IF NOT EXISTS idx_workflow_runs_status ON workflow_runs(status); +CREATE INDEX IF NOT EXISTS idx_workflow_runs_workflow ON workflow_runs(workflow); `; export function createLogStore(dbPath: string): LogStore { @@ -74,6 +143,41 @@ export function createLogStore(dbPath: string): LogStore { "INSERT INTO meta (key, value) VALUES (@key, @value) ON CONFLICT(key) DO UPDATE SET value = @value", ); + const upsertWorkflowRunStmt = sqlite.prepare( + "INSERT OR REPLACE INTO workflow_runs (run_id, workflow, status, ts) VALUES (@runId, @workflow, @status, @ts)", + ); + + const getWorkflowRunStmt = sqlite.prepare( + "SELECT run_id, workflow, status, ts FROM workflow_runs WHERE run_id = ?", + ); + + const getActiveWorkflowRunsStmt = sqlite.prepare( + "SELECT run_id, workflow, status, ts FROM workflow_runs WHERE status IN ('queued', 'started') ORDER BY ts ASC", + ); + + const getActiveWorkflowRunsByNameStmt = sqlite.prepare( + "SELECT run_id, workflow, status, ts FROM workflow_runs WHERE status IN ('queued', 'started') AND workflow = ? ORDER BY ts ASC", + ); + + const upsertWorkflowRunTx = sqlite.transaction( + (entry: Omit, run: WorkflowRun) => { + const info = insertStmt.run({ + source: entry.source, + type: entry.type, + refId: entry.refId, + payload: entry.payload, + ts: entry.ts, + }); + upsertWorkflowRunStmt.run({ + runId: run.runId, + workflow: run.workflow, + status: run.status, + ts: run.ts, + }); + return { ...entry, id: Number(info.lastInsertRowid) }; + }, + ); + function append(entry: Omit): LogEntry { const info = insertStmt.run({ source: entry.source, @@ -142,9 +246,44 @@ export function createLogStore(dbPath: string): LogStore { setMetaStmt.run({ key, value }); } + function upsertWorkflowRun(entry: Omit, run: WorkflowRun): LogEntry { + return upsertWorkflowRunTx(entry, run) as LogEntry; + } + + function appendWithWorkflowUpdate(entry: Omit, run: WorkflowRun): LogEntry { + return upsertWorkflowRunTx(entry, run) as LogEntry; + } + + function getWorkflowRun(runId: string): WorkflowRun | null { + const row = getWorkflowRunStmt.get(runId) as + | { run_id: string; workflow: string; status: string; ts: number } + | undefined; + if (row === undefined) return null; + return { + runId: row.run_id, + workflow: row.workflow, + status: validateWorkflowRunStatus(row.status), + ts: row.ts, + }; + } + + function getActiveWorkflowRuns(workflowName?: string): WorkflowRun[] { + const rows = ( + workflowName !== undefined + ? getActiveWorkflowRunsByNameStmt.all(workflowName) + : getActiveWorkflowRunsStmt.all() + ) as Array<{ run_id: string; workflow: string; status: string; ts: number }>; + return rows.map((r) => ({ + runId: r.run_id, + workflow: r.workflow, + status: validateWorkflowRunStatus(r.status), + ts: r.ts, + })); + } + function close(): void { sqlite.close(); } - return { append, query, getMeta, setMeta, close }; + return { append, query, getMeta, setMeta, upsertWorkflowRun, appendWithWorkflowUpdate, getWorkflowRun, getActiveWorkflowRuns, close }; } diff --git a/packages/daemon/src/workflow-manager.ts b/packages/daemon/src/workflow-manager.ts new file mode 100644 index 0000000..a963e0f --- /dev/null +++ b/packages/daemon/src/workflow-manager.ts @@ -0,0 +1,363 @@ +/** + * WorkflowManager — manages per-workflow worker processes, concurrency control, + * and thread lifecycle tracking. + * + * RFC-002 §7.1: one worker process per workflow name, reused while alive. + * Concurrency and overflow (drop/queue) are enforced here in the parent process. + */ + +import { fork } from "node:child_process"; +import type { ChildProcess } from "node:child_process"; +import { dirname, join } from "node:path"; +import { fileURLToPath } from "node:url"; + +import type { NerveConfig, WorkflowConfig } from "@uncaged/nerve-core"; + +import type { ShutdownMessage, StartThreadMessage, ThreadEventMessage } from "./ipc.js"; +import { parseWorkerMessage } from "./ipc.js"; +import type { LogStore } from "./log-store.js"; + +export type WorkflowManager = { + /** Trigger a new workflow thread (called by Reflex scheduler). */ + startWorkflow: (workflowName: string, payload: unknown) => void; + /** Number of currently active (running) threads for a workflow. */ + activeCount: (workflowName: string) => number; + /** Number of pending queued threads waiting to run for a workflow. */ + queueLength: (workflowName: string) => number; + /** Gracefully shut down all workflow workers. */ + stop: () => Promise; +}; + +type PendingThread = { + runId: string; + payload: unknown; +}; + +type WorkflowState = { + active: Set; + queue: PendingThread[]; +}; + +type WorkerEntry = { + workflowName: string; + process: ChildProcess; + stopping: boolean; +}; + +const DEFAULT_MAX_QUEUE = 100; + +function resolveWorkerScript(): string { + const __filename = fileURLToPath(import.meta.url); + const __dir = dirname(__filename); + return join(__dir, "workflow-worker.js"); +} + +function spawnWorkflowWorker( + nerveRoot: string, + workflowName: string, + workerScript: string, +): ChildProcess { + return fork(workerScript, ["--workflow", workflowName, "--root", nerveRoot], { + stdio: ["ignore", "inherit", "inherit", "ipc"], + }); +} + +function sendStartThread(worker: ChildProcess, msg: StartThreadMessage): void { + if (worker.connected === false) return; + try { + worker.send(msg); + } catch { + // IPC channel closed between connected check and send + } +} + +function sendShutdown(worker: ChildProcess, entry: WorkerEntry): void { + entry.stopping = true; + if (worker.connected === false) return; + const msg: ShutdownMessage = { type: "shutdown" }; + try { + worker.send(msg); + } catch { + // IPC channel closed between connected check and send + } +} + +function waitForExit(child: ChildProcess, timeoutMs: number): Promise { + return new Promise((resolve) => { + const timer = setTimeout(() => { + child.kill("SIGKILL"); + resolve(); + }, timeoutMs); + child.once("exit", () => { + clearTimeout(timer); + resolve(); + }); + }); +} + +export function createWorkflowManager( + nerveRoot: string, + config: NerveConfig, + logStore: LogStore, +): WorkflowManager { + const workerScript = resolveWorkerScript(); + + const states = new Map(); + const workers = new Map(); + let stopped = false; + + function getOrCreateState(workflowName: string): WorkflowState { + let state = states.get(workflowName); + if (state === undefined) { + state = { active: new Set(), queue: [] }; + states.set(workflowName, state); + } + return state; + } + + function workflowConfig(workflowName: string): WorkflowConfig | null { + return config.workflows?.[workflowName] ?? null; + } + + function logWorkflowEvent( + workflowName: string, + runId: string, + eventType: string, + payload?: unknown, + ): void { + const ts = Date.now(); + if ( + eventType === "started" || + eventType === "queued" || + eventType === "completed" || + eventType === "failed" || + eventType === "crashed" || + eventType === "dropped" + ) { + const status = + eventType === "dropped" + ? ("dropped" as const) + : eventType === "queued" + ? ("queued" as const) + : eventType === "started" + ? ("started" as const) + : eventType === "completed" + ? ("completed" as const) + : eventType === "failed" + ? ("failed" as const) + : ("crashed" as const); + logStore.upsertWorkflowRun( + { + source: "workflow", + type: eventType, + refId: runId, + payload: payload !== undefined ? JSON.stringify(payload) : null, + ts, + }, + { runId, workflow: workflowName, status, ts }, + ); + } else { + logStore.append({ + source: "workflow", + type: eventType, + refId: runId, + payload: payload !== undefined ? JSON.stringify(payload) : null, + ts, + }); + } + } + + function dispatchThread(workflowName: string, runId: string, payload: unknown): void { + const state = getOrCreateState(workflowName); + state.active.add(runId); + + const worker = getOrSpawnWorker(workflowName); + const msg: StartThreadMessage = { + type: "start-thread", + runId, + workflow: workflowName, + triggerPayload: payload, + }; + sendStartThread(worker.process, msg); + logWorkflowEvent(workflowName, runId, "started"); + } + + function dequeueNext(workflowName: string): void { + const state = states.get(workflowName); + if (state === undefined || state.queue.length === 0) return; + + const wfConfig = workflowConfig(workflowName); + const concurrency = wfConfig?.concurrency ?? 1; + + if (state.active.size < concurrency) { + const next = state.queue.shift(); + if (next !== undefined) { + dispatchThread(workflowName, next.runId, next.payload); + } + } + } + + function handleThreadEvent(workflowName: string, msg: ThreadEventMessage): void { + const state = states.get(workflowName); + if (state === undefined) return; + + if (msg.eventType === "completed" || msg.eventType === "failed") { + state.active.delete(msg.runId); + dequeueNext(workflowName); + } + + if (msg.eventType === "completed" || msg.eventType === "failed") { + logWorkflowEvent(workflowName, msg.runId, msg.eventType, msg.payload); + } + } + + function handleWorkerCrash(workflowName: string): void { + const state = states.get(workflowName); + if (state === undefined) return; + + const crashedCount = state.active.size; + if (crashedCount > 0) { + process.stderr.write( + `[workflow-manager] worker for "${workflowName}" crashed with ${crashedCount} active thread(s)\n`, + ); + } + + // All active threads are now crashed — we can't recover runIds from this + // in-memory state alone (Phase 3 crash recovery will use the DB). + // Reset active set so the manager stays consistent. + state.active.clear(); + workers.delete(workflowName); + } + + function getOrSpawnWorker(workflowName: string): WorkerEntry { + const existing = workers.get(workflowName); + if (existing !== undefined && existing.process.exitCode === null) { + return existing; + } + + const child = spawnWorkflowWorker(nerveRoot, workflowName, workerScript); + + child.on("message", (raw: unknown) => { + const result = parseWorkerMessage(raw); + if (!result.ok) { + process.stderr.write( + `[workflow-manager] invalid message from "${workflowName}" worker: ${result.error.message}\n`, + ); + return; + } + const msg = result.value; + + if (msg.type === "thread-event") { + handleThreadEvent(workflowName, msg); + return; + } + + if (msg.type === "workflow-error") { + process.stderr.write( + `[workflow-manager] workflow-error for runId "${msg.runId}" in "${workflowName}": ${msg.error}\n`, + ); + const state = states.get(workflowName); + if (state !== undefined) { + state.active.delete(msg.runId); + dequeueNext(workflowName); + } + logWorkflowEvent(workflowName, msg.runId, "failed", { error: msg.error }); + return; + } + + if (msg.type === "error") { + process.stderr.write( + `[workflow-manager] error from "${workflowName}" worker: ${msg.error}\n`, + ); + } + }); + + child.on("exit", (code) => { + const entry = workers.get(workflowName); + if (entry?.stopping) { + workers.delete(workflowName); + const state = states.get(workflowName); + if (state !== undefined) { + state.active.clear(); + } + return; + } + process.stderr.write( + `[workflow-manager] worker for "${workflowName}" exited with code ${code ?? "null"}\n`, + ); + handleWorkerCrash(workflowName); + }); + + const entry: WorkerEntry = { workflowName, process: child, stopping: false }; + workers.set(workflowName, entry); + return entry; + } + + function startWorkflow(workflowName: string, payload: unknown): void { + if (stopped) return; + + const wfConfig = workflowConfig(workflowName); + if (wfConfig === null) { + process.stderr.write( + `[workflow-manager] startWorkflow: unknown workflow "${workflowName}"\n`, + ); + return; + } + + const state = getOrCreateState(workflowName); + const runId = crypto.randomUUID(); + + if (state.active.size < wfConfig.concurrency) { + dispatchThread(workflowName, runId, payload); + return; + } + + // At concurrency limit — apply overflow policy + if (wfConfig.overflow === "drop") { + logWorkflowEvent(workflowName, runId, "dropped"); + process.stderr.write( + `[workflow-manager] dropped thread for "${workflowName}" (overflow=drop)\n`, + ); + return; + } + + // overflow === "queue" + const maxQueue = (wfConfig as { maxQueue?: number }).maxQueue ?? DEFAULT_MAX_QUEUE; + if (state.queue.length >= maxQueue) { + const oldest = state.queue.shift(); + if (oldest !== undefined) { + process.stderr.write( + `[workflow-manager] queue overflow for "${workflowName}", dropping oldest runId "${oldest.runId}"\n`, + ); + logWorkflowEvent(workflowName, oldest.runId, "dropped"); + } + } + + state.queue.push({ runId, payload }); + logWorkflowEvent(workflowName, runId, "queued"); + process.stderr.write( + `[workflow-manager] queued thread for "${workflowName}" runId "${runId}" (queue length: ${state.queue.length})\n`, + ); + } + + function activeCount(workflowName: string): number { + return states.get(workflowName)?.active.size ?? 0; + } + + function queueLength(workflowName: string): number { + return states.get(workflowName)?.queue.length ?? 0; + } + + async function stop(): Promise { + stopped = true; + const exitPromises: Promise[] = []; + for (const entry of workers.values()) { + sendShutdown(entry.process, entry); + exitPromises.push(waitForExit(entry.process, 5000)); + } + await Promise.all(exitPromises); + workers.clear(); + } + + return { startWorkflow, activeCount, queueLength, stop }; +} diff --git a/packages/daemon/src/workflow-worker.ts b/packages/daemon/src/workflow-worker.ts new file mode 100644 index 0000000..a531f1d --- /dev/null +++ b/packages/daemon/src/workflow-worker.ts @@ -0,0 +1,244 @@ +/** + * Workflow Worker runtime bootstrap. + * + * Entry point for a forked workflow worker process. + * Receives the workflow name and nerve root via CLI args, dynamically imports + * the user's WorkflowDefinition, then signals ready and enters the IPC event loop. + * + * Layout assumptions (nerve user config at `/`): + * workflows//index.ts (or .js) ← user workflow definition + */ + +import { resolve, join } from "node:path"; +import { existsSync } from "node:fs"; + +import type { + CommandEvent, + ThreadState, + WorkflowContext, + WorkflowDefinition, +} from "@uncaged/nerve-core"; + +import type { WorkerToParentMessage, ThreadEventType } from "./ipc.js"; +import { parseParentMessage } from "./ipc.js"; + +// --------------------------------------------------------------------------- +// IPC helpers +// --------------------------------------------------------------------------- + +function send(msg: WorkerToParentMessage): void { + if (process.send) { + process.send(msg); + } +} + +function sendReady(): void { + send({ type: "ready" }); +} + +function sendThreadEvent(runId: string, eventType: ThreadEventType, payload: unknown): void { + send({ type: "thread-event", runId, eventType, payload }); +} + +function sendWorkflowError(runId: string, error: string): void { + send({ type: "workflow-error", runId, error }); +} + +// --------------------------------------------------------------------------- +// Thread loop (RFC-002 §5.4) +// --------------------------------------------------------------------------- + +async function runThread( + def: WorkflowDefinition, + workflowName: string, + runId: string, + triggerPayload: unknown, +): Promise { + const state: ThreadState = { runId, events: [] }; + const ctx: WorkflowContext = { + runId, + workflowName, + log: (msg) => + sendThreadEvent(runId, "step_complete", { message: msg }), + }; + + let event: CommandEvent = { + type: "thread_start", + triggerPayload: triggerPayload != null && typeof triggerPayload === "object" ? triggerPayload : {}, + }; + + const MAX_STEPS = 1000; + let step = 0; + while (step < MAX_STEPS) { + step++; + state.events.push(event); + const next = def.moderate(state, event); + + if (next === null) { + sendThreadEvent(runId, "completed", null); + return; + } + + const role = def.roles[next.role]; + if (!role) { + sendWorkflowError(runId, `Unknown role: ${next.role}`); + return; + } + + try { + event = await role.execute(next.prompt, ctx); + } catch (e: unknown) { + const errMsg = e instanceof Error ? e.message : String(e); + sendThreadEvent(runId, "failed", { error: errMsg }); + return; + } + } + if (step >= MAX_STEPS) { + sendWorkflowError(runId, `Thread exceeded maximum steps (${MAX_STEPS})`); + } +} + +// --------------------------------------------------------------------------- +// Workflow definition loader +// --------------------------------------------------------------------------- + +async function loadWorkflowDefinition( + nerveRoot: string, + workflowName: string, +): Promise { + const candidates = [ + resolve(join(nerveRoot, "workflows", workflowName, "index.ts")), + resolve(join(nerveRoot, "workflows", workflowName, "index.js")), + ]; + + const indexPath = candidates.find((p) => existsSync(p)); + if (!indexPath) { + throw new Error( + `Workflow definition not found for "${workflowName}". Tried:\n` + + candidates.map((p) => ` ${p}`).join("\n"), + ); + } + + const mod = await import(indexPath); + const def: unknown = mod.default ?? mod; + + if ( + def === null || + typeof def !== "object" || + typeof (def as WorkflowDefinition).moderate !== "function" || + typeof (def as WorkflowDefinition).roles !== "object" + ) { + throw new Error( + `Workflow "${workflowName}" must export a WorkflowDefinition with "roles" and "moderate".`, + ); + } + + return def as WorkflowDefinition; +} + +// --------------------------------------------------------------------------- +// IPC message dispatch +// --------------------------------------------------------------------------- + +function handleMessage( + raw: unknown, + def: WorkflowDefinition, + workflowName: string, + inFlight: Map>, + shuttingDown: { value: boolean }, +): void { + const parseResult = parseParentMessage(raw); + if (!parseResult.ok) { + process.stderr.write(`[workflow-worker] Invalid IPC message: ${parseResult.error.message}\n`); + return; + } + + const msg = parseResult.value; + + if (msg.type === "shutdown") { + shuttingDown.value = true; + const timeout = new Promise(r => setTimeout(r, 10_000)); + Promise.race([Promise.all(inFlight.values()), timeout]) + .then(() => process.exit(0)) + .catch(() => process.exit(1)); + return; + } + + if (msg.type === "start-thread") { + if (shuttingDown.value) return; + const { runId, triggerPayload } = msg; + + const previous = inFlight.get(runId) ?? Promise.resolve(); + const next = previous + .then(() => runThread(def, workflowName, runId, triggerPayload)) + .catch((e: unknown) => { + const errMsg = e instanceof Error ? e.message : String(e); + sendWorkflowError(runId, errMsg); + }) + .finally(() => { + inFlight.delete(runId); + }); + + inFlight.set(runId, next); + return; + } +} + +// --------------------------------------------------------------------------- +// Bootstrap +// --------------------------------------------------------------------------- + +async function bootstrap(nerveRoot: string, workflowName: string): Promise { + let def: WorkflowDefinition; + try { + def = await loadWorkflowDefinition(nerveRoot, workflowName); + } catch (e: unknown) { + const msg = e instanceof Error ? e.message : String(e); + process.stderr.write(`[workflow-worker] Failed to load workflow "${workflowName}": ${msg}\n`); + process.exit(1); + } + + const inFlight = new Map>(); + const shuttingDown = { value: false }; + + sendReady(); + + process.on("message", (raw: unknown) => { + handleMessage(raw, def, workflowName, inFlight, shuttingDown); + }); +} + +// --------------------------------------------------------------------------- +// CLI entrypoint +// --------------------------------------------------------------------------- + +function parseArgs(): { nerveRoot: string; workflow: string } | null { + const args = process.argv.slice(2); + let workflow: string | null = null; + let nerveRoot: string | null = null; + + for (let i = 0; i < args.length; i++) { + if (args[i] === "--workflow" && i + 1 < args.length) { + workflow = args[i + 1]; + i++; + } else if (args[i] === "--root" && i + 1 < args.length) { + nerveRoot = args[i + 1]; + i++; + } + } + + if (!workflow || !nerveRoot) return null; + return { nerveRoot, workflow }; +} + +const parsed = parseArgs(); +if (!parsed) { + process.stderr.write("Usage: workflow-worker --workflow --root \n"); + process.exit(1); +} + +bootstrap(parsed.nerveRoot, parsed.workflow).catch((e) => { + const msg = e instanceof Error ? e.message : String(e); + process.stderr.write(`[workflow-worker] Unhandled bootstrap error: ${msg}\n`); + process.exit(1); +});