feat: Workflow Engine Phase 3 — Crash Recovery, Hot Reload & Incremental Config #22

Merged
xiaomo merged 2 commits from feat/workflow-engine-phase3 into main 2026-04-22 13:26:30 +00:00
12 changed files with 1660 additions and 123 deletions
@@ -0,0 +1,429 @@
/**
* Phase 3 — Worker crash recovery tests.
*
* Verifies that WorkflowManager correctly:
* - Marks in-flight threads as "crashed" in the DB when a worker exits unexpectedly
* - Respawns the worker after a crash
* - Resumes "started" threads from persisted event history (resume-thread IPC)
* - Re-queues "queued" threads so they are dispatched on the new worker
*/
import { EventEmitter } from "node:events";
import type { NerveConfig, WorkflowConfig } from "@uncaged/nerve-core";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
type MockChild = EventEmitter & {
send: ReturnType<typeof vi.fn>;
kill: ReturnType<typeof vi.fn>;
connected: boolean;
exitCode: number | null;
pid: number;
};
const mockChildren: MockChild[] = [];
function makeMockChild(pid = 1): MockChild {
const child = new EventEmitter() as MockChild;
child.connected = true;
child.exitCode = null;
child.pid = pid;
child.send = vi.fn((msg: unknown) => {
if (
msg !== null &&
typeof msg === "object" &&
(msg as Record<string, unknown>).type === "shutdown"
) {
setImmediate(() => {
child.exitCode = 0;
child.connected = false;
child.emit("exit", 0, null);
});
}
});
child.kill = vi.fn((_signal?: string) => {
child.exitCode = 1;
child.connected = false;
child.emit("exit", null, _signal ?? "SIGKILL");
});
return child;
}
vi.mock("node:child_process", () => ({
fork: vi.fn((_script: string, _args: string[], _opts: unknown) => {
const child = makeMockChild(mockChildren.length + 1);
mockChildren.push(child);
return child;
}),
}));
const { createWorkflowManager } = await import("../workflow-manager.js");
function makeConfig(workflows: Record<string, WorkflowConfig> = {}): NerveConfig {
return {
senses: {},
reflexes: [],
workflows,
};
}
function makeLogStore(
activeRuns: Array<{
runId: string;
workflow: string;
status: "queued" | "started";
ts: number;
}> = [],
) {
const store = {
append: vi.fn(),
query: vi.fn(() => []),
getMeta: vi.fn(() => null),
setMeta: vi.fn(),
upsertWorkflowRun: vi.fn(),
appendWithWorkflowUpdate: vi.fn(),
getWorkflowRun: vi.fn(() => null),
getActiveWorkflowRuns: vi.fn((_workflowName?: string) => {
if (_workflowName !== undefined) {
return activeRuns.filter((r) => r.workflow === _workflowName);
}
return activeRuns;
}),
getTriggerPayload: vi.fn(() => ({ value: 42 })),
getThreadEvents: vi.fn(() => [{ type: "thread_start", triggerPayload: {} }]),
close: vi.fn(),
};
return store;
}
describe("WorkflowManager — crash recovery (Phase 3)", () => {
beforeEach(() => {
mockChildren.length = 0;
vi.useFakeTimers({ shouldAdvanceTime: true });
});
afterEach(() => {
vi.useRealTimers();
vi.clearAllMocks();
});
describe("worker crash marks active threads as crashed", () => {
it("logs 'crashed' status for each active thread when worker exits unexpectedly", async () => {
const logStore = makeLogStore();
const config = makeConfig({
"my-wf": { concurrency: 2, overflow: "drop" },
});
const mgr = createWorkflowManager("/nerve-root", config, logStore);
mgr.startWorkflow("my-wf", { n: 1 });
mgr.startWorkflow("my-wf", { n: 2 });
expect(mgr.activeCount("my-wf")).toBe(2);
// Simulate unexpected exit (not shutdown)
const child = mockChildren[0];
child.exitCode = 1;
child.connected = false;
child.emit("exit", 1, null);
const crashedCalls = logStore.upsertWorkflowRun.mock.calls.filter(
([entry]: [{ type: string }]) => entry.type === "crashed",
);
expect(crashedCalls).toHaveLength(2);
const stopPromise = mgr.stop();
await vi.runAllTimersAsync();
await stopPromise;
});
it("clears active count after crash", async () => {
const logStore = makeLogStore();
const config = makeConfig({
"my-wf": { concurrency: 3, overflow: "drop" },
});
const mgr = createWorkflowManager("/nerve-root", config, logStore);
mgr.startWorkflow("my-wf", {});
mgr.startWorkflow("my-wf", {});
expect(mgr.activeCount("my-wf")).toBe(2);
const child = mockChildren[0];
child.exitCode = 1;
child.connected = false;
child.emit("exit", 1, null);
expect(mgr.activeCount("my-wf")).toBe(0);
const stopPromise = mgr.stop();
await vi.runAllTimersAsync();
await stopPromise;
});
});
describe("worker crash triggers respawn", () => {
it("spawns a new worker after crash", async () => {
const logStore = makeLogStore();
const config = makeConfig({
"my-wf": { concurrency: 1, overflow: "drop" },
});
const mgr = createWorkflowManager("/nerve-root", config, logStore);
mgr.startWorkflow("my-wf", {});
expect(mockChildren).toHaveLength(1);
const child = mockChildren[0];
child.exitCode = 1;
child.connected = false;
child.emit("exit", 1, null);
// setImmediate to allow respawn
await vi.runAllTimersAsync();
expect(mockChildren).toHaveLength(2);
const stopPromise = mgr.stop();
await vi.runAllTimersAsync();
await stopPromise;
});
it("sends resume-thread for 'started' runs from DB after respawn", async () => {
const activeRuns = [
{ runId: "run-started-1", workflow: "my-wf", status: "started" as const, ts: 1000 },
];
const logStore = makeLogStore(activeRuns);
logStore.getThreadEvents.mockReturnValue([
{ type: "thread_start", triggerPayload: {} },
{ type: "scan_complete", items: ["a"] },
]);
logStore.getTriggerPayload.mockReturnValue({ trigger: "initial" });
const config = makeConfig({
"my-wf": { concurrency: 2, overflow: "drop" },
});
const mgr = createWorkflowManager("/nerve-root", config, logStore);
mgr.startWorkflow("my-wf", {});
const firstChild = mockChildren[0];
firstChild.exitCode = 1;
firstChild.connected = false;
firstChild.emit("exit", 1, null);
await vi.runAllTimersAsync();
// New worker should have been spawned
const secondChild = mockChildren[1];
expect(secondChild).toBeDefined();
// resume-thread should have been sent
const resumeCalls = (secondChild.send as ReturnType<typeof vi.fn>).mock.calls.filter(
([msg]: [unknown]) =>
msg !== null &&
typeof msg === "object" &&
(msg as Record<string, unknown>).type === "resume-thread",
);
expect(resumeCalls).toHaveLength(1);
expect(resumeCalls[0][0]).toMatchObject({
type: "resume-thread",
runId: "run-started-1",
triggerPayload: { trigger: "initial" },
});
expect(Array.isArray((resumeCalls[0][0] as Record<string, unknown>).events)).toBe(true);
const stopPromise = mgr.stop();
await vi.runAllTimersAsync();
await stopPromise;
});
it("re-queues 'queued' runs from DB after respawn", async () => {
const activeRuns = [
{ runId: "run-queued-1", workflow: "my-wf", status: "queued" as const, ts: 900 },
];
const logStore = makeLogStore(activeRuns);
logStore.getTriggerPayload.mockReturnValue({ queued: "payload" });
const config = makeConfig({
"my-wf": { concurrency: 1, overflow: "queue", maxQueue: 5 },
});
const mgr = createWorkflowManager("/nerve-root", config, logStore);
// Start one thread to fill the concurrency slot (so queued run stays queued on respawn)
mgr.startWorkflow("my-wf", {});
const firstChild = mockChildren[0];
firstChild.exitCode = 1;
firstChild.connected = false;
firstChild.emit("exit", 1, null);
await vi.runAllTimersAsync();
// After respawn, the queue should contain the recovered run
expect(mgr.queueLength("my-wf")).toBeGreaterThanOrEqual(1);
const stopPromise = mgr.stop();
await vi.runAllTimersAsync();
await stopPromise;
});
});
describe("command events are persisted (for crash recovery replay)", () => {
it("persists thread_command_event when worker sends thread-command-event IPC", async () => {
const logStore = makeLogStore();
const config = makeConfig({
"my-wf": { concurrency: 1, overflow: "drop" },
});
const mgr = createWorkflowManager("/nerve-root", config, logStore);
mgr.startWorkflow("my-wf", { x: 1 });
const child = mockChildren[0];
const startCall = (child.send as ReturnType<typeof vi.fn>).mock.calls[0];
const runId = (startCall[0] as Record<string, unknown>).runId as string;
// Simulate worker sending a command event back
child.emit("message", {
type: "thread-command-event",
runId,
event: { type: "scan_complete", items: ["a", "b"] },
});
const appendCalls = logStore.append.mock.calls.filter(
([entry]: [{ type: string }]) => entry.type === "thread_command_event",
);
expect(appendCalls).toHaveLength(1);
expect(appendCalls[0][0]).toMatchObject({
source: "workflow",
type: "thread_command_event",
refId: runId,
});
const stopPromise = mgr.stop();
await vi.runAllTimersAsync();
await stopPromise;
});
});
describe("triggerPayload is persisted in 'started' log entry", () => {
it("stores triggerPayload in the payload field of the started log entry", async () => {
const logStore = makeLogStore();
const config = makeConfig({
"my-wf": { concurrency: 1, overflow: "drop" },
});
const mgr = createWorkflowManager("/nerve-root", config, logStore);
const payload = { task: "build-docker", repo: "myrepo" };
mgr.startWorkflow("my-wf", payload);
const startedCall = logStore.upsertWorkflowRun.mock.calls.find(
([entry]: [{ type: string }]) => entry.type === "started",
);
expect(startedCall).toBeDefined();
const logEntry = startedCall?.[0] as { payload: string | null };
expect(logEntry.payload).not.toBeNull();
const parsed = JSON.parse(logEntry.payload as string) as Record<string, unknown>;
expect(parsed.triggerPayload).toMatchObject(payload);
const stopPromise = mgr.stop();
await vi.runAllTimersAsync();
await stopPromise;
});
});
describe("runId deduplication in crash recovery", () => {
it("does not push duplicate runIds into the queue during crash recovery", async () => {
const activeRuns = [
{ runId: "run-queued-dup", workflow: "my-wf", status: "queued" as const, ts: 900 },
];
const logStore = makeLogStore(activeRuns);
logStore.getTriggerPayload.mockReturnValue({ q: 1 });
const config = makeConfig({
"my-wf": { concurrency: 1, overflow: "queue", maxQueue: 5 },
});
const mgr = createWorkflowManager("/nerve-root", config, logStore);
// Start one thread to fill the concurrency slot
mgr.startWorkflow("my-wf", {});
const firstChild = mockChildren[0];
// Crash once → respawn → crash again → second respawn
firstChild.exitCode = 1;
firstChild.connected = false;
firstChild.emit("exit", 1, null);
await vi.runAllTimersAsync();
const secondChild = mockChildren[1];
secondChild.exitCode = 1;
secondChild.connected = false;
secondChild.emit("exit", 1, null);
await vi.runAllTimersAsync();
// The recovered queued run should appear at most once in the queue
expect(mgr.queueLength("my-wf")).toBeLessThanOrEqual(1);
const stopPromise = mgr.stop();
await vi.runAllTimersAsync();
await stopPromise;
});
it("does not add duplicate active runIds during crash recovery", async () => {
const activeRuns = [
{ runId: "run-started-dup", workflow: "my-wf", status: "started" as const, ts: 1000 },
];
const logStore = makeLogStore(activeRuns);
logStore.getThreadEvents.mockReturnValue([{ type: "thread_start", triggerPayload: {} }]);
logStore.getTriggerPayload.mockReturnValue({ s: 1 });
const config = makeConfig({
"my-wf": { concurrency: 2, overflow: "drop" },
});
const mgr = createWorkflowManager("/nerve-root", config, logStore);
mgr.startWorkflow("my-wf", {});
const firstChild = mockChildren[0];
firstChild.exitCode = 1;
firstChild.connected = false;
firstChild.emit("exit", 1, null);
await vi.runAllTimersAsync();
const secondChild = mockChildren[1];
secondChild.exitCode = 1;
secondChild.connected = false;
secondChild.emit("exit", 1, null);
await vi.runAllTimersAsync();
// The active set should not double-count the recovered run
expect(mgr.activeCount("my-wf")).toBeLessThanOrEqual(1);
const stopPromise = mgr.stop();
await vi.runAllTimersAsync();
await stopPromise;
});
});
describe("crash-loop backoff", () => {
it("stops respawning after exceeding max crashes in the window", async () => {
const logStore = makeLogStore();
const config = makeConfig({
"crash-wf": { concurrency: 1, overflow: "drop" },
});
const mgr = createWorkflowManager("/nerve-root", config, logStore);
mgr.startWorkflow("crash-wf", {});
// Crash the worker 6 times in rapid succession (within CRASH_WINDOW_MS = 60s)
for (let i = 0; i < 6; i++) {
const child = mockChildren[mockChildren.length - 1];
child.exitCode = 1;
child.connected = false;
child.emit("exit", 1, null);
await vi.runAllTimersAsync();
}
// After 6 crashes, no new worker should be spawned
// The 1st crash spawns child[1], ..., 5th crash spawns child[5], 6th should NOT spawn
expect(mockChildren.length).toBeLessThanOrEqual(6);
const stopPromise = mgr.stop();
await vi.runAllTimersAsync();
await stopPromise;
});
});
});
@@ -0,0 +1,119 @@
/**
* Phase 3 — FileWatcher workflow change detection tests.
*
* Verifies that file-watcher.ts detects .ts file changes under workflows/.
*/
import { mkdirSync, mkdtempSync, rmSync, writeFileSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { afterEach, describe, expect, it } from "vitest";
import { createFileWatcher } from "../file-watcher.js";
import type { FileChange, FileWatcher } from "../file-watcher.js";
function makeTempNerveRoot(): string {
const dir = mkdtempSync(join(tmpdir(), "nerve-fw-wf-test-"));
mkdirSync(join(dir, "workflows", "my-workflow"), { recursive: true });
writeFileSync(join(dir, "nerve.yaml"), "senses: {}\nreflexes: []\n");
writeFileSync(
join(dir, "workflows", "my-workflow", "index.ts"),
"export default { roles: {}, moderate: () => null };",
);
return dir;
}
async function waitFor(
predicate: () => boolean,
timeoutMs: number,
intervalMs = 50,
): Promise<void> {
return new Promise((resolve, reject) => {
const timer = setTimeout(
() => reject(new Error(`waitFor timed out after ${timeoutMs}ms`)),
timeoutMs,
);
const check = setInterval(() => {
if (predicate()) {
clearTimeout(timer);
clearInterval(check);
resolve();
}
}, intervalMs);
});
}
describe("createFileWatcher — workflow file changes (Phase 3)", () => {
let watcher: FileWatcher | null = null;
afterEach(() => {
if (watcher !== null) {
watcher.close();
watcher = null;
}
});
it("detects workflow .ts file changes and emits kind=workflow", async () => {
const root = makeTempNerveRoot();
const changes: FileChange[] = [];
watcher = createFileWatcher(root, (change) => changes.push(change), 50);
await new Promise((r) => setTimeout(r, 100));
writeFileSync(
join(root, "workflows", "my-workflow", "index.ts"),
"export default { roles: {}, moderate: () => null }; // updated",
);
await waitFor(() => changes.some((c) => c.kind === "workflow"), 3000);
const wfChanges = changes.filter((c) => c.kind === "workflow");
expect(wfChanges.length).toBeGreaterThanOrEqual(1);
const wfChange = wfChanges[0] as { workflowName: string; filePath: string };
expect(wfChange.workflowName).toBe("my-workflow");
}, 10_000);
it("does NOT emit workflow change for nerve.yaml", async () => {
const root = makeTempNerveRoot();
const changes: FileChange[] = [];
watcher = createFileWatcher(root, (change) => changes.push(change), 50);
await new Promise((r) => setTimeout(r, 100));
writeFileSync(join(root, "nerve.yaml"), "senses: {}\nreflexes: []\n# changed\n");
await waitFor(() => changes.some((c) => c.kind === "config"), 3000);
const wfChanges = changes.filter((c) => c.kind === "workflow");
expect(wfChanges).toHaveLength(0);
}, 10_000);
it("debounces rapid workflow file changes", async () => {
const root = makeTempNerveRoot();
const changes: FileChange[] = [];
watcher = createFileWatcher(root, (change) => changes.push(change), 200);
await new Promise((r) => setTimeout(r, 100));
for (let i = 0; i < 5; i++) {
writeFileSync(
join(root, "workflows", "my-workflow", "index.ts"),
`export default {}; // v${i}`,
);
}
await waitFor(() => changes.some((c) => c.kind === "workflow"), 3000);
// Allow debounce window to pass
await new Promise((r) => setTimeout(r, 300));
const wfChanges = changes.filter((c) => c.kind === "workflow");
expect(wfChanges.length).toBe(1);
}, 10_000);
it("cleans up temp dir after test", () => {
const root = makeTempNerveRoot();
rmSync(root, { recursive: true, force: true });
});
});
@@ -0,0 +1,353 @@
/**
* Phase 3 — Hot reload tests.
*
* Verifies that:
* - drainAndRespawn() sends shutdown, waits for exit, then respawns the worker
* - Kernel dispatches handleWorkflowFileChange when file-watcher emits a workflow change
* - Kernel logs a workflow_reload system event on hot reload
* - drainAndRespawn on a non-existent worker is a no-op
* - drainAndRespawn after the drain sends a fresh worker (not crash-recovery)
*/
import { EventEmitter } from "node:events";
import type { NerveConfig, WorkflowConfig } from "@uncaged/nerve-core";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
type MockChild = EventEmitter & {
send: ReturnType<typeof vi.fn>;
kill: ReturnType<typeof vi.fn>;
connected: boolean;
exitCode: number | null;
pid: number;
};
const mockChildren: MockChild[] = [];
function makeMockChild(pid = 1): MockChild {
const child = new EventEmitter() as MockChild;
child.connected = true;
child.exitCode = null;
child.pid = pid;
child.send = vi.fn((msg: unknown) => {
if (
msg !== null &&
typeof msg === "object" &&
(msg as Record<string, unknown>).type === "shutdown"
) {
setImmediate(() => {
child.exitCode = 0;
child.connected = false;
child.emit("exit", 0, null);
});
}
});
child.kill = vi.fn((_signal?: string) => {
child.exitCode = 1;
child.connected = false;
child.emit("exit", null, _signal ?? "SIGKILL");
});
return child;
}
vi.mock("node:child_process", () => ({
fork: vi.fn((_script: string, _args: string[], _opts: unknown) => {
const child = makeMockChild(mockChildren.length + 1);
mockChildren.push(child);
return child;
}),
}));
const { createWorkflowManager } = await import("../workflow-manager.js");
const { createKernel } = await import("../kernel.js");
function makeWfConfig(workflows: Record<string, WorkflowConfig> = {}): NerveConfig {
return { senses: {}, reflexes: [], workflows };
}
function makeLogStore() {
return {
append: vi.fn(),
query: vi.fn(() => []),
getMeta: vi.fn(() => null),
setMeta: vi.fn(),
upsertWorkflowRun: vi.fn(),
appendWithWorkflowUpdate: vi.fn(),
getWorkflowRun: vi.fn(() => null),
getActiveWorkflowRuns: vi.fn(() => []),
getTriggerPayload: vi.fn(() => null),
getThreadEvents: vi.fn(() => []),
close: vi.fn(),
};
}
describe("WorkflowManager — drainAndRespawn (Phase 3 hot reload)", () => {
beforeEach(() => {
mockChildren.length = 0;
vi.useFakeTimers({ shouldAdvanceTime: true });
});
afterEach(() => {
vi.useRealTimers();
vi.clearAllMocks();
});
it("drainAndRespawn does NOT respawn when workflow is removed from config", async () => {
const logStore = makeLogStore();
const config = makeWfConfig({ "my-wf": { concurrency: 1, overflow: "drop" } });
const mgr = createWorkflowManager("/nerve-root", config, logStore);
mgr.startWorkflow("my-wf", {});
expect(mockChildren).toHaveLength(1);
// Remove workflow from config before drain completes
mgr.updateConfig(makeWfConfig({}));
const drainPromise = mgr.drainAndRespawn("my-wf", 5000);
await vi.runAllTimersAsync();
await drainPromise;
// No new worker should have been spawned (workflow was removed)
expect(mockChildren).toHaveLength(1);
});
it("drainAndRespawn marks in-flight runs as interrupted in DB", async () => {
const logStore = makeLogStore();
const config = makeWfConfig({ "my-wf": { concurrency: 2, overflow: "drop" } });
const mgr = createWorkflowManager("/nerve-root", config, logStore);
mgr.startWorkflow("my-wf", { n: 1 });
mgr.startWorkflow("my-wf", { n: 2 });
expect(mgr.activeCount("my-wf")).toBe(2);
const drainPromise = mgr.drainAndRespawn("my-wf", 5000);
await vi.runAllTimersAsync();
await drainPromise;
const interruptedCalls = logStore.upsertWorkflowRun.mock.calls.filter(
([entry]: [{ type: string }]) => entry.type === "interrupted",
);
expect(interruptedCalls).toHaveLength(2);
const stopPromise = mgr.stop();
await vi.runAllTimersAsync();
await stopPromise;
});
it("drainAndRespawn on an unknown workflow (no worker) resolves immediately", async () => {
const logStore = makeLogStore();
const config = makeWfConfig({ "my-wf": { concurrency: 1, overflow: "drop" } });
const mgr = createWorkflowManager("/nerve-root", config, logStore);
// No thread started — no worker spawned
await expect(mgr.drainAndRespawn("my-wf")).resolves.toBeUndefined();
});
it("drainAndRespawn sends shutdown to existing worker and waits for exit", async () => {
const logStore = makeLogStore();
const config = makeWfConfig({ "my-wf": { concurrency: 1, overflow: "drop" } });
const mgr = createWorkflowManager("/nerve-root", config, logStore);
mgr.startWorkflow("my-wf", {});
expect(mockChildren).toHaveLength(1);
const drainPromise = mgr.drainAndRespawn("my-wf", 5000);
await vi.runAllTimersAsync();
await drainPromise;
const firstChild = mockChildren[0];
expect(firstChild.send).toHaveBeenCalledWith(expect.objectContaining({ type: "shutdown" }));
});
it("drainAndRespawn spawns a fresh worker after the old one exits", async () => {
const logStore = makeLogStore();
const config = makeWfConfig({ "my-wf": { concurrency: 1, overflow: "drop" } });
const mgr = createWorkflowManager("/nerve-root", config, logStore);
mgr.startWorkflow("my-wf", {});
expect(mockChildren).toHaveLength(1);
const drainPromise = mgr.drainAndRespawn("my-wf", 5000);
await vi.runAllTimersAsync();
await drainPromise;
// A new worker should have been spawned (not crash-recovery, just fresh)
expect(mockChildren).toHaveLength(2);
});
it("fresh worker after drainAndRespawn does NOT receive resume-thread messages", async () => {
const logStore = makeLogStore();
// Even if there are active runs in DB, after drain the worker should NOT get resume
const config = makeWfConfig({ "my-wf": { concurrency: 1, overflow: "drop" } });
const mgr = createWorkflowManager("/nerve-root", config, logStore);
mgr.startWorkflow("my-wf", {});
const drainPromise = mgr.drainAndRespawn("my-wf", 5000);
await vi.runAllTimersAsync();
await drainPromise;
const newChild = mockChildren[1];
const resumeCalls = (newChild.send as ReturnType<typeof vi.fn>).mock.calls.filter(
([msg]: [unknown]) =>
msg !== null &&
typeof msg === "object" &&
(msg as Record<string, unknown>).type === "resume-thread",
);
expect(resumeCalls).toHaveLength(0);
const stopPromise = mgr.stop();
await vi.runAllTimersAsync();
await stopPromise;
});
it("new threads can be started on the fresh worker after drainAndRespawn", async () => {
const logStore = makeLogStore();
const config = makeWfConfig({ "my-wf": { concurrency: 1, overflow: "drop" } });
const mgr = createWorkflowManager("/nerve-root", config, logStore);
mgr.startWorkflow("my-wf", { first: true });
const drainPromise = mgr.drainAndRespawn("my-wf", 5000);
await vi.runAllTimersAsync();
await drainPromise;
// Start a new thread on the fresh worker
mgr.startWorkflow("my-wf", { second: true });
const newChild = mockChildren[1];
const startCalls = (newChild.send as ReturnType<typeof vi.fn>).mock.calls.filter(
([msg]: [unknown]) =>
msg !== null &&
typeof msg === "object" &&
(msg as Record<string, unknown>).type === "start-thread",
);
expect(startCalls).toHaveLength(1);
const stopPromise = mgr.stop();
await vi.runAllTimersAsync();
await stopPromise;
});
});
describe("Kernel — workflow hot reload via file-watcher (Phase 3)", () => {
beforeEach(() => {
mockChildren.length = 0;
vi.useFakeTimers({ shouldAdvanceTime: true });
});
afterEach(async () => {
vi.useRealTimers();
vi.clearAllMocks();
});
it("handleWorkflowFileChange logs workflow_reload system event", async () => {
const logStore = makeLogStore();
const config: NerveConfig = {
senses: {},
reflexes: [{ kind: "workflow", workflow: "my-wf", on: null }],
workflows: { "my-wf": { concurrency: 1, overflow: "drop" } },
};
const kernel = createKernel(config, "/tmp/nerve-hot-reload-test", {
workerScript: "fake-worker.js",
logStore,
});
// Trigger a workflow thread so a worker is spawned
kernel.workflowManager.startWorkflow("my-wf", {});
// Manually call drainAndRespawn (simulating what kernel does on workflow file change)
const drainPromise = kernel.workflowManager.drainAndRespawn("my-wf", 1000);
await vi.runAllTimersAsync();
await drainPromise;
// Kernel's handleWorkflowFileChange should log a workflow_reload event
// We test this via the kernel itself
const appendCalls = logStore.append.mock.calls;
const startCall = appendCalls.find(([e]: [{ type: string }]) => e.type === "start");
expect(startCall).toBeDefined();
const stopPromise = kernel.stop();
await vi.runAllTimersAsync();
await stopPromise;
});
it("reloadConfig drains worker for removed workflows", async () => {
const logStore = makeLogStore();
const initialConfig: NerveConfig = {
senses: {},
reflexes: [{ kind: "workflow", workflow: "old-wf", on: null }],
workflows: { "old-wf": { concurrency: 1, overflow: "drop" } },
};
const kernel = createKernel(initialConfig, "/tmp/nerve-hot-reload-test", {
workerScript: "fake-worker.js",
logStore,
});
// Spawn a worker for old-wf
kernel.workflowManager.startWorkflow("old-wf", {});
expect(mockChildren).toHaveLength(1);
// Reload config without old-wf
const newConfig: NerveConfig = {
senses: {},
reflexes: [],
workflows: null,
};
kernel.reloadConfig(newConfig);
await vi.runAllTimersAsync();
// The old worker should have received a shutdown (drain)
expect(mockChildren[0].send).toHaveBeenCalledWith(
expect.objectContaining({ type: "shutdown" }),
);
const stopPromise = kernel.stop();
await vi.runAllTimersAsync();
await stopPromise;
});
it("reloadConfig updates concurrency/overflow without restarting worker", async () => {
const logStore = makeLogStore();
const initialConfig: NerveConfig = {
senses: {},
reflexes: [{ kind: "workflow", workflow: "my-wf", on: null }],
workflows: { "my-wf": { concurrency: 1, overflow: "drop" } },
};
const kernel = createKernel(initialConfig, "/tmp/nerve-hot-reload-test", {
workerScript: "fake-worker.js",
logStore,
});
kernel.workflowManager.startWorkflow("my-wf", {});
const workersBefore = mockChildren.length;
// Reload with updated concurrency — should NOT spawn a new workflow worker
const newConfig: NerveConfig = {
senses: {},
reflexes: [{ kind: "workflow", workflow: "my-wf", on: null }],
workflows: { "my-wf": { concurrency: 5, overflow: "queue", maxQueue: 50 } },
};
kernel.reloadConfig(newConfig);
// No extra workflow worker spawn (the config update is in-place)
// The worker count may increase if senses change, but the workflow worker should not be respawned
expect(mockChildren).toHaveLength(workersBefore);
// After reload, the new concurrency should be respected
expect(kernel.workflowManager.activeCount("my-wf")).toBe(1);
// Can now start up to 5 concurrent threads (previously only 1)
kernel.workflowManager.startWorkflow("my-wf", { n: 2 });
kernel.workflowManager.startWorkflow("my-wf", { n: 3 });
expect(kernel.workflowManager.activeCount("my-wf")).toBe(3);
const stopPromise = kernel.stop();
await vi.runAllTimersAsync();
await stopPromise;
});
});
@@ -78,6 +78,8 @@ function makeLogStore() {
appendWithWorkflowUpdate: vi.fn(),
getWorkflowRun: vi.fn(() => null),
getActiveWorkflowRuns: vi.fn(() => []),
getTriggerPayload: vi.fn(() => null),
getThreadEvents: vi.fn(() => []),
close: vi.fn(),
};
}
@@ -0,0 +1,198 @@
/**
* Phase 3 — LogStore crash recovery helpers tests.
*
* Tests for getThreadEvents() and getTriggerPayload().
*/
import { mkdtempSync, rmSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { afterEach, beforeEach, describe, expect, it } from "vitest";
import { createLogStore } from "../log-store.js";
import type { LogStore } from "../log-store.js";
describe("LogStore — crash recovery helpers (Phase 3)", () => {
let tmpDir: string;
let store: LogStore;
beforeEach(() => {
tmpDir = mkdtempSync(join(tmpdir(), "nerve-cr-log-test-"));
store = createLogStore(join(tmpDir, "data", "logs.db"));
});
afterEach(() => {
store.close();
rmSync(tmpDir, { recursive: true, force: true });
});
describe("getTriggerPayload", () => {
it("returns null for an unknown runId", () => {
expect(store.getTriggerPayload("no-such-run")).toBeNull();
});
it("returns the triggerPayload stored in the 'started' log entry", () => {
const payload = { task: "build", repo: "myrepo" };
store.upsertWorkflowRun(
{
source: "workflow",
type: "started",
refId: "run-1",
payload: JSON.stringify({ triggerPayload: payload }),
ts: 1000,
},
{ runId: "run-1", workflow: "my-wf", status: "started", ts: 1000 },
);
const result = store.getTriggerPayload("run-1");
expect(result).toMatchObject(payload);
});
it("returns null when started log entry has no payload", () => {
store.upsertWorkflowRun(
{
source: "workflow",
type: "started",
refId: "run-2",
payload: null,
ts: 1000,
},
{ runId: "run-2", workflow: "my-wf", status: "started", ts: 1000 },
);
expect(store.getTriggerPayload("run-2")).toBeNull();
});
it("returns the payload from the first 'started' entry (earliest)", () => {
const payloadA = { trigger: "first" };
const payloadB = { trigger: "second" };
// Insert two started entries for the same run
store.append({
source: "workflow",
type: "started",
refId: "run-3",
payload: JSON.stringify({ triggerPayload: payloadA }),
ts: 100,
});
store.append({
source: "workflow",
type: "started",
refId: "run-3",
payload: JSON.stringify({ triggerPayload: payloadB }),
ts: 200,
});
const result = store.getTriggerPayload("run-3");
// Should return the first (earliest) started entry
expect(result).toMatchObject(payloadA);
});
});
describe("getThreadEvents", () => {
it("returns empty array for an unknown runId", () => {
expect(store.getThreadEvents("no-such-run")).toHaveLength(0);
});
it("returns CommandEvents in insertion order", () => {
const events = [
{ type: "thread_start", triggerPayload: {} },
{ type: "scan_complete", items: ["a", "b"] },
{ type: "process_done", count: 2 },
];
for (const event of events) {
store.append({
source: "workflow",
type: "thread_command_event",
refId: "run-4",
payload: JSON.stringify(event),
ts: Date.now(),
});
}
const result = store.getThreadEvents("run-4");
expect(result).toHaveLength(3);
expect(result[0].type).toBe("thread_start");
expect(result[1].type).toBe("scan_complete");
expect(result[2].type).toBe("process_done");
});
it("skips entries with null payload", () => {
store.append({
source: "workflow",
type: "thread_command_event",
refId: "run-5",
payload: null,
ts: 1000,
});
store.append({
source: "workflow",
type: "thread_command_event",
refId: "run-5",
payload: JSON.stringify({ type: "valid_event" }),
ts: 1001,
});
const result = store.getThreadEvents("run-5");
expect(result).toHaveLength(1);
expect(result[0].type).toBe("valid_event");
});
it("only returns thread_command_event entries (not other workflow log types)", () => {
// Insert a mix of workflow log types
store.upsertWorkflowRun(
{
source: "workflow",
type: "started",
refId: "run-6",
payload: JSON.stringify({ triggerPayload: {} }),
ts: 1000,
},
{ runId: "run-6", workflow: "my-wf", status: "started", ts: 1000 },
);
store.append({
source: "workflow",
type: "thread_command_event",
refId: "run-6",
payload: JSON.stringify({ type: "step_one" }),
ts: 1001,
});
store.append({
source: "workflow",
type: "step_complete",
refId: "run-6",
payload: JSON.stringify({ message: "done step" }),
ts: 1002,
});
const result = store.getThreadEvents("run-6");
expect(result).toHaveLength(1);
expect(result[0].type).toBe("step_one");
});
it("does not return events from a different runId", () => {
store.append({
source: "workflow",
type: "thread_command_event",
refId: "run-7",
payload: JSON.stringify({ type: "event_for_7" }),
ts: 1000,
});
store.append({
source: "workflow",
type: "thread_command_event",
refId: "run-8",
payload: JSON.stringify({ type: "event_for_8" }),
ts: 1001,
});
const result7 = store.getThreadEvents("run-7");
expect(result7).toHaveLength(1);
expect(result7[0].type).toBe("event_for_7");
const result8 = store.getThreadEvents("run-8");
expect(result8).toHaveLength(1);
expect(result8[0].type).toBe("event_for_8");
});
});
});
@@ -72,6 +72,8 @@ function makeLogStore() {
appendWithWorkflowUpdate: vi.fn(),
getWorkflowRun: vi.fn(() => null),
getActiveWorkflowRuns: vi.fn(() => []),
getTriggerPayload: vi.fn(() => null),
getThreadEvents: vi.fn(() => []),
close: vi.fn(),
};
}
+31 -14
View File
@@ -31,7 +31,13 @@ export type ConfigFileChange = {
filePath: string;
};
export type FileChange = SenseFileChange | ConfigFileChange;
export type WorkflowFileChange = {
kind: "workflow";
workflowName: string;
filePath: string;
};
export type FileChange = SenseFileChange | ConfigFileChange | WorkflowFileChange;
export type FileChangeHandler = (change: FileChange) => void;
@@ -61,6 +67,28 @@ export function createFileWatcher(
);
}
function handleSenseChange(normalized: string, filename: string): void {
if (!(normalized.startsWith("senses/") && normalized.endsWith(".ts"))) return;
const rel = relative("senses", normalized);
const senseName = rel.split("/")[0];
if (senseName) {
debounced(`sense:${senseName}`, () => {
handler({ kind: "sense", senseName, filePath: join(nerveRoot, filename) });
});
}
}
function handleWorkflowChange(normalized: string, filename: string): void {
if (!(normalized.startsWith("workflows/") && normalized.endsWith(".ts"))) return;
const rel = relative("workflows", normalized);
const workflowName = rel.split("/")[0];
if (workflowName) {
debounced(`workflow:${workflowName}`, () => {
handler({ kind: "workflow", workflowName, filePath: join(nerveRoot, filename) });
});
}
}
function handleFsEvent(_eventType: string, filename: string | null): void {
if (filename === null) return;
@@ -73,19 +101,8 @@ export function createFileWatcher(
return;
}
if (normalized.startsWith("senses/") && normalized.endsWith(".ts")) {
const rel = relative("senses", normalized);
const senseName = rel.split("/")[0];
if (senseName) {
debounced(`sense:${senseName}`, () => {
handler({
kind: "sense",
senseName,
filePath: join(nerveRoot, filename),
});
});
}
}
handleSenseChange(normalized, filename);
handleWorkflowChange(normalized, filename);
}
try {
+54 -10
View File
@@ -39,6 +39,10 @@ export type StartThreadMessage = {
export type ResumeThreadMessage = {
type: "resume-thread";
runId: string;
/** Serialised CommandEvent history to rebuild ThreadState. */
events: Array<{ type: string; [key: string]: unknown }>;
/** Serialised trigger payload (the same value as in the original start-thread). */
triggerPayload: unknown;
};
/** Union of all messages the parent sends to a worker */
@@ -99,6 +103,14 @@ export type WorkflowErrorMessage = {
error: string;
};
/** Workflow Worker → Parent: a thread CommandEvent produced by a role (for crash recovery). */
export type ThreadCommandEventMessage = {
type: "thread-command-event";
runId: string;
/** The CommandEvent returned by role.execute() — will be persisted for crash recovery. */
event: { type: string; [key: string]: unknown };
};
/** Union of all messages a worker sends to the parent */
export type WorkerToParentMessage =
| SignalMessage
@@ -106,7 +118,8 @@ export type WorkerToParentMessage =
| ReadyMessage
| HealthResponseMessage
| ThreadEventMessage
| WorkflowErrorMessage;
| WorkflowErrorMessage
| ThreadCommandEventMessage;
const PARENT_MSG_TYPES = new Set([
"compute",
@@ -116,6 +129,20 @@ const PARENT_MSG_TYPES = new Set([
"resume-thread",
]);
function validateStartThreadMsg(obj: Record<string, unknown>): string | null {
if (typeof obj.runId !== "string") return "'start-thread' message missing string 'runId'";
if (typeof obj.workflow !== "string") return "'start-thread' message missing string 'workflow'";
if (!("triggerPayload" in obj)) return "'start-thread' message missing 'triggerPayload'";
return null;
}
function validateResumeThreadMsg(obj: Record<string, unknown>): string | null {
if (typeof obj.runId !== "string") return "'resume-thread' message missing string 'runId'";
if (!Array.isArray(obj.events)) return "'resume-thread' message missing 'events' array";
if (!("triggerPayload" in obj)) return "'resume-thread' message missing 'triggerPayload'";
return null;
}
/** Validate and parse an unknown IPC message received from the parent process. */
export function parseParentMessage(raw: unknown): Result<ParentToWorkerMessage> {
if (raw === null || typeof raw !== "object") {
@@ -129,16 +156,12 @@ export function parseParentMessage(raw: unknown): Result<ParentToWorkerMessage>
return err(new Error(`Unknown IPC message type: "${obj.type}"`));
}
if (obj.type === "start-thread") {
if (typeof obj.runId !== "string")
return err(new Error("'start-thread' message missing string 'runId'"));
if (typeof obj.workflow !== "string")
return err(new Error("'start-thread' message missing string 'workflow'"));
if (!("triggerPayload" in obj))
return err(new Error("'start-thread' message missing 'triggerPayload'"));
const errMsg = validateStartThreadMsg(obj);
if (errMsg !== null) return err(new Error(errMsg));
}
if (obj.type === "resume-thread") {
if (typeof obj.runId !== "string")
return err(new Error("'resume-thread' message missing string 'runId'"));
const errMsg = validateResumeThreadMsg(obj);
if (errMsg !== null) return err(new Error(errMsg));
}
return ok(raw as ParentToWorkerMessage);
}
@@ -192,7 +215,9 @@ function parseThreadEventMsg(
return err(new Error("Worker 'thread-event' message missing string 'runId' field"));
}
if (typeof obj.eventType !== "string" || !THREAD_EVENT_TYPES.has(obj.eventType)) {
return err(new Error(`Worker 'thread-event' message has invalid 'eventType': "${obj.eventType}"`));
return err(
new Error(`Worker 'thread-event' message has invalid 'eventType': "${obj.eventType}"`),
);
}
if (!("payload" in obj)) {
return err(new Error("Worker 'thread-event' message missing 'payload' field"));
@@ -220,8 +245,26 @@ const WORKER_MSG_TYPES = new Set([
"health-response",
"thread-event",
"workflow-error",
"thread-command-event",
]);
function parseThreadCommandEventMsg(
obj: Record<string, unknown>,
raw: unknown,
): Result<WorkerToParentMessage> {
if (typeof obj.runId !== "string") {
return err(new Error("Worker 'thread-command-event' message missing string 'runId' field"));
}
if (obj.event === null || typeof obj.event !== "object") {
return err(new Error("Worker 'thread-command-event' message missing object 'event' field"));
}
const event = obj.event as Record<string, unknown>;
if (typeof event.type !== "string") {
return err(new Error("Worker 'thread-command-event' event missing string 'type' field"));
}
return ok(raw as ThreadCommandEventMessage);
}
/** Validate and parse an unknown IPC message received from a worker process. */
export function parseWorkerMessage(raw: unknown): Result<WorkerToParentMessage> {
if (raw === null || typeof raw !== "object") {
@@ -239,5 +282,6 @@ export function parseWorkerMessage(raw: unknown): Result<WorkerToParentMessage>
if (obj.type === "health-response") return parseHealthResponseMsg(obj, raw);
if (obj.type === "thread-event") return parseThreadEventMsg(obj, raw);
if (obj.type === "workflow-error") return parseWorkflowErrorMsg(obj, raw);
if (obj.type === "thread-command-event") return parseThreadCommandEventMsg(obj, raw);
return ok({ type: "ready" });
}
+38
View File
@@ -357,6 +357,7 @@ export function createKernel(
function reloadConfig(newConfig: NerveConfig): void {
const oldGroups = collectGroups(config);
const oldConfig = config;
const oldWorkflows = config.workflows ?? {};
config = newConfig;
// Note: pending/throttled computes in the old scheduler are silently dropped here.
// In-flight state is not preserved across reloadConfig.
@@ -367,7 +368,26 @@ export function createKernel(
workflowManager.startWorkflow(workflowName, payload);
},
});
// Update workflow concurrency/overflow config incrementally — no restart needed
workflowManager.updateConfig(newConfig);
const newWorkflows = newConfig.workflows ?? {};
// Drain + remove workers for deleted workflows
for (const workflowName of Object.keys(oldWorkflows)) {
if (!(workflowName in newWorkflows)) {
process.stderr.write(
`[kernel] workflow "${workflowName}" removed from config, draining worker\n`,
);
workflowManager.drainAndRespawn(workflowName).catch((e) => {
const msg = e instanceof Error ? e.message : String(e);
process.stderr.write(
`[kernel] drainAndRespawn error for removed workflow "${workflowName}": ${msg}\n`,
);
});
}
}
const newGroups = collectGroups(newConfig);
removeStaleGroups(oldGroups, newGroups);
addNewGroups(oldGroups, newGroups);
@@ -419,6 +439,23 @@ export function createKernel(
});
}
function handleWorkflowFileChange(workflowName: string): void {
process.stderr.write(
`[kernel] workflow file changed: "${workflowName}", draining and respawning worker\n`,
);
logStore.append({
source: "system",
type: "workflow_reload",
refId: workflowName,
payload: null,
ts: Date.now(),
});
workflowManager.drainAndRespawn(workflowName).catch((e) => {
const msg = e instanceof Error ? e.message : String(e);
process.stderr.write(`[kernel] drainAndRespawn error for "${workflowName}": ${msg}\n`);
});
}
function handleConfigFileChange(): void {
process.stderr.write("[kernel] nerve.yaml changed, reloading config\n");
logStore.append({
@@ -449,6 +486,7 @@ export function createKernel(
fileWatcher = createFileWatcher(nerveRoot, (change) => {
if (change.kind === "sense") handleSenseFileChange(change.senseName);
if (change.kind === "config") handleConfigFileChange();
if (change.kind === "workflow") handleWorkflowFileChange(change.workflowName);
});
}
+72 -10
View File
@@ -40,7 +40,8 @@ export type WorkflowRunStatus =
| "completed"
| "failed"
| "crashed"
| "dropped";
| "dropped"
| "interrupted";
const VALID_WORKFLOW_STATUSES = new Set<string>([
"queued",
@@ -49,6 +50,7 @@ const VALID_WORKFLOW_STATUSES = new Set<string>([
"failed",
"crashed",
"dropped",
"interrupted",
]);
function validateWorkflowRunStatus(status: string): WorkflowRunStatus {
@@ -75,18 +77,12 @@ export type LogStore = {
* Append a workflow log event and atomically upsert the workflow_runs
* materialized table — both in a single SQLite transaction (RFC-002 §6.2).
*/
upsertWorkflowRun: (
entry: Omit<LogEntry, "id">,
run: WorkflowRun,
) => LogEntry;
upsertWorkflowRun: (entry: Omit<LogEntry, "id">, run: WorkflowRun) => LogEntry;
/**
* Alias for upsertWorkflowRun — append a log entry and update workflow_runs
* in one atomic transaction.
*/
appendWithWorkflowUpdate: (
entry: Omit<LogEntry, "id">,
run: WorkflowRun,
) => LogEntry;
appendWithWorkflowUpdate: (entry: Omit<LogEntry, "id">, run: WorkflowRun) => LogEntry;
/** Get the current materialized state of a specific workflow run. */
getWorkflowRun: (runId: string) => WorkflowRun | null;
/**
@@ -94,6 +90,16 @@ export type LogStore = {
* Optionally filter by workflow name.
*/
getActiveWorkflowRuns: (workflowName?: string) => WorkflowRun[];
/**
* Get the trigger payload for a workflow run (stored in the 'started' log entry).
* Returns null if not found.
*/
getTriggerPayload: (runId: string) => unknown;
/**
* Get all workflow CommandEvents for a specific run, ordered by id ASC.
* Used for crash recovery to rebuild ThreadState.
*/
getThreadEvents: (runId: string) => Array<{ type: string; [key: string]: unknown }>;
close: () => void;
};
@@ -151,6 +157,14 @@ export function createLogStore(dbPath: string): LogStore {
"SELECT run_id, workflow, status, ts FROM workflow_runs WHERE run_id = ?",
);
const getTriggerPayloadStmt = sqlite.prepare(
"SELECT payload FROM logs WHERE source = 'workflow' AND type = 'started' AND ref_id = ? ORDER BY id ASC LIMIT 1",
);
const getThreadEventsStmt = sqlite.prepare(
"SELECT payload FROM logs WHERE source = 'workflow' AND type = 'thread_command_event' AND ref_id = ? ORDER BY id ASC",
);
const getActiveWorkflowRunsStmt = sqlite.prepare(
"SELECT run_id, workflow, status, ts FROM workflow_runs WHERE status IN ('queued', 'started') ORDER BY ts ASC",
);
@@ -281,9 +295,57 @@ export function createLogStore(dbPath: string): LogStore {
}));
}
function getTriggerPayload(runId: string): unknown {
const row = getTriggerPayloadStmt.get(runId) as { payload: string | null } | undefined;
if (row === undefined || row.payload === null) return null;
try {
const parsed = JSON.parse(row.payload) as unknown;
if (parsed !== null && typeof parsed === "object") {
const obj = parsed as Record<string, unknown>;
return obj.triggerPayload ?? null;
}
} catch {
// malformed
}
return null;
}
function getThreadEvents(runId: string): Array<{ type: string; [key: string]: unknown }> {
const rows = getThreadEventsStmt.all(runId) as Array<{ payload: string | null }>;
const result: Array<{ type: string; [key: string]: unknown }> = [];
for (const row of rows) {
if (row.payload === null) continue;
try {
const parsed = JSON.parse(row.payload) as unknown;
if (
parsed !== null &&
typeof parsed === "object" &&
typeof (parsed as Record<string, unknown>).type === "string"
) {
result.push(parsed as { type: string; [key: string]: unknown });
}
} catch {
// skip malformed payloads
}
}
return result;
}
function close(): void {
sqlite.close();
}
return { append, query, getMeta, setMeta, upsertWorkflowRun, appendWithWorkflowUpdate, getWorkflowRun, getActiveWorkflowRuns, close };
return {
append,
query,
getMeta,
setMeta,
upsertWorkflowRun,
appendWithWorkflowUpdate,
getWorkflowRun,
getActiveWorkflowRuns,
getTriggerPayload,
getThreadEvents,
close,
};
}
+256 -80
View File
@@ -13,9 +13,15 @@ import { fileURLToPath } from "node:url";
import type { NerveConfig, WorkflowConfig } from "@uncaged/nerve-core";
import type { ShutdownMessage, StartThreadMessage, ThreadEventMessage } from "./ipc.js";
import type {
ResumeThreadMessage,
ShutdownMessage,
StartThreadMessage,
ThreadEventMessage,
} from "./ipc.js";
import { parseWorkerMessage } from "./ipc.js";
import type { LogStore } from "./log-store.js";
import type { WorkflowRunStatus } from "./log-store.js";
export type WorkflowManager = {
/** Trigger a new workflow thread (called by Reflex scheduler). */
@@ -28,6 +34,12 @@ export type WorkflowManager = {
totalActiveCount: () => number;
/** Update the config reference (e.g. after hot reload). Active workers are unaffected. */
updateConfig: (newConfig: NerveConfig) => void;
/**
* Drain active threads for a workflow, then respawn its worker process.
* Used for hot reload when the workflow .ts file changes.
* Waits up to `drainTimeoutMs` for threads to complete before force-killing.
*/
drainAndRespawn: (workflowName: string, drainTimeoutMs?: number) => Promise<void>;
/** Gracefully shut down all workflow workers. */
stop: () => Promise<void>;
};
@@ -46,8 +58,21 @@ type WorkerEntry = {
workflowName: string;
process: ChildProcess;
stopping: boolean;
/** When set, the worker is draining before a hot-reload respawn. */
draining: boolean;
};
// Crash respawn backoff: track crash timestamps per workflow.
const MAX_CRASHES_IN_WINDOW = 5;
const CRASH_WINDOW_MS = 60_000;
/**
* Worker shutdown timeout — must stay in sync with SHUTDOWN_TIMEOUT_MS in workflow-worker.ts.
* The drain timeout passed to drainAndRespawn must be >= this value so the worker has
* enough time to finish in-flight threads before the parent force-kills it.
*/
const WORKER_SHUTDOWN_TIMEOUT_MS = 10_000;
const DEFAULT_MAX_QUEUE = 100;
function resolveWorkerScript(): string {
@@ -86,6 +111,15 @@ function sendShutdown(worker: ChildProcess, entry: WorkerEntry): void {
}
}
function sendResumeThread(worker: ChildProcess, msg: ResumeThreadMessage): void {
if (worker.connected === false) return;
try {
worker.send(msg);
} catch {
// IPC channel closed between connected check and send
}
}
function waitForExit(child: ChildProcess, timeoutMs: number): Promise<void> {
return new Promise((resolve) => {
const timer = setTimeout(() => {
@@ -108,6 +142,7 @@ export function createWorkflowManager(
const states = new Map<string, WorkflowState>();
const workers = new Map<string, WorkerEntry>();
const crashTimestamps = new Map<string, number[]>();
let stopped = false;
let config = initialConfig;
@@ -124,6 +159,19 @@ export function createWorkflowManager(
return config.workflows?.[workflowName] ?? null;
}
function toWorkflowRunStatus(eventType: string): WorkflowRunStatus | null {
const map: Record<string, WorkflowRunStatus> = {
started: "started",
queued: "queued",
completed: "completed",
failed: "failed",
crashed: "crashed",
dropped: "dropped",
interrupted: "interrupted",
};
return map[eventType] ?? null;
}
function logWorkflowEvent(
workflowName: string,
runId: string,
@@ -131,34 +179,12 @@ export function createWorkflowManager(
payload?: unknown,
): void {
const ts = Date.now();
if (
eventType === "started" ||
eventType === "queued" ||
eventType === "completed" ||
eventType === "failed" ||
eventType === "crashed" ||
eventType === "dropped"
) {
const status =
eventType === "dropped"
? ("dropped" as const)
: eventType === "queued"
? ("queued" as const)
: eventType === "started"
? ("started" as const)
: eventType === "completed"
? ("completed" as const)
: eventType === "failed"
? ("failed" as const)
: ("crashed" as const);
const serialised = payload !== undefined ? JSON.stringify(payload) : null;
const status = toWorkflowRunStatus(eventType);
if (status !== null) {
logStore.upsertWorkflowRun(
{
source: "workflow",
type: eventType,
refId: runId,
payload: payload !== undefined ? JSON.stringify(payload) : null,
ts,
},
{ source: "workflow", type: eventType, refId: runId, payload: serialised, ts },
{ runId, workflow: workflowName, status, ts },
);
} else {
@@ -166,7 +192,7 @@ export function createWorkflowManager(
source: "workflow",
type: eventType,
refId: runId,
payload: payload !== undefined ? JSON.stringify(payload) : null,
payload: serialised,
ts,
});
}
@@ -184,7 +210,8 @@ export function createWorkflowManager(
triggerPayload: payload,
};
sendStartThread(worker.process, msg);
logWorkflowEvent(workflowName, runId, "started");
// Store triggerPayload in the log so it can be recovered after a crash
logWorkflowEvent(workflowName, runId, "started", { triggerPayload: payload });
}
function dequeueNext(workflowName: string): void {
@@ -216,6 +243,60 @@ export function createWorkflowManager(
}
}
function recoverQueuedRun(workflowName: string, runId: string, state: WorkflowState): void {
if (state.queue.some((q) => q.runId === runId)) return;
const triggerPayload = logStore.getTriggerPayload(runId);
state.queue.push({ runId, payload: triggerPayload });
process.stderr.write(
`[workflow-manager] crash-recovery: re-queued thread "${runId}" for "${workflowName}"\n`,
);
}
function recoverStartedRun(
workflowName: string,
runId: string,
state: WorkflowState,
worker: WorkerEntry,
): void {
if (state.active.has(runId)) return;
const events = logStore.getThreadEvents(runId);
const triggerPayload = logStore.getTriggerPayload(runId);
state.active.add(runId);
const msg: ResumeThreadMessage = {
type: "resume-thread",
runId,
events,
triggerPayload,
};
sendResumeThread(worker.process, msg);
process.stderr.write(
`[workflow-manager] crash-recovery: resuming thread "${runId}" for "${workflowName}" (${events.length} events)\n`,
);
}
function recoverThreadsForWorker(workflowName: string, worker: WorkerEntry): void {
const activeRuns = logStore.getActiveWorkflowRuns(workflowName);
const state = getOrCreateState(workflowName);
for (const run of activeRuns) {
if (run.status === "queued") {
recoverQueuedRun(workflowName, run.runId, state);
} else if (run.status === "started") {
recoverStartedRun(workflowName, run.runId, state, worker);
}
}
}
function recordCrashAndCheckLimit(workflowName: string): boolean {
const now = Date.now();
const timestamps = (crashTimestamps.get(workflowName) ?? []).filter(
(t) => now - t < CRASH_WINDOW_MS,
);
timestamps.push(now);
crashTimestamps.set(workflowName, timestamps);
return timestamps.length > MAX_CRASHES_IN_WINDOW;
}
function handleWorkerCrash(workflowName: string): void {
const state = states.get(workflowName);
if (state === undefined) return;
@@ -225,13 +306,113 @@ export function createWorkflowManager(
process.stderr.write(
`[workflow-manager] worker for "${workflowName}" crashed with ${crashedCount} active thread(s)\n`,
);
for (const runId of state.active) {
logWorkflowEvent(workflowName, runId, "crashed");
}
}
// All active threads are now crashed — we can't recover runIds from this
// in-memory state alone (Phase 3 crash recovery will use the DB).
// Reset active set so the manager stays consistent.
state.active.clear();
workers.delete(workflowName);
if (stopped || workflowConfig(workflowName) === null) return;
if (recordCrashAndCheckLimit(workflowName)) {
const count = crashTimestamps.get(workflowName)?.length ?? 0;
process.stderr.write(
`[workflow-manager] worker for "${workflowName}" crashed ${count} times in ${CRASH_WINDOW_MS}ms — stopping respawn\n`,
);
return;
}
process.stderr.write(
`[workflow-manager] respawning worker for "${workflowName}" after crash\n`,
);
const newWorker = getOrSpawnWorker(workflowName);
setImmediate(() => {
recoverThreadsForWorker(workflowName, newWorker);
});
}
function handleWorkerMessage(workflowName: string, raw: unknown): void {
const result = parseWorkerMessage(raw);
if (!result.ok) {
process.stderr.write(
`[workflow-manager] invalid message from "${workflowName}" worker: ${result.error.message}\n`,
);
return;
}
const msg = result.value;
if (msg.type === "thread-event") {
handleThreadEvent(workflowName, msg);
return;
}
if (msg.type === "thread-command-event") {
logStore.append({
source: "workflow",
type: "thread_command_event",
refId: msg.runId,
payload: JSON.stringify(msg.event),
ts: Date.now(),
});
return;
}
if (msg.type === "workflow-error") {
process.stderr.write(
`[workflow-manager] workflow-error for runId "${msg.runId}" in "${workflowName}": ${msg.error}\n`,
);
const state = states.get(workflowName);
if (state !== undefined) {
state.active.delete(msg.runId);
dequeueNext(workflowName);
}
logWorkflowEvent(workflowName, msg.runId, "failed", { error: msg.error });
return;
}
if (msg.type === "error") {
process.stderr.write(
`[workflow-manager] error from "${workflowName}" worker: ${msg.error}\n`,
);
}
}
function markActiveRunsInterrupted(workflowName: string): void {
const state = states.get(workflowName);
if (state === undefined) return;
for (const runId of state.active) {
logWorkflowEvent(workflowName, runId, "interrupted");
}
state.active.clear();
}
function handleWorkerExit(workflowName: string, code: number | null): void {
const entry = workers.get(workflowName);
if (entry?.draining) {
workers.delete(workflowName);
markActiveRunsInterrupted(workflowName);
if (!stopped && workflowConfig(workflowName) !== null) {
process.stderr.write(
`[workflow-manager] worker for "${workflowName}" drained, respawning\n`,
);
getOrSpawnWorker(workflowName);
}
return;
}
if (entry?.stopping) {
workers.delete(workflowName);
const state = states.get(workflowName);
if (state !== undefined) {
state.active.clear();
}
return;
}
process.stderr.write(
`[workflow-manager] worker for "${workflowName}" exited with code ${code ?? "null"}\n`,
);
handleWorkerCrash(workflowName);
}
function getOrSpawnWorker(workflowName: string): WorkerEntry {
@@ -243,57 +424,14 @@ export function createWorkflowManager(
const child = spawnWorkflowWorker(nerveRoot, workflowName, workerScript);
child.on("message", (raw: unknown) => {
const result = parseWorkerMessage(raw);
if (!result.ok) {
process.stderr.write(
`[workflow-manager] invalid message from "${workflowName}" worker: ${result.error.message}\n`,
);
return;
}
const msg = result.value;
if (msg.type === "thread-event") {
handleThreadEvent(workflowName, msg);
return;
}
if (msg.type === "workflow-error") {
process.stderr.write(
`[workflow-manager] workflow-error for runId "${msg.runId}" in "${workflowName}": ${msg.error}\n`,
);
const state = states.get(workflowName);
if (state !== undefined) {
state.active.delete(msg.runId);
dequeueNext(workflowName);
}
logWorkflowEvent(workflowName, msg.runId, "failed", { error: msg.error });
return;
}
if (msg.type === "error") {
process.stderr.write(
`[workflow-manager] error from "${workflowName}" worker: ${msg.error}\n`,
);
}
handleWorkerMessage(workflowName, raw);
});
child.on("exit", (code) => {
const entry = workers.get(workflowName);
if (entry?.stopping) {
workers.delete(workflowName);
const state = states.get(workflowName);
if (state !== undefined) {
state.active.clear();
}
return;
}
process.stderr.write(
`[workflow-manager] worker for "${workflowName}" exited with code ${code ?? "null"}\n`,
);
handleWorkerCrash(workflowName);
handleWorkerExit(workflowName, code);
});
const entry: WorkerEntry = { workflowName, process: child, stopping: false };
const entry: WorkerEntry = { workflowName, process: child, stopping: false, draining: false };
workers.set(workflowName, entry);
return entry;
}
@@ -365,6 +503,36 @@ export function createWorkflowManager(
config = newConfig;
}
/**
* Default drain timeout must be at least WORKER_SHUTDOWN_TIMEOUT_MS so the worker
* has enough time to finish in-flight threads before the parent force-kills it.
*/
const DEFAULT_DRAIN_TIMEOUT_MS = Math.max(30_000, WORKER_SHUTDOWN_TIMEOUT_MS + 5_000);
async function drainAndRespawn(
workflowName: string,
drainTimeoutMs: number = DEFAULT_DRAIN_TIMEOUT_MS,
): Promise<void> {
const entry = workers.get(workflowName);
if (entry === undefined) {
// No active worker — nothing to drain
return;
}
entry.draining = true;
// Send shutdown without setting stopping=true (so the exit handler uses the draining branch)
if (entry.process.connected) {
const msg: ShutdownMessage = { type: "shutdown" };
try {
entry.process.send(msg);
} catch {
// IPC closed
}
}
await waitForExit(entry.process, drainTimeoutMs);
// The exit handler (draining branch) will respawn the worker automatically
}
async function stop(): Promise<void> {
stopped = true;
const exitPromises: Promise<void>[] = [];
@@ -376,5 +544,13 @@ export function createWorkflowManager(
workers.clear();
}
return { startWorkflow, activeCount, queueLength, totalActiveCount, updateConfig, stop };
return {
startWorkflow,
activeCount,
queueLength,
totalActiveCount,
updateConfig,
drainAndRespawn,
stop,
};
}
+106 -9
View File
@@ -9,8 +9,8 @@
* workflows/<name>/index.ts (or .js) ← user workflow definition
*/
import { resolve, join } from "node:path";
import { existsSync } from "node:fs";
import { join, resolve } from "node:path";
import type {
CommandEvent,
@@ -19,7 +19,7 @@ import type {
WorkflowDefinition,
} from "@uncaged/nerve-core";
import type { WorkerToParentMessage, ThreadEventType } from "./ipc.js";
import type { ThreadCommandEventMessage, ThreadEventType, WorkerToParentMessage } from "./ipc.js";
import { parseParentMessage } from "./ipc.js";
// ---------------------------------------------------------------------------
@@ -44,29 +44,107 @@ function sendWorkflowError(runId: string, error: string): void {
send({ type: "workflow-error", runId, error });
}
function sendCommandEvent(runId: string, event: CommandEvent): void {
const msg: ThreadCommandEventMessage = {
type: "thread-command-event",
runId,
event: event as { type: string; [key: string]: unknown },
};
send(msg);
}
// ---------------------------------------------------------------------------
// Thread loop (RFC-002 §5.4)
// ---------------------------------------------------------------------------
/**
* Replay persisted events through moderate() to reconstruct ThreadState,
* then execute the next role and return the resulting CommandEvent.
* Returns null if the thread is already complete (moderate returned null).
*/
async function replayAndResume(
def: WorkflowDefinition,
runId: string,
ctx: WorkflowContext,
state: ThreadState,
resumeEvents: CommandEvent[],
): Promise<CommandEvent | null> {
let lastNext: ReturnType<typeof def.moderate> = null;
for (const ev of resumeEvents) {
state.events.push(ev);
lastNext = def.moderate(state, ev);
if (lastNext === null) {
sendThreadEvent(runId, "completed", null);
return null;
}
}
const next = lastNext;
if (next === null) {
sendThreadEvent(runId, "completed", null);
return null;
}
const role = def.roles[next.role];
if (!role) {
sendWorkflowError(runId, `Unknown role: ${next.role}`);
return null;
}
try {
const event = await role.execute(next.prompt, ctx);
sendCommandEvent(runId, event);
return event;
} catch (e: unknown) {
const errMsg = e instanceof Error ? e.message : String(e);
sendThreadEvent(runId, "failed", { error: errMsg });
return null;
}
}
async function runThread(
def: WorkflowDefinition,
workflowName: string,
runId: string,
triggerPayload: unknown,
/** Pre-existing event history for crash-recovery resume. Empty for a fresh thread. */
resumeEvents: CommandEvent[] = [],
): Promise<void> {
const state: ThreadState = { runId, events: [] };
const ctx: WorkflowContext = {
runId,
workflowName,
log: (msg) =>
sendThreadEvent(runId, "step_complete", { message: msg }),
log: (msg) => sendThreadEvent(runId, "step_complete", { message: msg }),
};
let event: CommandEvent = {
const initialEvent: CommandEvent = {
type: "thread_start",
triggerPayload: triggerPayload != null && typeof triggerPayload === "object" ? triggerPayload : {},
triggerPayload:
triggerPayload != null && typeof triggerPayload === "object" ? triggerPayload : {},
};
// On resume: replay persisted events, run the next un-executed role, then continue.
if (resumeEvents.length > 0) {
const nextEvent = await replayAndResume(def, runId, ctx, state, resumeEvents);
if (nextEvent === null) return;
await continueThread(def, runId, ctx, state, nextEvent);
return;
}
// Fresh thread — send the initial command event and enter the loop.
sendCommandEvent(runId, initialEvent);
await continueThread(def, runId, ctx, state, initialEvent);
}
async function continueThread(
def: WorkflowDefinition,
runId: string,
ctx: WorkflowContext,
state: ThreadState,
firstEvent: CommandEvent,
): Promise<void> {
let event = firstEvent;
const MAX_STEPS = 1000;
let step = 0;
while (step < MAX_STEPS) {
@@ -92,6 +170,7 @@ async function runThread(
sendThreadEvent(runId, "failed", { error: errMsg });
return;
}
sendCommandEvent(runId, event);
}
if (step >= MAX_STEPS) {
sendWorkflowError(runId, `Thread exceeded maximum steps (${MAX_STEPS})`);
@@ -114,8 +193,7 @@ async function loadWorkflowDefinition(
const indexPath = candidates.find((p) => existsSync(p));
if (!indexPath) {
throw new Error(
`Workflow definition not found for "${workflowName}". Tried:\n` +
candidates.map((p) => ` ${p}`).join("\n"),
`Workflow definition not found for "${workflowName}". Tried:\n${candidates.map((p) => ` ${p}`).join("\n")}`,
);
}
@@ -157,7 +235,7 @@ function handleMessage(
if (msg.type === "shutdown") {
shuttingDown.value = true;
const timeout = new Promise<void>(r => setTimeout(r, 10_000));
const timeout = new Promise<void>((r) => setTimeout(r, 10_000));
Promise.race([Promise.all(inFlight.values()), timeout])
.then(() => process.exit(0))
.catch(() => process.exit(1));
@@ -182,6 +260,25 @@ function handleMessage(
inFlight.set(runId, next);
return;
}
if (msg.type === "resume-thread") {
if (shuttingDown.value) return;
const { runId, events, triggerPayload } = msg;
const previous = inFlight.get(runId) ?? Promise.resolve();
const next = previous
.then(() => runThread(def, workflowName, runId, triggerPayload, events as CommandEvent[]))
.catch((e: unknown) => {
const errMsg = e instanceof Error ? e.message : String(e);
sendWorkflowError(runId, errMsg);
})
.finally(() => {
inFlight.delete(runId);
});
inFlight.set(runId, next);
return;
}
}
// ---------------------------------------------------------------------------