fix(daemon): defer hot-reload drain until in-flight runs complete #135
@@ -31,6 +31,7 @@ function makeMockWorkflowManager() {
|
||||
stop: vi.fn(async () => {}),
|
||||
totalActiveCount: vi.fn(() => 0),
|
||||
drainAndRespawn: vi.fn(async () => {}),
|
||||
drainWhenIdle: vi.fn(),
|
||||
updateConfig: vi.fn(),
|
||||
getActiveWorkflowRuns: vi.fn(() => []),
|
||||
};
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
*
|
||||
* Verifies that:
|
||||
* - drainAndRespawn() sends shutdown, waits for exit, then respawns the worker
|
||||
* - drainWhenIdle() defers drain+respawn until in-flight threads finish
|
||||
* - Kernel dispatches handleWorkflowFileChange when file-watcher emits a workflow change
|
||||
* - Kernel logs a workflow_reload system event on hot reload
|
||||
* - drainAndRespawn on a non-existent worker is a no-op
|
||||
@@ -238,6 +239,199 @@ describe("WorkflowManager — drainAndRespawn (Phase 3 hot reload)", () => {
|
||||
});
|
||||
});
|
||||
|
||||
describe("WorkflowManager — drainWhenIdle (hot reload without interrupting in-flight)", () => {
|
||||
beforeEach(() => {
|
||||
mockChildren.length = 0;
|
||||
vi.useFakeTimers({ shouldAdvanceTime: true });
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
vi.useRealTimers();
|
||||
vi.clearAllMocks();
|
||||
});
|
||||
|
||||
it("does not send shutdown while a thread is still active", () => {
|
||||
const logStore = makeLogStore();
|
||||
const config = makeWfConfig({ "my-wf": { concurrency: 1, overflow: "drop" } });
|
||||
const mgr = createWorkflowManager("/nerve-root", config, logStore);
|
||||
|
||||
mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false });
|
||||
const child = mockChildren[0];
|
||||
|
||||
mgr.drainWhenIdle("my-wf");
|
||||
|
||||
const shutdownCalls = (child.send as ReturnType<typeof vi.fn>).mock.calls.filter(
|
||||
(args: unknown[]) =>
|
||||
args[0] !== null &&
|
||||
typeof args[0] === "object" &&
|
||||
(args[0] as Record<string, unknown>).type === "shutdown",
|
||||
);
|
||||
expect(shutdownCalls).toHaveLength(0);
|
||||
});
|
||||
|
||||
it("sends shutdown after the last active thread completes (deferred drain)", async () => {
|
||||
const logStore = makeLogStore();
|
||||
const config = makeWfConfig({ "my-wf": { concurrency: 1, overflow: "drop" } });
|
||||
const mgr = createWorkflowManager("/nerve-root", config, logStore);
|
||||
|
||||
mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false });
|
||||
const child = mockChildren[0];
|
||||
const runId = (child.send as ReturnType<typeof vi.fn>).mock.calls[0][0] as { runId: string };
|
||||
|
||||
mgr.drainWhenIdle("my-wf");
|
||||
|
||||
child.emit("message", {
|
||||
type: "thread-event",
|
||||
runId: runId.runId,
|
||||
eventType: "completed",
|
||||
payload: null,
|
||||
});
|
||||
|
||||
await vi.runAllTimersAsync();
|
||||
|
||||
expect(child.send).toHaveBeenCalledWith(expect.objectContaining({ type: "shutdown" }));
|
||||
expect(mockChildren).toHaveLength(2);
|
||||
|
||||
const stopPromise = mgr.stop();
|
||||
await vi.runAllTimersAsync();
|
||||
await stopPromise;
|
||||
});
|
||||
|
||||
it("waits for all concurrent threads before draining", async () => {
|
||||
const logStore = makeLogStore();
|
||||
const config = makeWfConfig({ "my-wf": { concurrency: 2, overflow: "drop" } });
|
||||
const mgr = createWorkflowManager("/nerve-root", config, logStore);
|
||||
|
||||
mgr.startWorkflow("my-wf", { prompt: "a", maxRounds: 10, dryRun: false });
|
||||
mgr.startWorkflow("my-wf", { prompt: "b", maxRounds: 10, dryRun: false });
|
||||
const child = mockChildren[0];
|
||||
const sendMock = child.send as ReturnType<typeof vi.fn>;
|
||||
const runIdA = (sendMock.mock.calls[0][0] as { runId: string }).runId;
|
||||
const runIdB = (sendMock.mock.calls[1][0] as { runId: string }).runId;
|
||||
|
||||
mgr.drainWhenIdle("my-wf");
|
||||
|
||||
child.emit("message", {
|
||||
type: "thread-event",
|
||||
runId: runIdA,
|
||||
eventType: "completed",
|
||||
payload: null,
|
||||
});
|
||||
await vi.runAllTimersAsync();
|
||||
|
||||
const shutdownBefore = sendMock.mock.calls.filter(
|
||||
(args: unknown[]) =>
|
||||
args[0] !== null &&
|
||||
typeof args[0] === "object" &&
|
||||
(args[0] as Record<string, unknown>).type === "shutdown",
|
||||
);
|
||||
expect(shutdownBefore).toHaveLength(0);
|
||||
|
||||
child.emit("message", {
|
||||
type: "thread-event",
|
||||
runId: runIdB,
|
||||
eventType: "completed",
|
||||
payload: null,
|
||||
});
|
||||
await vi.runAllTimersAsync();
|
||||
|
||||
expect(child.send).toHaveBeenCalledWith(expect.objectContaining({ type: "shutdown" }));
|
||||
|
||||
const stopPromise = mgr.stop();
|
||||
await vi.runAllTimersAsync();
|
||||
await stopPromise;
|
||||
});
|
||||
|
||||
it("duplicate drainWhenIdle while busy only schedules one deferred drain", async () => {
|
||||
const logStore = makeLogStore();
|
||||
const config = makeWfConfig({ "my-wf": { concurrency: 1, overflow: "drop" } });
|
||||
const mgr = createWorkflowManager("/nerve-root", config, logStore);
|
||||
|
||||
mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false });
|
||||
const child = mockChildren[0];
|
||||
const runId = (child.send as ReturnType<typeof vi.fn>).mock.calls[0][0] as { runId: string };
|
||||
|
||||
mgr.drainWhenIdle("my-wf");
|
||||
mgr.drainWhenIdle("my-wf");
|
||||
|
||||
child.emit("message", {
|
||||
type: "thread-event",
|
||||
runId: runId.runId,
|
||||
eventType: "completed",
|
||||
payload: null,
|
||||
});
|
||||
await vi.runAllTimersAsync();
|
||||
|
||||
const shutdownCalls = (child.send as ReturnType<typeof vi.fn>).mock.calls.filter(
|
||||
(args: unknown[]) =>
|
||||
args[0] !== null &&
|
||||
typeof args[0] === "object" &&
|
||||
(args[0] as Record<string, unknown>).type === "shutdown",
|
||||
);
|
||||
expect(shutdownCalls).toHaveLength(1);
|
||||
|
||||
const stopPromise = mgr.stop();
|
||||
await vi.runAllTimersAsync();
|
||||
await stopPromise;
|
||||
});
|
||||
|
||||
it("deferred drain runs after workflow-error clears the active thread", async () => {
|
||||
const logStore = makeLogStore();
|
||||
const config = makeWfConfig({ "my-wf": { concurrency: 1, overflow: "drop" } });
|
||||
const mgr = createWorkflowManager("/nerve-root", config, logStore);
|
||||
|
||||
mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false });
|
||||
const child = mockChildren[0];
|
||||
const runId = (child.send as ReturnType<typeof vi.fn>).mock.calls[0][0] as { runId: string };
|
||||
|
||||
mgr.drainWhenIdle("my-wf");
|
||||
|
||||
child.emit("message", {
|
||||
type: "workflow-error",
|
||||
runId: runId.runId,
|
||||
error: "role failed",
|
||||
exitCode: 1,
|
||||
});
|
||||
await vi.runAllTimersAsync();
|
||||
|
||||
expect(child.send).toHaveBeenCalledWith(expect.objectContaining({ type: "shutdown" }));
|
||||
|
||||
const stopPromise = mgr.stop();
|
||||
await vi.runAllTimersAsync();
|
||||
await stopPromise;
|
||||
});
|
||||
|
||||
it("drains immediately when there is no in-flight thread", async () => {
|
||||
const logStore = makeLogStore();
|
||||
const config = makeWfConfig({ "my-wf": { concurrency: 1, overflow: "drop" } });
|
||||
const mgr = createWorkflowManager("/nerve-root", config, logStore);
|
||||
|
||||
mgr.startWorkflow("my-wf", { prompt: "once", maxRounds: 10, dryRun: false });
|
||||
const firstChild = mockChildren[0];
|
||||
const runId = (firstChild.send as ReturnType<typeof vi.fn>).mock.calls[0][0] as {
|
||||
runId: string;
|
||||
};
|
||||
|
||||
firstChild.emit("message", {
|
||||
type: "thread-event",
|
||||
runId: runId.runId,
|
||||
eventType: "completed",
|
||||
payload: null,
|
||||
});
|
||||
await vi.runAllTimersAsync();
|
||||
|
||||
mgr.drainWhenIdle("my-wf");
|
||||
await vi.runAllTimersAsync();
|
||||
|
||||
expect(firstChild.send).toHaveBeenCalledWith(expect.objectContaining({ type: "shutdown" }));
|
||||
expect(mockChildren).toHaveLength(2);
|
||||
|
||||
const stopPromise = mgr.stop();
|
||||
await vi.runAllTimersAsync();
|
||||
await stopPromise;
|
||||
});
|
||||
});
|
||||
|
||||
describe("Kernel — workflow hot reload via file-watcher (Phase 3)", () => {
|
||||
let nerveRoot: string;
|
||||
|
||||
|
||||
@@ -57,10 +57,7 @@ export function createKernelFileWatchHandlers(deps: KernelFileWatchDeps): Kernel
|
||||
payload: null,
|
||||
timestamp: Date.now(),
|
||||
});
|
||||
deps.workflowManager.drainAndRespawn(workflowName).catch((e) => {
|
||||
const msg = e instanceof Error ? e.message : String(e);
|
||||
process.stderr.write(`[kernel] drainAndRespawn error for "${workflowName}": ${msg}\n`);
|
||||
});
|
||||
deps.workflowManager.drainWhenIdle(workflowName);
|
||||
}
|
||||
|
||||
function onConfigFileChange(): void {
|
||||
|
||||
@@ -57,6 +57,12 @@ export type WorkflowManager = {
|
||||
* Waits up to `drainTimeoutMs` for threads to complete before force-killing.
|
||||
*/
|
||||
drainAndRespawn: (workflowName: string, drainTimeoutMs?: number) => Promise<void>;
|
||||
/**
|
||||
* Schedule a drain+respawn that waits for in-flight runs to finish first.
|
||||
* If no runs are active, drains immediately. Otherwise marks a pending reload
|
||||
* and drains automatically when the last active run completes.
|
||||
*/
|
||||
drainWhenIdle: (workflowName: string) => void;
|
||||
/** Gracefully shut down all workflow workers. */
|
||||
stop: () => Promise<void>;
|
||||
};
|
||||
@@ -222,6 +228,7 @@ export function createWorkflowManager(
|
||||
const crashTimestamps = new Map<string, number[]>();
|
||||
let stopped = false;
|
||||
let config = initialConfig;
|
||||
const pendingDrains = new Set<string>();
|
||||
|
||||
function getOrCreateState(workflowName: string): WorkflowState {
|
||||
let state = states.get(workflowName);
|
||||
@@ -328,6 +335,24 @@ export function createWorkflowManager(
|
||||
}
|
||||
}
|
||||
|
||||
/** If a hot-reload was deferred, run drain+respawn once there are no active threads. */
|
||||
function maybeDeferredHotReloadDrain(workflowName: string): void {
|
||||
if (!pendingDrains.has(workflowName)) return;
|
||||
const state = states.get(workflowName);
|
||||
if (state === undefined || state.active.size !== 0) return;
|
||||
|
||||
pendingDrains.delete(workflowName);
|
||||
process.stderr.write(
|
||||
`[workflow-manager] all runs complete for "${workflowName}", executing deferred hot-reload drain\n`,
|
||||
);
|
||||
drainAndRespawn(workflowName).catch((e) => {
|
||||
const errMsg = e instanceof Error ? e.message : String(e);
|
||||
process.stderr.write(
|
||||
`[workflow-manager] deferred drainAndRespawn error for "${workflowName}": ${errMsg}\n`,
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
function handleThreadEvent(workflowName: string, msg: ThreadEventMessage): void {
|
||||
const state = states.get(workflowName);
|
||||
if (state === undefined) return;
|
||||
@@ -337,6 +362,7 @@ export function createWorkflowManager(
|
||||
dequeueNext(workflowName);
|
||||
const exitCode = extractExitCode(msg.payload);
|
||||
logWorkflowEvent(workflowName, msg.runId, msg.eventType, msg.payload, exitCode);
|
||||
maybeDeferredHotReloadDrain(workflowName);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -428,6 +454,7 @@ export function createWorkflowManager(
|
||||
|
||||
state.active.clear();
|
||||
workers.delete(workflowName);
|
||||
pendingDrains.delete(workflowName);
|
||||
|
||||
if (stopped || workflowConfig(workflowName) === null) return;
|
||||
|
||||
@@ -484,6 +511,7 @@ export function createWorkflowManager(
|
||||
dequeueNext(workflowName);
|
||||
}
|
||||
logWorkflowEvent(workflowName, msg.runId, "failed", { error: msg.error }, msg.exitCode);
|
||||
maybeDeferredHotReloadDrain(workflowName);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -681,9 +709,38 @@ export function createWorkflowManager(
|
||||
await waitForExit(entry.process, drainTimeoutMs);
|
||||
// The exit handler (draining branch) will respawn the worker automatically
|
||||
}
|
||||
function drainWhenIdle(workflowName: string): void {
|
||||
const state = states.get(workflowName);
|
||||
const hasActiveRuns = state !== undefined && state.active.size > 0;
|
||||
|
||||
if (!hasActiveRuns) {
|
||||
pendingDrains.delete(workflowName);
|
||||
drainAndRespawn(workflowName).catch((e) => {
|
||||
const msg = e instanceof Error ? e.message : String(e);
|
||||
process.stderr.write(
|
||||
`[workflow-manager] drainAndRespawn error for "${workflowName}": ${msg}\n`,
|
||||
);
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
// Defer until all active runs finish
|
||||
if (pendingDrains.has(workflowName)) {
|
||||
process.stderr.write(
|
||||
`[workflow-manager] hot-reload already pending for "${workflowName}", skipping duplicate\n`,
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
pendingDrains.add(workflowName);
|
||||
process.stderr.write(
|
||||
`[workflow-manager] deferring hot-reload for "${workflowName}" until ${state.active.size} active run(s) complete\n`,
|
||||
);
|
||||
}
|
||||
|
||||
async function stop(): Promise<void> {
|
||||
stopped = true;
|
||||
pendingDrains.clear();
|
||||
const exitPromises: Promise<void>[] = [];
|
||||
for (const entry of workers.values()) {
|
||||
sendShutdown(entry.process, entry);
|
||||
@@ -701,6 +758,7 @@ export function createWorkflowManager(
|
||||
totalActiveCount,
|
||||
updateConfig,
|
||||
drainAndRespawn,
|
||||
drainWhenIdle,
|
||||
stop,
|
||||
};
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user