Merge pull request 'RFC-006 Phase 3: Migrate workflow-manager process logic to WorkerRuntime' (#295) from refactor/rfc-006-workflow-runtime into main
This commit was merged in pull request #295.
This commit is contained in:
@@ -28,6 +28,11 @@ function makeMockChild(pid = 1): MockChild {
|
||||
child.connected = true;
|
||||
child.exitCode = null;
|
||||
child.pid = pid;
|
||||
setImmediate(() => {
|
||||
if (child.connected) {
|
||||
child.emit("message", { type: "ready" });
|
||||
}
|
||||
});
|
||||
child.send = vi.fn((msg: unknown) => {
|
||||
if (
|
||||
msg !== null &&
|
||||
@@ -132,6 +137,7 @@ describe("WorkflowManager — crash recovery (Phase 3)", () => {
|
||||
|
||||
mgr.startWorkflow("my-wf", { prompt: "test 1", maxRounds: 10, dryRun: false });
|
||||
mgr.startWorkflow("my-wf", { prompt: "test 2", maxRounds: 10, dryRun: false });
|
||||
await vi.runAllTimersAsync();
|
||||
expect(mgr.activeCount("my-wf")).toBe(2);
|
||||
|
||||
// Simulate unexpected exit (not shutdown)
|
||||
@@ -159,6 +165,7 @@ describe("WorkflowManager — crash recovery (Phase 3)", () => {
|
||||
|
||||
mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false });
|
||||
mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false });
|
||||
await vi.runAllTimersAsync();
|
||||
expect(mgr.activeCount("my-wf")).toBe(2);
|
||||
|
||||
const child = mockChildren[0];
|
||||
@@ -183,6 +190,7 @@ describe("WorkflowManager — crash recovery (Phase 3)", () => {
|
||||
const mgr = createWorkflowManager("/nerve-root", config, logStore);
|
||||
|
||||
mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false });
|
||||
await vi.runAllTimersAsync();
|
||||
expect(mockChildren).toHaveLength(1);
|
||||
|
||||
const child = mockChildren[0];
|
||||
@@ -216,6 +224,7 @@ describe("WorkflowManager — crash recovery (Phase 3)", () => {
|
||||
const mgr = createWorkflowManager("/nerve-root", config, logStore);
|
||||
|
||||
mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false });
|
||||
await vi.runAllTimersAsync();
|
||||
const firstChild = mockChildren[0];
|
||||
firstChild.exitCode = 1;
|
||||
firstChild.connected = false;
|
||||
@@ -260,6 +269,7 @@ describe("WorkflowManager — crash recovery (Phase 3)", () => {
|
||||
|
||||
// Start one thread to fill the concurrency slot (so queued run stays queued on respawn)
|
||||
mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false });
|
||||
await vi.runAllTimersAsync();
|
||||
const firstChild = mockChildren[0];
|
||||
firstChild.exitCode = 1;
|
||||
firstChild.connected = false;
|
||||
@@ -285,6 +295,7 @@ describe("WorkflowManager — crash recovery (Phase 3)", () => {
|
||||
const mgr = createWorkflowManager("/nerve-root", config, logStore);
|
||||
|
||||
mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false });
|
||||
await vi.runAllTimersAsync();
|
||||
|
||||
const child = mockChildren[0];
|
||||
const startCall = (child.send as ReturnType<typeof vi.fn>).mock.calls[0];
|
||||
@@ -322,6 +333,7 @@ describe("WorkflowManager — crash recovery (Phase 3)", () => {
|
||||
|
||||
const launch = { prompt: "build-docker for myrepo", maxRounds: 10, dryRun: false };
|
||||
mgr.startWorkflow("my-wf", launch);
|
||||
await vi.runAllTimersAsync();
|
||||
|
||||
const startedCall = logStore.upsertWorkflowRun.mock.calls.find(
|
||||
(args: any[]) => (args[0] as { type: string }).type === "started",
|
||||
@@ -357,6 +369,7 @@ describe("WorkflowManager — crash recovery (Phase 3)", () => {
|
||||
|
||||
// Start one thread to fill the concurrency slot
|
||||
mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false });
|
||||
await vi.runAllTimersAsync();
|
||||
const firstChild = mockChildren[0];
|
||||
|
||||
// Crash once → respawn → crash again → second respawn
|
||||
@@ -398,6 +411,7 @@ describe("WorkflowManager — crash recovery (Phase 3)", () => {
|
||||
const mgr = createWorkflowManager("/nerve-root", config, logStore);
|
||||
|
||||
mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false });
|
||||
await vi.runAllTimersAsync();
|
||||
const firstChild = mockChildren[0];
|
||||
firstChild.exitCode = 1;
|
||||
firstChild.connected = false;
|
||||
@@ -428,6 +442,7 @@ describe("WorkflowManager — crash recovery (Phase 3)", () => {
|
||||
const mgr = createWorkflowManager("/nerve-root", config, logStore);
|
||||
|
||||
mgr.startWorkflow("crash-wf", { prompt: "test", maxRounds: 10, dryRun: false });
|
||||
await vi.runAllTimersAsync();
|
||||
|
||||
// Crash the worker 6 times in rapid succession (within CRASH_WINDOW_MS = 60s)
|
||||
for (let i = 0; i < 6; i++) {
|
||||
|
||||
@@ -33,6 +33,11 @@ function makeMockChild(pid = 1): MockChild {
|
||||
child.connected = true;
|
||||
child.exitCode = null;
|
||||
child.pid = pid;
|
||||
setImmediate(() => {
|
||||
if (child.connected) {
|
||||
child.emit("message", { type: "ready" });
|
||||
}
|
||||
});
|
||||
child.send = vi.fn((msg: unknown) => {
|
||||
if (
|
||||
msg !== null &&
|
||||
@@ -114,6 +119,7 @@ describe("WorkflowManager — drainAndRespawn (Phase 3 hot reload)", () => {
|
||||
const mgr = createWorkflowManager("/nerve-root", config, logStore);
|
||||
|
||||
mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false });
|
||||
await vi.runAllTimersAsync();
|
||||
expect(mockChildren).toHaveLength(1);
|
||||
|
||||
// Remove workflow from config before drain completes
|
||||
@@ -134,6 +140,7 @@ describe("WorkflowManager — drainAndRespawn (Phase 3 hot reload)", () => {
|
||||
|
||||
mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false });
|
||||
mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false });
|
||||
await vi.runAllTimersAsync();
|
||||
expect(mgr.activeCount("my-wf")).toBe(2);
|
||||
|
||||
const drainPromise = mgr.drainAndRespawn("my-wf", 5000);
|
||||
@@ -165,6 +172,7 @@ describe("WorkflowManager — drainAndRespawn (Phase 3 hot reload)", () => {
|
||||
const mgr = createWorkflowManager("/nerve-root", config, logStore);
|
||||
|
||||
mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false });
|
||||
await vi.runAllTimersAsync();
|
||||
expect(mockChildren).toHaveLength(1);
|
||||
|
||||
const drainPromise = mgr.drainAndRespawn("my-wf", 5000);
|
||||
@@ -181,6 +189,7 @@ describe("WorkflowManager — drainAndRespawn (Phase 3 hot reload)", () => {
|
||||
const mgr = createWorkflowManager("/nerve-root", config, logStore);
|
||||
|
||||
mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false });
|
||||
await vi.runAllTimersAsync();
|
||||
expect(mockChildren).toHaveLength(1);
|
||||
|
||||
const drainPromise = mgr.drainAndRespawn("my-wf", 5000);
|
||||
@@ -198,6 +207,7 @@ describe("WorkflowManager — drainAndRespawn (Phase 3 hot reload)", () => {
|
||||
const mgr = createWorkflowManager("/nerve-root", config, logStore);
|
||||
|
||||
mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false });
|
||||
await vi.runAllTimersAsync();
|
||||
|
||||
const drainPromise = mgr.drainAndRespawn("my-wf", 5000);
|
||||
await vi.runAllTimersAsync();
|
||||
@@ -223,6 +233,7 @@ describe("WorkflowManager — drainAndRespawn (Phase 3 hot reload)", () => {
|
||||
const mgr = createWorkflowManager("/nerve-root", config, logStore);
|
||||
|
||||
mgr.startWorkflow("my-wf", { prompt: "first", maxRounds: 10, dryRun: false });
|
||||
await vi.runAllTimersAsync();
|
||||
|
||||
const drainPromise = mgr.drainAndRespawn("my-wf", 5000);
|
||||
await vi.runAllTimersAsync();
|
||||
@@ -230,6 +241,7 @@ describe("WorkflowManager — drainAndRespawn (Phase 3 hot reload)", () => {
|
||||
|
||||
// Start a new thread on the fresh worker
|
||||
mgr.startWorkflow("my-wf", { prompt: "second", maxRounds: 10, dryRun: false });
|
||||
await vi.runAllTimersAsync();
|
||||
|
||||
const newChild = mockChildren[1];
|
||||
const startCalls = (newChild.send as ReturnType<typeof vi.fn>).mock.calls.filter(
|
||||
@@ -257,12 +269,13 @@ describe("WorkflowManager — drainWhenIdle (hot reload without interrupting in-
|
||||
vi.clearAllMocks();
|
||||
});
|
||||
|
||||
it("does not send shutdown while a thread is still active", () => {
|
||||
it("does not send shutdown while a thread is still active", async () => {
|
||||
const logStore = makeLogStore();
|
||||
const config = makeWfConfig({ "my-wf": { concurrency: 1, overflow: "drop" } });
|
||||
const mgr = createWorkflowManager("/nerve-root", config, logStore);
|
||||
|
||||
mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false });
|
||||
await vi.runAllTimersAsync();
|
||||
const child = mockChildren[0];
|
||||
|
||||
mgr.drainWhenIdle("my-wf");
|
||||
@@ -282,6 +295,7 @@ describe("WorkflowManager — drainWhenIdle (hot reload without interrupting in-
|
||||
const mgr = createWorkflowManager("/nerve-root", config, logStore);
|
||||
|
||||
mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false });
|
||||
await vi.runAllTimersAsync();
|
||||
const child = mockChildren[0];
|
||||
const runId = (child.send as ReturnType<typeof vi.fn>).mock.calls[0][0] as { runId: string };
|
||||
|
||||
@@ -311,6 +325,7 @@ describe("WorkflowManager — drainWhenIdle (hot reload without interrupting in-
|
||||
|
||||
mgr.startWorkflow("my-wf", { prompt: "a", maxRounds: 10, dryRun: false });
|
||||
mgr.startWorkflow("my-wf", { prompt: "b", maxRounds: 10, dryRun: false });
|
||||
await vi.runAllTimersAsync();
|
||||
const child = mockChildren[0];
|
||||
const sendMock = child.send as ReturnType<typeof vi.fn>;
|
||||
const runIdA = (sendMock.mock.calls[0][0] as { runId: string }).runId;
|
||||
@@ -355,6 +370,7 @@ describe("WorkflowManager — drainWhenIdle (hot reload without interrupting in-
|
||||
const mgr = createWorkflowManager("/nerve-root", config, logStore);
|
||||
|
||||
mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false });
|
||||
await vi.runAllTimersAsync();
|
||||
const child = mockChildren[0];
|
||||
const runId = (child.send as ReturnType<typeof vi.fn>).mock.calls[0][0] as { runId: string };
|
||||
|
||||
@@ -388,6 +404,7 @@ describe("WorkflowManager — drainWhenIdle (hot reload without interrupting in-
|
||||
const mgr = createWorkflowManager("/nerve-root", config, logStore);
|
||||
|
||||
mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false });
|
||||
await vi.runAllTimersAsync();
|
||||
const child = mockChildren[0];
|
||||
const runId = (child.send as ReturnType<typeof vi.fn>).mock.calls[0][0] as { runId: string };
|
||||
|
||||
@@ -414,6 +431,7 @@ describe("WorkflowManager — drainWhenIdle (hot reload without interrupting in-
|
||||
const mgr = createWorkflowManager("/nerve-root", config, logStore);
|
||||
|
||||
mgr.startWorkflow("my-wf", { prompt: "once", maxRounds: 10, dryRun: false });
|
||||
await vi.runAllTimersAsync();
|
||||
const firstChild = mockChildren[0];
|
||||
const runId = (firstChild.send as ReturnType<typeof vi.fn>).mock.calls[0][0] as {
|
||||
runId: string;
|
||||
@@ -471,6 +489,7 @@ describe("Kernel — workflow hot reload via file-watcher (Phase 3)", () => {
|
||||
|
||||
// Trigger a workflow thread so a worker is spawned
|
||||
kernel.workflowManager.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false });
|
||||
await vi.runAllTimersAsync();
|
||||
|
||||
// Manually call drainAndRespawn (simulating what kernel does on workflow file change)
|
||||
const drainPromise = kernel.workflowManager.drainAndRespawn("my-wf", 1000);
|
||||
@@ -511,6 +530,7 @@ describe("Kernel — workflow hot reload via file-watcher (Phase 3)", () => {
|
||||
maxRounds: 10,
|
||||
dryRun: false,
|
||||
});
|
||||
await vi.runAllTimersAsync();
|
||||
expect(mockChildren).toHaveLength(1);
|
||||
|
||||
// Reload config without old-wf
|
||||
@@ -551,6 +571,7 @@ describe("Kernel — workflow hot reload via file-watcher (Phase 3)", () => {
|
||||
});
|
||||
|
||||
kernel.workflowManager.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false });
|
||||
await vi.runAllTimersAsync();
|
||||
const workersBefore = mockChildren.length;
|
||||
|
||||
// Reload with updated concurrency — should NOT spawn a new workflow worker
|
||||
@@ -573,6 +594,7 @@ describe("Kernel — workflow hot reload via file-watcher (Phase 3)", () => {
|
||||
// Can now start up to 5 concurrent threads (previously only 1)
|
||||
kernel.workflowManager.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false });
|
||||
kernel.workflowManager.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false });
|
||||
await vi.runAllTimersAsync();
|
||||
expect(kernel.workflowManager.activeCount("my-wf")).toBe(3);
|
||||
|
||||
const stopPromise = kernel.stop();
|
||||
|
||||
@@ -194,6 +194,8 @@ describe("kernel + workflowManager integration", () => {
|
||||
});
|
||||
}
|
||||
|
||||
await vi.runAllTimersAsync();
|
||||
|
||||
// A workflow worker should be spawned and a start-thread message sent
|
||||
const workflowWorker = mockChildren.find((c) =>
|
||||
(c.send as ReturnType<typeof vi.fn>).mock.calls.some(
|
||||
@@ -252,6 +254,8 @@ describe("kernel + workflowManager integration", () => {
|
||||
});
|
||||
}
|
||||
|
||||
await vi.runAllTimersAsync();
|
||||
|
||||
// Find the start-thread call and verify triggerPayload
|
||||
const startThreadCall = mockChildren
|
||||
.flatMap((c) => (c.send as ReturnType<typeof vi.fn>).mock.calls as [unknown][])
|
||||
@@ -306,6 +310,8 @@ describe("kernel + workflowManager integration", () => {
|
||||
});
|
||||
}
|
||||
|
||||
await vi.runAllTimersAsync();
|
||||
|
||||
const senseEntries = logStore.append.mock.calls
|
||||
.map((c) => c[0] as { source: string; type: string; refId: string | null })
|
||||
.filter((e) => e.source === "sense" && e.refId === "cpu-usage");
|
||||
@@ -423,6 +429,8 @@ describe("kernel + workflowManager integration", () => {
|
||||
});
|
||||
}
|
||||
|
||||
await vi.runAllTimersAsync();
|
||||
|
||||
expect(logStore.upsertWorkflowRun).toHaveBeenCalledWith(
|
||||
expect.objectContaining({ source: "workflow", type: "started" }),
|
||||
expect.objectContaining({ workflow: "log-test-workflow", status: "started" }),
|
||||
@@ -498,6 +506,8 @@ describe("kernel + workflowManager integration", () => {
|
||||
});
|
||||
}
|
||||
|
||||
await vi.runAllTimersAsync();
|
||||
|
||||
const startThreadCall = mockChildren
|
||||
.flatMap((c) => (c.send as ReturnType<typeof vi.fn>).mock.calls as [unknown][])
|
||||
.find(
|
||||
@@ -582,6 +592,8 @@ describe("kernel + workflowManager integration", () => {
|
||||
});
|
||||
}
|
||||
|
||||
await vi.runAllTimersAsync();
|
||||
|
||||
const startThreadCall = mockChildren
|
||||
.flatMap((c) => (c.send as ReturnType<typeof vi.fn>).mock.calls as [unknown][])
|
||||
.find(
|
||||
@@ -642,6 +654,8 @@ describe("kernel + workflowManager integration", () => {
|
||||
});
|
||||
}
|
||||
|
||||
await vi.runAllTimersAsync();
|
||||
|
||||
const stopPromise = kernel.stop();
|
||||
await vi.runAllTimersAsync();
|
||||
await expect(stopPromise).resolves.toBeUndefined();
|
||||
|
||||
@@ -16,6 +16,7 @@ function baseConfig(script: string) {
|
||||
onMessage: vi.fn(),
|
||||
onReady: vi.fn(),
|
||||
onExit: vi.fn(),
|
||||
onCrashLimitReached: null,
|
||||
respawn: {
|
||||
enabled: true,
|
||||
maxCrashes: 6,
|
||||
@@ -84,7 +85,7 @@ describe("createWorkerRuntime", () => {
|
||||
const rt = track(createWorkerRuntime(baseConfig(echoWorkerPath)));
|
||||
await rt.start("k");
|
||||
expect(rt.has("k")).toBe(true);
|
||||
await rt.evict("k");
|
||||
await rt.evict("k", null);
|
||||
expect(rt.has("k")).toBe(false);
|
||||
await rt.shutdown();
|
||||
});
|
||||
@@ -94,7 +95,7 @@ describe("createWorkerRuntime", () => {
|
||||
await rt.start("k");
|
||||
const before = rt.pid("k");
|
||||
expect(before).not.toBeNull();
|
||||
await rt.drain("k");
|
||||
await rt.drain("k", null);
|
||||
const after = rt.pid("k");
|
||||
expect(after).not.toBeNull();
|
||||
expect(after).not.toBe(before);
|
||||
|
||||
@@ -26,6 +26,11 @@ function makeMockChild(pid = 1): MockChild {
|
||||
child.connected = true;
|
||||
child.exitCode = null;
|
||||
child.pid = pid;
|
||||
setImmediate(() => {
|
||||
if (child.connected) {
|
||||
child.emit("message", { type: "ready" });
|
||||
}
|
||||
});
|
||||
child.send = vi.fn((msg: unknown) => {
|
||||
if (
|
||||
msg !== null &&
|
||||
@@ -110,7 +115,7 @@ describe("WorkflowManager", () => {
|
||||
});
|
||||
|
||||
describe("startWorkflow under concurrency limit dispatches thread", () => {
|
||||
it("forks a worker and sends start-thread when active < concurrency", () => {
|
||||
it("forks a worker and sends start-thread when active < concurrency", async () => {
|
||||
const logStore = makeLogStore();
|
||||
const config = makeConfig({
|
||||
"my-workflow": { concurrency: 2, overflow: "drop" },
|
||||
@@ -118,6 +123,7 @@ describe("WorkflowManager", () => {
|
||||
const mgr = createWorkflowManager("/nerve-root", config, logStore);
|
||||
|
||||
mgr.startWorkflow("my-workflow", { prompt: "test", maxRounds: 10, dryRun: false });
|
||||
await vi.runAllTimersAsync();
|
||||
|
||||
expect(mockChildren).toHaveLength(1);
|
||||
expect(mockChildren[0].send).toHaveBeenCalledWith(
|
||||
@@ -126,7 +132,7 @@ describe("WorkflowManager", () => {
|
||||
expect(mgr.activeCount("my-workflow")).toBe(1);
|
||||
});
|
||||
|
||||
it("reuses the same worker for a second thread under the limit", () => {
|
||||
it("reuses the same worker for a second thread under the limit", async () => {
|
||||
const logStore = makeLogStore();
|
||||
const config = makeConfig({
|
||||
"my-workflow": { concurrency: 3, overflow: "drop" },
|
||||
@@ -135,6 +141,7 @@ describe("WorkflowManager", () => {
|
||||
|
||||
mgr.startWorkflow("my-workflow", { prompt: "test 1", maxRounds: 10, dryRun: false });
|
||||
mgr.startWorkflow("my-workflow", { prompt: "test 2", maxRounds: 10, dryRun: false });
|
||||
await vi.runAllTimersAsync();
|
||||
|
||||
// Only one forked child — worker is reused
|
||||
expect(mockChildren).toHaveLength(1);
|
||||
@@ -142,7 +149,7 @@ describe("WorkflowManager", () => {
|
||||
expect(mgr.activeCount("my-workflow")).toBe(2);
|
||||
});
|
||||
|
||||
it("logs a 'started' event for each dispatched thread", () => {
|
||||
it("logs a 'started' event for each dispatched thread", async () => {
|
||||
const logStore = makeLogStore();
|
||||
const config = makeConfig({
|
||||
"my-workflow": { concurrency: 2, overflow: "drop" },
|
||||
@@ -150,6 +157,7 @@ describe("WorkflowManager", () => {
|
||||
const mgr = createWorkflowManager("/nerve-root", config, logStore);
|
||||
|
||||
mgr.startWorkflow("my-workflow", { prompt: "test", maxRounds: 10, dryRun: false });
|
||||
await vi.runAllTimersAsync();
|
||||
|
||||
expect(logStore.upsertWorkflowRun).toHaveBeenCalledWith(
|
||||
expect.objectContaining({ source: "workflow", type: "started" }),
|
||||
@@ -159,7 +167,7 @@ describe("WorkflowManager", () => {
|
||||
});
|
||||
|
||||
describe("startWorkflow at limit with drop overflow drops the request", () => {
|
||||
it("does NOT send start-thread when at concurrency limit with overflow=drop", () => {
|
||||
it("does NOT send start-thread when at concurrency limit with overflow=drop", async () => {
|
||||
const logStore = makeLogStore();
|
||||
const config = makeConfig({
|
||||
"drop-wf": { concurrency: 1, overflow: "drop" },
|
||||
@@ -169,6 +177,7 @@ describe("WorkflowManager", () => {
|
||||
mgr.startWorkflow("drop-wf", { prompt: "first", maxRounds: 10, dryRun: false });
|
||||
// now at limit — second call should be dropped
|
||||
mgr.startWorkflow("drop-wf", { prompt: "second", maxRounds: 10, dryRun: false });
|
||||
await vi.runAllTimersAsync();
|
||||
|
||||
expect(mgr.activeCount("drop-wf")).toBe(1);
|
||||
expect(mgr.queueLength("drop-wf")).toBe(0);
|
||||
@@ -254,7 +263,7 @@ describe("WorkflowManager", () => {
|
||||
});
|
||||
|
||||
describe("completing a thread dequeues the next one", () => {
|
||||
it("dispatches the next queued thread when the active thread sends completed", () => {
|
||||
it("dispatches the next queued thread when the active thread sends completed", async () => {
|
||||
const logStore = makeLogStore();
|
||||
const config = makeConfig({
|
||||
"queue-wf": { concurrency: 1, overflow: "queue", maxQueue: 5 },
|
||||
@@ -263,6 +272,7 @@ describe("WorkflowManager", () => {
|
||||
|
||||
mgr.startWorkflow("queue-wf", { prompt: "first", maxRounds: 10, dryRun: false });
|
||||
mgr.startWorkflow("queue-wf", { prompt: "second", maxRounds: 10, dryRun: false });
|
||||
await vi.runAllTimersAsync();
|
||||
|
||||
expect(mgr.activeCount("queue-wf")).toBe(1);
|
||||
expect(mgr.queueLength("queue-wf")).toBe(1);
|
||||
@@ -289,7 +299,7 @@ describe("WorkflowManager", () => {
|
||||
);
|
||||
});
|
||||
|
||||
it("dispatches next queued thread when active thread sends failed", () => {
|
||||
it("dispatches next queued thread when active thread sends failed", async () => {
|
||||
const logStore = makeLogStore();
|
||||
const config = makeConfig({
|
||||
"queue-wf": { concurrency: 1, overflow: "queue", maxQueue: 5 },
|
||||
@@ -298,6 +308,7 @@ describe("WorkflowManager", () => {
|
||||
|
||||
mgr.startWorkflow("queue-wf", { prompt: "first", maxRounds: 10, dryRun: false });
|
||||
mgr.startWorkflow("queue-wf", { prompt: "second", maxRounds: 10, dryRun: false });
|
||||
await vi.runAllTimersAsync();
|
||||
|
||||
const child = mockChildren[0];
|
||||
const firstRunId = (child.send as ReturnType<typeof vi.fn>).mock.calls[0][0].runId as string;
|
||||
@@ -325,6 +336,7 @@ describe("WorkflowManager", () => {
|
||||
|
||||
mgr.startWorkflow("wf-a", { prompt: "test", maxRounds: 10, dryRun: false });
|
||||
mgr.startWorkflow("wf-b", { prompt: "test", maxRounds: 10, dryRun: false });
|
||||
await vi.runAllTimersAsync();
|
||||
|
||||
// Two distinct workers should have been forked
|
||||
expect(mockChildren).toHaveLength(2);
|
||||
|
||||
@@ -66,6 +66,7 @@ export function createSenseWorkerPool(options: SenseWorkerPoolOptions): SenseWor
|
||||
onReady: (_key, msg) => {
|
||||
options.onWorkerMessage(msg);
|
||||
},
|
||||
onCrashLimitReached: null,
|
||||
onExit: (group, code, signal) => {
|
||||
const sig =
|
||||
signal === null || signal === undefined || signal === ""
|
||||
@@ -102,13 +103,13 @@ export function createSenseWorkerPool(options: SenseWorkerPoolOptions): SenseWor
|
||||
return;
|
||||
}
|
||||
options.onBeforeGroupRestart(group);
|
||||
await runtime.drain(group);
|
||||
await runtime.drain(group, null);
|
||||
}
|
||||
|
||||
function evictGroup(group: string): void {
|
||||
trackedGroups.delete(group);
|
||||
evicting.add(group);
|
||||
void runtime.evict(group).finally(() => {
|
||||
void runtime.evict(group, null).finally(() => {
|
||||
evicting.delete(group);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -8,6 +8,10 @@ import { isPlainRecord } from "@uncaged/nerve-core";
|
||||
|
||||
const STDERR_TAIL_MAX_CHARS = 2048;
|
||||
|
||||
export type WorkerDrainOpts = {
|
||||
shutdownTimeoutMs: number | null;
|
||||
};
|
||||
|
||||
export type WorkerRuntimeConfig<K extends string> = {
|
||||
script: string;
|
||||
argsForKey: (key: K) => string[];
|
||||
@@ -16,6 +20,8 @@ export type WorkerRuntimeConfig<K extends string> = {
|
||||
onMessage: (key: K, msg: unknown) => void;
|
||||
onReady: (key: K, msg: unknown) => void;
|
||||
onExit: (key: K, code: number | null, signal: string | null) => void;
|
||||
/** Invoked when automatic respawn is skipped because `maxCrashes` was exceeded in `windowMs`. */
|
||||
onCrashLimitReached: ((key: K) => void) | null;
|
||||
respawn: {
|
||||
enabled: boolean;
|
||||
maxCrashes: number;
|
||||
@@ -32,8 +38,8 @@ export type WorkerRuntime<K extends string> = {
|
||||
/** When the worker is already ready and IPC-connected, sends synchronously (returns true). Otherwise false — caller may fall back to `send`. */
|
||||
trySendSync: (key: K, msg: unknown) => boolean;
|
||||
start: (key: K) => Promise<void>;
|
||||
evict: (key: K) => Promise<void>;
|
||||
drain: (key: K) => Promise<void>;
|
||||
evict: (key: K, opts: WorkerDrainOpts | null) => Promise<void>;
|
||||
drain: (key: K, opts: WorkerDrainOpts | null) => Promise<void>;
|
||||
shutdown: () => Promise<void>;
|
||||
has: (key: K) => boolean;
|
||||
/** True when a child exists but IPC is disconnected (legacy pool skipped sends in this case). */
|
||||
@@ -264,7 +270,14 @@ export function createWorkerRuntime<K extends string>(
|
||||
await waitForReady(slot, config.shutdownTimeoutMs);
|
||||
}
|
||||
|
||||
async function gracefulStop(slot: WorkerSlot<K>): Promise<void> {
|
||||
function resolveShutdownTimeoutMs(opts: WorkerDrainOpts | null): number {
|
||||
if (opts !== null && opts.shutdownTimeoutMs !== null) {
|
||||
return opts.shutdownTimeoutMs;
|
||||
}
|
||||
return config.shutdownTimeoutMs;
|
||||
}
|
||||
|
||||
async function gracefulStop(slot: WorkerSlot<K>, shutdownTimeoutMs: number): Promise<void> {
|
||||
if (slot.child === null) {
|
||||
return;
|
||||
}
|
||||
@@ -276,7 +289,7 @@ export function createWorkerRuntime<K extends string>(
|
||||
} catch {
|
||||
// IPC channel may have closed between null-check and send
|
||||
}
|
||||
await waitForChildExit(child, config.shutdownTimeoutMs);
|
||||
await waitForChildExit(child, shutdownTimeoutMs);
|
||||
}
|
||||
|
||||
async function handleUnexpectedCrashRecovery(slot: WorkerSlot<K>): Promise<void> {
|
||||
@@ -295,6 +308,9 @@ export function createWorkerRuntime<K extends string>(
|
||||
console.error(
|
||||
`[WorkerRuntime] worker "${String(slot.key)}" exceeded crash limit (${String(config.respawn.maxCrashes)} in ${String(config.respawn.windowMs)}ms); not respawning`,
|
||||
);
|
||||
if (config.onCrashLimitReached !== null) {
|
||||
config.onCrashLimitReached(slot.key);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -303,7 +319,7 @@ export function createWorkerRuntime<K extends string>(
|
||||
}
|
||||
|
||||
async function shutdownWorker(slot: WorkerSlot<K>): Promise<void> {
|
||||
await gracefulStop(slot);
|
||||
await gracefulStop(slot, config.shutdownTimeoutMs);
|
||||
workers.delete(slot.key);
|
||||
}
|
||||
|
||||
@@ -348,22 +364,24 @@ export function createWorkerRuntime<K extends string>(
|
||||
});
|
||||
},
|
||||
|
||||
evict: async (key: K) => {
|
||||
evict: async (key: K, opts: WorkerDrainOpts | null) => {
|
||||
const slot = getOrCreateSlot(key);
|
||||
const shutdownMs = resolveShutdownTimeoutMs(opts);
|
||||
await enqueueOp(slot, async () => {
|
||||
await gracefulStop(slot);
|
||||
await gracefulStop(slot, shutdownMs);
|
||||
workers.delete(key);
|
||||
});
|
||||
},
|
||||
|
||||
drain: async (key: K) => {
|
||||
drain: async (key: K, opts: WorkerDrainOpts | null) => {
|
||||
const slot = getOrCreateSlot(key);
|
||||
const shutdownMs = resolveShutdownTimeoutMs(opts);
|
||||
await enqueueOp(slot, async () => {
|
||||
if (slot.child === null) {
|
||||
await forkAndWaitReady(slot);
|
||||
return;
|
||||
}
|
||||
await gracefulStop(slot);
|
||||
await gracefulStop(slot, shutdownMs);
|
||||
await forkAndWaitReady(slot);
|
||||
});
|
||||
},
|
||||
|
||||
@@ -0,0 +1,256 @@
|
||||
/**
|
||||
* Pure helpers and IPC branching for workflow-manager (keeps workflow-manager.ts lean).
|
||||
*/
|
||||
|
||||
import { dirname, join } from "node:path";
|
||||
import { fileURLToPath } from "node:url";
|
||||
|
||||
import type { WorkflowMessage } from "@uncaged/nerve-core";
|
||||
import { START, isPlainRecord } from "@uncaged/nerve-core";
|
||||
|
||||
import type { LogStore, WorkflowRunStatus } from "@uncaged/nerve-store";
|
||||
import type { ResumeThreadMessage, ThreadEventMessage } from "./ipc.js";
|
||||
import type { WorkerToParentMessage } from "./ipc.js";
|
||||
|
||||
export type PendingThread = {
|
||||
runId: string;
|
||||
prompt: string;
|
||||
maxRounds: number;
|
||||
dryRun: boolean;
|
||||
};
|
||||
|
||||
export type WorkflowState = {
|
||||
active: Set<string>;
|
||||
queue: PendingThread[];
|
||||
};
|
||||
|
||||
/** Matches legacy manager: 6 crashes within 60s stops respawn (was `length > 5`). */
|
||||
export const WORKFLOW_WORKER_RESPAWN = {
|
||||
enabled: true,
|
||||
maxCrashes: 6,
|
||||
windowMs: 60_000,
|
||||
delayMs: 0,
|
||||
} as const;
|
||||
|
||||
/**
|
||||
* Worker shutdown timeout — must stay in sync with SHUTDOWN_TIMEOUT_MS in workflow-worker.ts.
|
||||
* The drain timeout passed to drainAndRespawn must be >= this value so the worker has
|
||||
* enough time to finish in-flight threads before the parent force-kills it.
|
||||
*/
|
||||
export const WORKER_SHUTDOWN_TIMEOUT_MS = 10_000;
|
||||
|
||||
export const DEFAULT_MAX_QUEUE = 100;
|
||||
|
||||
export function readLaunchFromTriggerPayload(
|
||||
raw: unknown,
|
||||
engineDefaultMaxRounds: number,
|
||||
): { prompt: string; maxRounds: number; dryRun: boolean } {
|
||||
if (isPlainRecord(raw)) {
|
||||
const o = raw;
|
||||
if (typeof o.prompt === "string" && typeof o.maxRounds === "number") {
|
||||
const dryRun = typeof o.dryRun === "boolean" ? o.dryRun : false;
|
||||
return { prompt: o.prompt, maxRounds: o.maxRounds, dryRun };
|
||||
}
|
||||
}
|
||||
return { prompt: "", maxRounds: engineDefaultMaxRounds, dryRun: false };
|
||||
}
|
||||
|
||||
export function ensureThreadMessagesWithStart(
|
||||
messages: Array<{ role: string; content: string; meta: unknown; timestamp: number }>,
|
||||
threadId: string,
|
||||
fallbackPrompt: string,
|
||||
fallbackMaxRounds: number,
|
||||
): WorkflowMessage[] {
|
||||
const mapped: WorkflowMessage[] = messages.map((m) => ({
|
||||
role: m.role,
|
||||
content: m.content,
|
||||
meta: m.meta,
|
||||
timestamp: m.timestamp,
|
||||
}));
|
||||
if (mapped.length > 0 && mapped[0].role === START) {
|
||||
return mapped;
|
||||
}
|
||||
const start: WorkflowMessage = {
|
||||
role: START,
|
||||
content: fallbackPrompt,
|
||||
meta: { maxRounds: fallbackMaxRounds, threadId },
|
||||
timestamp: Date.now(),
|
||||
};
|
||||
return [start, ...mapped];
|
||||
}
|
||||
|
||||
export function resolveWorkflowWorkerScript(): string {
|
||||
const __filename = fileURLToPath(import.meta.url);
|
||||
const __dir = dirname(__filename);
|
||||
return join(__dir, "workflow-worker.js");
|
||||
}
|
||||
|
||||
export function mapWorkflowRunStatus(eventType: string): WorkflowRunStatus | null {
|
||||
const map: Record<string, WorkflowRunStatus> = {
|
||||
started: "started",
|
||||
queued: "queued",
|
||||
completed: "completed",
|
||||
failed: "failed",
|
||||
crashed: "crashed",
|
||||
dropped: "dropped",
|
||||
interrupted: "interrupted",
|
||||
killed: "killed",
|
||||
};
|
||||
return map[eventType] ?? null;
|
||||
}
|
||||
|
||||
export function extractExitCodeFromPayload(payload: unknown): number | null {
|
||||
if (isPlainRecord(payload) && typeof payload.exitCode === "number") {
|
||||
return payload.exitCode;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
export function appendWorkflowRunLog(
|
||||
logStore: LogStore,
|
||||
workflowName: string,
|
||||
runId: string,
|
||||
eventType: string,
|
||||
payload: unknown | undefined,
|
||||
exitCode: number | null,
|
||||
): void {
|
||||
const timestamp = Date.now();
|
||||
const serialised = payload !== undefined ? JSON.stringify(payload) : null;
|
||||
const status = mapWorkflowRunStatus(eventType);
|
||||
|
||||
if (status !== null) {
|
||||
logStore.upsertWorkflowRun(
|
||||
{
|
||||
source: "workflow",
|
||||
type: eventType,
|
||||
refId: runId,
|
||||
payload: serialised,
|
||||
timestamp,
|
||||
},
|
||||
{ runId, workflow: workflowName, status, timestamp, exitCode },
|
||||
);
|
||||
} else {
|
||||
logStore.append({
|
||||
source: "workflow",
|
||||
type: eventType,
|
||||
refId: runId,
|
||||
payload: serialised,
|
||||
timestamp,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
export function recoverQueuedRun(
|
||||
workflowName: string,
|
||||
runId: string,
|
||||
state: WorkflowState,
|
||||
logStore: LogStore,
|
||||
engineMaxRounds: number,
|
||||
): void {
|
||||
if (state.queue.some((q) => q.runId === runId)) return;
|
||||
const launch = readLaunchFromTriggerPayload(logStore.getTriggerPayload(runId), engineMaxRounds);
|
||||
state.queue.push({
|
||||
runId,
|
||||
prompt: launch.prompt,
|
||||
maxRounds: launch.maxRounds,
|
||||
dryRun: launch.dryRun,
|
||||
});
|
||||
process.stderr.write(
|
||||
`[workflow-manager] crash-recovery: re-queued thread "${runId}" for "${workflowName}"\n`,
|
||||
);
|
||||
}
|
||||
|
||||
export function recoverStartedRun(
|
||||
workflowName: string,
|
||||
runId: string,
|
||||
state: WorkflowState,
|
||||
logStore: LogStore,
|
||||
engineMaxRounds: number,
|
||||
sendResume: (wf: string, msg: ResumeThreadMessage) => void,
|
||||
): void {
|
||||
if (state.active.has(runId)) return;
|
||||
const rawMessages = logStore.getThreadMessages(runId);
|
||||
const launch = readLaunchFromTriggerPayload(logStore.getTriggerPayload(runId), engineMaxRounds);
|
||||
const messages = ensureThreadMessagesWithStart(
|
||||
rawMessages,
|
||||
runId,
|
||||
launch.prompt,
|
||||
launch.maxRounds,
|
||||
);
|
||||
state.active.add(runId);
|
||||
const msg: ResumeThreadMessage = {
|
||||
type: "resume-thread",
|
||||
runId,
|
||||
messages,
|
||||
maxRounds: launch.maxRounds,
|
||||
dryRun: launch.dryRun,
|
||||
};
|
||||
sendResume(workflowName, msg);
|
||||
process.stderr.write(
|
||||
`[workflow-manager] crash-recovery: resuming thread "${runId}" for "${workflowName}" (${String(messages.length)} messages)\n`,
|
||||
);
|
||||
}
|
||||
|
||||
export function recoverThreadsFromStore(
|
||||
workflowName: string,
|
||||
logStore: LogStore,
|
||||
engineMaxRounds: number,
|
||||
getOrCreateState: (name: string) => WorkflowState,
|
||||
sendResume: (wf: string, msg: ResumeThreadMessage) => void,
|
||||
): void {
|
||||
const activeRuns = logStore.getActiveWorkflowRuns(workflowName);
|
||||
const state = getOrCreateState(workflowName);
|
||||
|
||||
for (const run of activeRuns) {
|
||||
if (run.status === "queued") {
|
||||
recoverQueuedRun(workflowName, run.runId, state, logStore, engineMaxRounds);
|
||||
} else if (run.status === "started") {
|
||||
recoverStartedRun(workflowName, run.runId, state, logStore, engineMaxRounds, sendResume);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export type WorkflowManagerMessageDeps = {
|
||||
logStore: LogStore;
|
||||
handleThreadEvent: (workflowName: string, msg: ThreadEventMessage) => void;
|
||||
onWorkflowRoleError: (
|
||||
workflowName: string,
|
||||
runId: string,
|
||||
error: string,
|
||||
exitCode: number,
|
||||
) => void;
|
||||
};
|
||||
|
||||
export function dispatchWorkflowWorkerMessage(
|
||||
workflowName: string,
|
||||
msg: WorkerToParentMessage,
|
||||
deps: WorkflowManagerMessageDeps,
|
||||
): void {
|
||||
if (msg.type === "thread-event") {
|
||||
deps.handleThreadEvent(workflowName, msg);
|
||||
return;
|
||||
}
|
||||
|
||||
if (msg.type === "thread-workflow-message") {
|
||||
deps.logStore.append({
|
||||
source: "workflow",
|
||||
type: "thread_workflow_message",
|
||||
refId: msg.runId,
|
||||
payload: JSON.stringify(msg.message),
|
||||
timestamp: Date.now(),
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
if (msg.type === "workflow-error") {
|
||||
process.stderr.write(
|
||||
`[workflow-manager] workflow-error for runId "${msg.runId}" in "${workflowName}": ${msg.error}\n`,
|
||||
);
|
||||
deps.onWorkflowRoleError(workflowName, msg.runId, msg.error, msg.exitCode);
|
||||
return;
|
||||
}
|
||||
|
||||
if (msg.type === "error") {
|
||||
process.stderr.write(`[workflow-manager] error from "${workflowName}" worker: ${msg.error}\n`);
|
||||
}
|
||||
}
|
||||
@@ -6,33 +6,24 @@
|
||||
* Concurrency and overflow (drop/queue) are enforced here in the parent process.
|
||||
*/
|
||||
|
||||
import { fork } from "node:child_process";
|
||||
import type { ChildProcess } from "node:child_process";
|
||||
import { dirname, join } from "node:path";
|
||||
import { fileURLToPath } from "node:url";
|
||||
import type { NerveConfig, WorkflowConfig, WorkflowStatus } from "@uncaged/nerve-core";
|
||||
|
||||
import type {
|
||||
NerveConfig,
|
||||
WorkflowConfig,
|
||||
WorkflowMessage,
|
||||
WorkflowStatus,
|
||||
} from "@uncaged/nerve-core";
|
||||
import { START, isPlainRecord } from "@uncaged/nerve-core";
|
||||
|
||||
import type { LogStore, WorkflowRunStatus } from "@uncaged/nerve-store";
|
||||
import type {
|
||||
KillThreadMessage,
|
||||
ResumeThreadMessage,
|
||||
ShutdownMessage,
|
||||
StartThreadMessage,
|
||||
ThreadEventMessage,
|
||||
} from "./ipc.js";
|
||||
import type { LogStore } from "@uncaged/nerve-store";
|
||||
import type { KillThreadMessage, StartThreadMessage, ThreadEventMessage } from "./ipc.js";
|
||||
import { parseWorkerMessage } from "./ipc.js";
|
||||
import { formatCapturedStderrTail, formatChildExitSummary } from "./worker-fork-support.js";
|
||||
import { createWorkerRuntime } from "./worker-runtime.js";
|
||||
import {
|
||||
formatCapturedStderrTail,
|
||||
formatChildExitSummary,
|
||||
teeCapturedStderr,
|
||||
} from "./worker-fork-support.js";
|
||||
DEFAULT_MAX_QUEUE,
|
||||
WORKER_SHUTDOWN_TIMEOUT_MS,
|
||||
WORKFLOW_WORKER_RESPAWN,
|
||||
type WorkflowState,
|
||||
appendWorkflowRunLog,
|
||||
dispatchWorkflowWorkerMessage,
|
||||
extractExitCodeFromPayload,
|
||||
recoverThreadsFromStore,
|
||||
resolveWorkflowWorkerScript,
|
||||
} from "./workflow-manager-support.js";
|
||||
|
||||
export type WorkflowLaunchParams = {
|
||||
prompt: string;
|
||||
@@ -74,169 +65,109 @@ export type WorkflowManager = {
|
||||
stop: () => Promise<void>;
|
||||
};
|
||||
|
||||
type PendingThread = {
|
||||
runId: string;
|
||||
prompt: string;
|
||||
maxRounds: number;
|
||||
dryRun: boolean;
|
||||
};
|
||||
|
||||
type WorkflowState = {
|
||||
active: Set<string>;
|
||||
queue: PendingThread[];
|
||||
};
|
||||
|
||||
type WorkerEntry = {
|
||||
workflowName: string;
|
||||
process: ChildProcess;
|
||||
stopping: boolean;
|
||||
/** When set, the worker is draining before a hot-reload respawn. */
|
||||
draining: boolean;
|
||||
stderrTail: { value: string };
|
||||
};
|
||||
|
||||
// Crash respawn backoff: track crash timestamps per workflow.
|
||||
const MAX_CRASHES_IN_WINDOW = 5;
|
||||
const CRASH_WINDOW_MS = 60_000;
|
||||
|
||||
/**
|
||||
* Worker shutdown timeout — must stay in sync with SHUTDOWN_TIMEOUT_MS in workflow-worker.ts.
|
||||
* The drain timeout passed to drainAndRespawn must be >= this value so the worker has
|
||||
* enough time to finish in-flight threads before the parent force-kills it.
|
||||
*/
|
||||
const WORKER_SHUTDOWN_TIMEOUT_MS = 10_000;
|
||||
|
||||
const DEFAULT_MAX_QUEUE = 100;
|
||||
|
||||
function readLaunchFromTriggerPayload(
|
||||
raw: unknown,
|
||||
engineDefaultMaxRounds: number,
|
||||
): { prompt: string; maxRounds: number; dryRun: boolean } {
|
||||
if (isPlainRecord(raw)) {
|
||||
const o = raw;
|
||||
if (typeof o.prompt === "string" && typeof o.maxRounds === "number") {
|
||||
const dryRun = typeof o.dryRun === "boolean" ? o.dryRun : false;
|
||||
return { prompt: o.prompt, maxRounds: o.maxRounds, dryRun };
|
||||
}
|
||||
}
|
||||
return { prompt: "", maxRounds: engineDefaultMaxRounds, dryRun: false };
|
||||
}
|
||||
|
||||
function ensureThreadMessagesWithStart(
|
||||
messages: Array<{ role: string; content: string; meta: unknown; timestamp: number }>,
|
||||
threadId: string,
|
||||
fallbackPrompt: string,
|
||||
fallbackMaxRounds: number,
|
||||
): WorkflowMessage[] {
|
||||
const mapped: WorkflowMessage[] = messages.map((m) => ({
|
||||
role: m.role,
|
||||
content: m.content,
|
||||
meta: m.meta,
|
||||
timestamp: m.timestamp,
|
||||
}));
|
||||
if (mapped.length > 0 && mapped[0].role === START) {
|
||||
return mapped;
|
||||
}
|
||||
const start: WorkflowMessage = {
|
||||
role: START,
|
||||
content: fallbackPrompt,
|
||||
meta: { maxRounds: fallbackMaxRounds, threadId },
|
||||
timestamp: Date.now(),
|
||||
};
|
||||
return [start, ...mapped];
|
||||
}
|
||||
|
||||
function resolveWorkerScript(): string {
|
||||
const __filename = fileURLToPath(import.meta.url);
|
||||
const __dir = dirname(__filename);
|
||||
return join(__dir, "workflow-worker.js");
|
||||
}
|
||||
|
||||
function spawnWorkflowWorker(
|
||||
nerveRoot: string,
|
||||
workflowName: string,
|
||||
workerScript: string,
|
||||
stderrTail: { value: string },
|
||||
): ChildProcess {
|
||||
const child = fork(workerScript, ["--workflow", workflowName, "--root", nerveRoot], {
|
||||
stdio: ["ignore", "inherit", "pipe", "ipc"],
|
||||
});
|
||||
teeCapturedStderr(child, stderrTail);
|
||||
// Prevent unhandled EPIPE when writing to a child whose IPC channel closed
|
||||
child.on("error", (err) => {
|
||||
if ((err as NodeJS.ErrnoException).code !== "EPIPE") {
|
||||
console.error("[worker] error:", err.message);
|
||||
}
|
||||
});
|
||||
return child;
|
||||
}
|
||||
|
||||
function sendStartThread(worker: ChildProcess, msg: StartThreadMessage): void {
|
||||
if (worker.connected === false) return;
|
||||
try {
|
||||
worker.send(msg);
|
||||
} catch {
|
||||
// IPC channel closed between connected check and send
|
||||
}
|
||||
}
|
||||
|
||||
function sendShutdown(worker: ChildProcess, entry: WorkerEntry): void {
|
||||
entry.stopping = true;
|
||||
if (worker.connected === false) return;
|
||||
const msg: ShutdownMessage = { type: "shutdown" };
|
||||
try {
|
||||
worker.send(msg);
|
||||
} catch {
|
||||
// IPC channel closed between connected check and send
|
||||
}
|
||||
}
|
||||
|
||||
function sendResumeThread(worker: ChildProcess, msg: ResumeThreadMessage): void {
|
||||
if (worker.connected === false) return;
|
||||
try {
|
||||
worker.send(msg);
|
||||
} catch {
|
||||
// IPC channel closed between connected check and send
|
||||
}
|
||||
}
|
||||
|
||||
function sendKillThread(worker: ChildProcess, runId: string): void {
|
||||
if (worker.connected === false) return;
|
||||
const msg: KillThreadMessage = { type: "kill-thread", runId };
|
||||
try {
|
||||
worker.send(msg);
|
||||
} catch {
|
||||
// IPC channel closed between connected check and send
|
||||
}
|
||||
}
|
||||
|
||||
function waitForExit(child: ChildProcess, timeoutMs: number): Promise<void> {
|
||||
return new Promise((resolve) => {
|
||||
const timer = setTimeout(() => {
|
||||
child.kill("SIGKILL");
|
||||
resolve();
|
||||
}, timeoutMs);
|
||||
child.once("exit", () => {
|
||||
clearTimeout(timer);
|
||||
resolve();
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
export function createWorkflowManager(
|
||||
nerveRoot: string,
|
||||
initialConfig: NerveConfig,
|
||||
logStore: LogStore,
|
||||
): WorkflowManager {
|
||||
const workerScript = resolveWorkerScript();
|
||||
const workerScript = resolveWorkflowWorkerScript();
|
||||
|
||||
/**
|
||||
* Default drain timeout must be at least WORKER_SHUTDOWN_TIMEOUT_MS so the worker
|
||||
* has enough time to finish in-flight threads before the parent force-kills it.
|
||||
*/
|
||||
const DEFAULT_DRAIN_TIMEOUT_MS = Math.max(30_000, WORKER_SHUTDOWN_TIMEOUT_MS + 5_000);
|
||||
|
||||
const states = new Map<string, WorkflowState>();
|
||||
const workers = new Map<string, WorkerEntry>();
|
||||
const crashTimestamps = new Map<string, number[]>();
|
||||
const trackedWorkflows = new Set<string>();
|
||||
const hotReloadEvicting = new Set<string>();
|
||||
const crashRecoveryPending = new Set<string>();
|
||||
const crashLimitBlocked = new Set<string>();
|
||||
let stopped = false;
|
||||
let config = initialConfig;
|
||||
const pendingDrains = new Set<string>();
|
||||
|
||||
function logWorkflowEvent(
|
||||
workflowName: string,
|
||||
runId: string,
|
||||
eventType: string,
|
||||
payload: unknown | null = null,
|
||||
exitCode: number | null = null,
|
||||
): void {
|
||||
appendWorkflowRunLog(logStore, workflowName, runId, eventType, payload, exitCode);
|
||||
}
|
||||
|
||||
const runtime = createWorkerRuntime<string>({
|
||||
script: workerScript,
|
||||
argsForKey: (workflowName) => ["--workflow", workflowName, "--root", nerveRoot],
|
||||
forwardStderr: true,
|
||||
onMessage: (workflowName, raw) => {
|
||||
handleWorkerMessage(workflowName, raw);
|
||||
},
|
||||
onReady: (workflowName, _msg) => {
|
||||
if (crashRecoveryPending.has(workflowName)) {
|
||||
crashRecoveryPending.delete(workflowName);
|
||||
recoverThreadsFromStore(
|
||||
workflowName,
|
||||
logStore,
|
||||
config.maxRounds,
|
||||
getOrCreateState,
|
||||
(wf, msg) => {
|
||||
sendToWorker(wf, msg);
|
||||
},
|
||||
);
|
||||
}
|
||||
},
|
||||
onExit: (workflowName, code, signalStr) => {
|
||||
const sig =
|
||||
signalStr === null || signalStr === undefined || signalStr === ""
|
||||
? null
|
||||
: (signalStr as NodeJS.Signals);
|
||||
|
||||
if (hotReloadEvicting.has(workflowName)) {
|
||||
hotReloadEvicting.delete(workflowName);
|
||||
markActiveRunsInterrupted(workflowName);
|
||||
if (!stopped && workflowConfig(workflowName) !== null) {
|
||||
process.stderr.write(
|
||||
`[workflow-manager] worker for "${workflowName}" drained, respawning\n`,
|
||||
);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (stopped) {
|
||||
const state = states.get(workflowName);
|
||||
if (state !== undefined) {
|
||||
state.active.clear();
|
||||
}
|
||||
crashRecoveryPending.delete(workflowName);
|
||||
return;
|
||||
}
|
||||
|
||||
const summary = formatChildExitSummary(code, sig);
|
||||
const stderrExtra = formatCapturedStderrTail(runtime.stderrTail(workflowName));
|
||||
process.stderr.write(
|
||||
`[workflow-manager] worker for "${workflowName}" exited (${summary})${stderrExtra}\n`,
|
||||
);
|
||||
|
||||
cleanupAfterUnexpectedWorkerExit(workflowName);
|
||||
crashRecoveryPending.add(workflowName);
|
||||
},
|
||||
onCrashLimitReached: (workflowName) => {
|
||||
crashRecoveryPending.delete(workflowName);
|
||||
trackedWorkflows.delete(workflowName);
|
||||
crashLimitBlocked.add(workflowName);
|
||||
process.stderr.write(
|
||||
`[workflow-manager] worker for "${workflowName}" exceeded crash limit (${String(WORKFLOW_WORKER_RESPAWN.maxCrashes)} in ${String(WORKFLOW_WORKER_RESPAWN.windowMs)}ms) — stopping respawn\n`,
|
||||
);
|
||||
},
|
||||
respawn: {
|
||||
...WORKFLOW_WORKER_RESPAWN,
|
||||
allowRespawn: (_wf) => !stopped,
|
||||
},
|
||||
shutdownTimeoutMs: DEFAULT_DRAIN_TIMEOUT_MS,
|
||||
});
|
||||
|
||||
function getOrCreateState(workflowName: string): WorkflowState {
|
||||
let state = states.get(workflowName);
|
||||
if (state === undefined) {
|
||||
@@ -250,60 +181,38 @@ export function createWorkflowManager(
|
||||
return config.workflows[workflowName] ?? null;
|
||||
}
|
||||
|
||||
function toWorkflowRunStatus(eventType: string): WorkflowRunStatus | null {
|
||||
const map: Record<string, WorkflowRunStatus> = {
|
||||
started: "started",
|
||||
queued: "queued",
|
||||
completed: "completed",
|
||||
failed: "failed",
|
||||
crashed: "crashed",
|
||||
dropped: "dropped",
|
||||
interrupted: "interrupted",
|
||||
killed: "killed",
|
||||
};
|
||||
return map[eventType] ?? null;
|
||||
}
|
||||
|
||||
function extractExitCode(payload: unknown): number | null {
|
||||
if (isPlainRecord(payload) && typeof payload.exitCode === "number") {
|
||||
return payload.exitCode;
|
||||
/** IPC send — matches legacy pool: no-op when IPC is disconnected; cold-start via WorkerRuntime.send. */
|
||||
function sendToWorker(workflowName: string, msg: unknown): void {
|
||||
if (crashLimitBlocked.has(workflowName)) {
|
||||
return;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
function logWorkflowEvent(
|
||||
workflowName: string,
|
||||
runId: string,
|
||||
eventType: string,
|
||||
payload?: unknown,
|
||||
exitCode: number | null = null,
|
||||
): void {
|
||||
const timestamp = Date.now();
|
||||
const serialised = payload !== undefined ? JSON.stringify(payload) : null;
|
||||
const status = toWorkflowRunStatus(eventType);
|
||||
|
||||
if (status !== null) {
|
||||
logStore.upsertWorkflowRun(
|
||||
{
|
||||
source: "workflow",
|
||||
type: eventType,
|
||||
refId: runId,
|
||||
payload: serialised,
|
||||
timestamp,
|
||||
},
|
||||
{ runId, workflow: workflowName, status, timestamp, exitCode },
|
||||
);
|
||||
} else {
|
||||
logStore.append({
|
||||
source: "workflow",
|
||||
type: eventType,
|
||||
refId: runId,
|
||||
payload: serialised,
|
||||
timestamp,
|
||||
trackedWorkflows.add(workflowName);
|
||||
if (runtime.hasDisconnectedChild(workflowName)) {
|
||||
return;
|
||||
}
|
||||
if (!runtime.trySendSync(workflowName, msg)) {
|
||||
void runtime.send(workflowName, msg).catch(() => {
|
||||
// IPC channel closed — mark any thread from this message as failed
|
||||
if (isStartThreadMsg(msg)) {
|
||||
const state = states.get(workflowName);
|
||||
if (state?.active.has(msg.runId)) {
|
||||
state.active.delete(msg.runId);
|
||||
logWorkflowEvent(workflowName, msg.runId, "failed", { error: "IPC channel closed" }, 1);
|
||||
dequeueNext(workflowName);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
function isStartThreadMsg(msg: unknown): msg is StartThreadMessage {
|
||||
return (
|
||||
msg !== null &&
|
||||
typeof msg === "object" &&
|
||||
(msg as Record<string, unknown>).type === "start-thread"
|
||||
);
|
||||
}
|
||||
|
||||
function dispatchThread(
|
||||
workflowName: string,
|
||||
runId: string,
|
||||
@@ -314,7 +223,6 @@ export function createWorkflowManager(
|
||||
const state = getOrCreateState(workflowName);
|
||||
state.active.add(runId);
|
||||
|
||||
const worker = getOrSpawnWorker(workflowName);
|
||||
const msg: StartThreadMessage = {
|
||||
type: "start-thread",
|
||||
runId,
|
||||
@@ -323,7 +231,7 @@ export function createWorkflowManager(
|
||||
maxRounds,
|
||||
dryRun,
|
||||
};
|
||||
sendStartThread(worker.process, msg);
|
||||
sendToWorker(workflowName, msg);
|
||||
logWorkflowEvent(workflowName, runId, "started", { prompt, maxRounds, dryRun });
|
||||
}
|
||||
|
||||
@@ -367,92 +275,20 @@ export function createWorkflowManager(
|
||||
if (msg.eventType === "completed" || msg.eventType === "failed" || msg.eventType === "killed") {
|
||||
state.active.delete(msg.runId);
|
||||
dequeueNext(workflowName);
|
||||
const exitCode = extractExitCode(msg.payload);
|
||||
const exitCode = extractExitCodeFromPayload(msg.payload);
|
||||
logWorkflowEvent(workflowName, msg.runId, msg.eventType, msg.payload, exitCode);
|
||||
maybeDeferredHotReloadDrain(workflowName);
|
||||
}
|
||||
}
|
||||
|
||||
function recoverQueuedRun(workflowName: string, runId: string, state: WorkflowState): void {
|
||||
if (state.queue.some((q) => q.runId === runId)) return;
|
||||
const launch = readLaunchFromTriggerPayload(
|
||||
logStore.getTriggerPayload(runId),
|
||||
config.maxRounds,
|
||||
);
|
||||
state.queue.push({
|
||||
runId,
|
||||
prompt: launch.prompt,
|
||||
maxRounds: launch.maxRounds,
|
||||
dryRun: launch.dryRun,
|
||||
});
|
||||
process.stderr.write(
|
||||
`[workflow-manager] crash-recovery: re-queued thread "${runId}" for "${workflowName}"\n`,
|
||||
);
|
||||
}
|
||||
|
||||
function recoverStartedRun(
|
||||
workflowName: string,
|
||||
runId: string,
|
||||
state: WorkflowState,
|
||||
worker: WorkerEntry,
|
||||
): void {
|
||||
if (state.active.has(runId)) return;
|
||||
const rawMessages = logStore.getThreadMessages(runId);
|
||||
const launch = readLaunchFromTriggerPayload(
|
||||
logStore.getTriggerPayload(runId),
|
||||
config.maxRounds,
|
||||
);
|
||||
const messages = ensureThreadMessagesWithStart(
|
||||
rawMessages,
|
||||
runId,
|
||||
launch.prompt,
|
||||
launch.maxRounds,
|
||||
);
|
||||
state.active.add(runId);
|
||||
const msg: ResumeThreadMessage = {
|
||||
type: "resume-thread",
|
||||
runId,
|
||||
messages,
|
||||
maxRounds: launch.maxRounds,
|
||||
dryRun: launch.dryRun,
|
||||
};
|
||||
sendResumeThread(worker.process, msg);
|
||||
process.stderr.write(
|
||||
`[workflow-manager] crash-recovery: resuming thread "${runId}" for "${workflowName}" (${messages.length} messages)\n`,
|
||||
);
|
||||
}
|
||||
|
||||
function recoverThreadsForWorker(workflowName: string, worker: WorkerEntry): void {
|
||||
const activeRuns = logStore.getActiveWorkflowRuns(workflowName);
|
||||
const state = getOrCreateState(workflowName);
|
||||
|
||||
for (const run of activeRuns) {
|
||||
if (run.status === "queued") {
|
||||
recoverQueuedRun(workflowName, run.runId, state);
|
||||
} else if (run.status === "started") {
|
||||
recoverStartedRun(workflowName, run.runId, state, worker);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function recordCrashAndCheckLimit(workflowName: string): boolean {
|
||||
const now = Date.now();
|
||||
const timestamps = (crashTimestamps.get(workflowName) ?? []).filter(
|
||||
(t) => now - t < CRASH_WINDOW_MS,
|
||||
);
|
||||
timestamps.push(now);
|
||||
crashTimestamps.set(workflowName, timestamps);
|
||||
return timestamps.length > MAX_CRASHES_IN_WINDOW;
|
||||
}
|
||||
|
||||
function handleWorkerCrash(workflowName: string): void {
|
||||
function cleanupAfterUnexpectedWorkerExit(workflowName: string): void {
|
||||
const state = states.get(workflowName);
|
||||
if (state === undefined) return;
|
||||
|
||||
const crashedCount = state.active.size;
|
||||
if (crashedCount > 0) {
|
||||
process.stderr.write(
|
||||
`[workflow-manager] worker for "${workflowName}" crashed with ${crashedCount} active thread(s)\n`,
|
||||
`[workflow-manager] worker for "${workflowName}" crashed with ${String(crashedCount)} active thread(s)\n`,
|
||||
);
|
||||
for (const runId of state.active) {
|
||||
logWorkflowEvent(workflowName, runId, "crashed", undefined, 255);
|
||||
@@ -460,26 +296,13 @@ export function createWorkflowManager(
|
||||
}
|
||||
|
||||
state.active.clear();
|
||||
workers.delete(workflowName);
|
||||
pendingDrains.delete(workflowName);
|
||||
|
||||
if (stopped || workflowConfig(workflowName) === null) return;
|
||||
|
||||
if (recordCrashAndCheckLimit(workflowName)) {
|
||||
const count = crashTimestamps.get(workflowName)?.length ?? 0;
|
||||
if (!stopped && !crashLimitBlocked.has(workflowName) && workflowConfig(workflowName) !== null) {
|
||||
process.stderr.write(
|
||||
`[workflow-manager] worker for "${workflowName}" crashed ${count} times in ${CRASH_WINDOW_MS}ms — stopping respawn\n`,
|
||||
`[workflow-manager] respawning worker for "${workflowName}" after crash\n`,
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
process.stderr.write(
|
||||
`[workflow-manager] respawning worker for "${workflowName}" after crash\n`,
|
||||
);
|
||||
const newWorker = getOrSpawnWorker(workflowName);
|
||||
setImmediate(() => {
|
||||
recoverThreadsForWorker(workflowName, newWorker);
|
||||
});
|
||||
}
|
||||
|
||||
function handleWorkerMessage(workflowName: string, raw: unknown): void {
|
||||
@@ -490,43 +313,19 @@ export function createWorkflowManager(
|
||||
);
|
||||
return;
|
||||
}
|
||||
const msg = result.value;
|
||||
|
||||
if (msg.type === "thread-event") {
|
||||
handleThreadEvent(workflowName, msg);
|
||||
return;
|
||||
}
|
||||
|
||||
if (msg.type === "thread-workflow-message") {
|
||||
logStore.append({
|
||||
source: "workflow",
|
||||
type: "thread_workflow_message",
|
||||
refId: msg.runId,
|
||||
payload: JSON.stringify(msg.message),
|
||||
timestamp: Date.now(),
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
if (msg.type === "workflow-error") {
|
||||
process.stderr.write(
|
||||
`[workflow-manager] workflow-error for runId "${msg.runId}" in "${workflowName}": ${msg.error}\n`,
|
||||
);
|
||||
const state = states.get(workflowName);
|
||||
if (state !== undefined) {
|
||||
state.active.delete(msg.runId);
|
||||
dequeueNext(workflowName);
|
||||
}
|
||||
logWorkflowEvent(workflowName, msg.runId, "failed", { error: msg.error }, msg.exitCode);
|
||||
maybeDeferredHotReloadDrain(workflowName);
|
||||
return;
|
||||
}
|
||||
|
||||
if (msg.type === "error") {
|
||||
process.stderr.write(
|
||||
`[workflow-manager] error from "${workflowName}" worker: ${msg.error}\n`,
|
||||
);
|
||||
}
|
||||
dispatchWorkflowWorkerMessage(workflowName, result.value, {
|
||||
logStore,
|
||||
handleThreadEvent,
|
||||
onWorkflowRoleError: (wf, runId, error, exitCode) => {
|
||||
const state = states.get(wf);
|
||||
if (state !== undefined) {
|
||||
state.active.delete(runId);
|
||||
dequeueNext(wf);
|
||||
}
|
||||
logWorkflowEvent(wf, runId, "failed", { error }, exitCode);
|
||||
maybeDeferredHotReloadDrain(wf);
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
function markActiveRunsInterrupted(workflowName: string): void {
|
||||
@@ -538,67 +337,6 @@ export function createWorkflowManager(
|
||||
state.active.clear();
|
||||
}
|
||||
|
||||
function handleWorkerExit(
|
||||
workflowName: string,
|
||||
code: number | null,
|
||||
signal: NodeJS.Signals | null,
|
||||
): void {
|
||||
const entry = workers.get(workflowName);
|
||||
if (entry?.draining) {
|
||||
workers.delete(workflowName);
|
||||
markActiveRunsInterrupted(workflowName);
|
||||
if (!stopped && workflowConfig(workflowName) !== null) {
|
||||
process.stderr.write(
|
||||
`[workflow-manager] worker for "${workflowName}" drained, respawning\n`,
|
||||
);
|
||||
getOrSpawnWorker(workflowName);
|
||||
}
|
||||
return;
|
||||
}
|
||||
if (entry?.stopping) {
|
||||
workers.delete(workflowName);
|
||||
const state = states.get(workflowName);
|
||||
if (state !== undefined) {
|
||||
state.active.clear();
|
||||
}
|
||||
return;
|
||||
}
|
||||
const summary = formatChildExitSummary(code, signal);
|
||||
const stderrExtra = entry !== undefined ? formatCapturedStderrTail(entry.stderrTail.value) : "";
|
||||
process.stderr.write(
|
||||
`[workflow-manager] worker for "${workflowName}" exited (${summary})${stderrExtra}\n`,
|
||||
);
|
||||
handleWorkerCrash(workflowName);
|
||||
}
|
||||
|
||||
function getOrSpawnWorker(workflowName: string): WorkerEntry {
|
||||
const existing = workers.get(workflowName);
|
||||
if (existing !== undefined && existing.process.exitCode === null) {
|
||||
return existing;
|
||||
}
|
||||
|
||||
const stderrTail = { value: "" };
|
||||
const child = spawnWorkflowWorker(nerveRoot, workflowName, workerScript, stderrTail);
|
||||
|
||||
child.on("message", (raw: unknown) => {
|
||||
handleWorkerMessage(workflowName, raw);
|
||||
});
|
||||
|
||||
child.on("exit", (code, signal) => {
|
||||
handleWorkerExit(workflowName, code, signal ?? null);
|
||||
});
|
||||
|
||||
const entry: WorkerEntry = {
|
||||
workflowName,
|
||||
process: child,
|
||||
stopping: false,
|
||||
draining: false,
|
||||
stderrTail,
|
||||
};
|
||||
workers.set(workflowName, entry);
|
||||
return entry;
|
||||
}
|
||||
|
||||
function killThread(runId: string): boolean {
|
||||
for (const [workflowName, state] of states) {
|
||||
const queueIdx = state.queue.findIndex((q) => q.runId === runId);
|
||||
@@ -609,10 +347,8 @@ export function createWorkflowManager(
|
||||
}
|
||||
|
||||
if (state.active.has(runId)) {
|
||||
const workerEntry = workers.get(workflowName);
|
||||
if (workerEntry !== undefined) {
|
||||
sendKillThread(workerEntry.process, runId);
|
||||
}
|
||||
const msg: KillThreadMessage = { type: "kill-thread", runId };
|
||||
sendToWorker(workflowName, msg);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
@@ -663,7 +399,7 @@ export function createWorkflowManager(
|
||||
state.queue.push({ runId, prompt, maxRounds, dryRun });
|
||||
logWorkflowEvent(workflowName, runId, "queued");
|
||||
process.stderr.write(
|
||||
`[workflow-manager] queued thread for "${workflowName}" runId "${runId}" (queue length: ${state.queue.length})\n`,
|
||||
`[workflow-manager] queued thread for "${workflowName}" runId "${runId}" (queue length: ${String(state.queue.length)})\n`,
|
||||
);
|
||||
}
|
||||
|
||||
@@ -707,35 +443,29 @@ export function createWorkflowManager(
|
||||
config = newConfig;
|
||||
}
|
||||
|
||||
/**
|
||||
* Default drain timeout must be at least WORKER_SHUTDOWN_TIMEOUT_MS so the worker
|
||||
* has enough time to finish in-flight threads before the parent force-kills it.
|
||||
*/
|
||||
const DEFAULT_DRAIN_TIMEOUT_MS = Math.max(30_000, WORKER_SHUTDOWN_TIMEOUT_MS + 5_000);
|
||||
|
||||
async function drainAndRespawn(
|
||||
workflowName: string,
|
||||
drainTimeoutMs: number = DEFAULT_DRAIN_TIMEOUT_MS,
|
||||
): Promise<void> {
|
||||
const entry = workers.get(workflowName);
|
||||
if (entry === undefined) {
|
||||
// No active worker — nothing to drain
|
||||
if (!trackedWorkflows.has(workflowName)) {
|
||||
return;
|
||||
}
|
||||
|
||||
entry.draining = true;
|
||||
// Send shutdown without setting stopping=true (so the exit handler uses the draining branch)
|
||||
if (entry.process.connected) {
|
||||
const msg: ShutdownMessage = { type: "shutdown" };
|
||||
try {
|
||||
entry.process.send(msg);
|
||||
} catch {
|
||||
// IPC closed
|
||||
const shutdownMs = Math.max(drainTimeoutMs, WORKER_SHUTDOWN_TIMEOUT_MS);
|
||||
hotReloadEvicting.add(workflowName);
|
||||
try {
|
||||
await runtime.evict(workflowName, { shutdownTimeoutMs: shutdownMs });
|
||||
trackedWorkflows.delete(workflowName);
|
||||
|
||||
if (!stopped && workflowConfig(workflowName) !== null) {
|
||||
trackedWorkflows.add(workflowName);
|
||||
await runtime.start(workflowName);
|
||||
}
|
||||
} finally {
|
||||
hotReloadEvicting.delete(workflowName);
|
||||
}
|
||||
await waitForExit(entry.process, drainTimeoutMs);
|
||||
// The exit handler (draining branch) will respawn the worker automatically
|
||||
}
|
||||
|
||||
function drainWhenIdle(workflowName: string): void {
|
||||
const state = states.get(workflowName);
|
||||
const hasActiveRuns = state !== undefined && state.active.size > 0;
|
||||
@@ -761,20 +491,17 @@ export function createWorkflowManager(
|
||||
|
||||
pendingDrains.add(workflowName);
|
||||
process.stderr.write(
|
||||
`[workflow-manager] deferring hot-reload for "${workflowName}" until ${state.active.size} active run(s) complete\n`,
|
||||
`[workflow-manager] deferring hot-reload for "${workflowName}" until ${String(state.active.size)} active run(s) complete\n`,
|
||||
);
|
||||
}
|
||||
|
||||
async function stop(): Promise<void> {
|
||||
stopped = true;
|
||||
pendingDrains.clear();
|
||||
const exitPromises: Promise<void>[] = [];
|
||||
for (const entry of workers.values()) {
|
||||
sendShutdown(entry.process, entry);
|
||||
exitPromises.push(waitForExit(entry.process, 5000));
|
||||
}
|
||||
await Promise.all(exitPromises);
|
||||
workers.clear();
|
||||
hotReloadEvicting.clear();
|
||||
crashRecoveryPending.clear();
|
||||
await runtime.shutdown();
|
||||
trackedWorkflows.clear();
|
||||
}
|
||||
|
||||
return {
|
||||
|
||||
Reference in New Issue
Block a user