fix(daemon): defer hot-reload drain until in-flight runs complete #135

Merged
xiaomo merged 1 commit from fix/134-hot-reload-in-flight into main 2026-04-25 05:38:53 +00:00
4 changed files with 254 additions and 4 deletions
@@ -31,6 +31,7 @@ function makeMockWorkflowManager() {
stop: vi.fn(async () => {}),
totalActiveCount: vi.fn(() => 0),
drainAndRespawn: vi.fn(async () => {}),
drainWhenIdle: vi.fn(),
updateConfig: vi.fn(),
getActiveWorkflowRuns: vi.fn(() => []),
};
@@ -3,6 +3,7 @@
*
* Verifies that:
* - drainAndRespawn() sends shutdown, waits for exit, then respawns the worker
* - drainWhenIdle() defers drain+respawn until in-flight threads finish
* - Kernel dispatches handleWorkflowFileChange when file-watcher emits a workflow change
* - Kernel logs a workflow_reload system event on hot reload
* - drainAndRespawn on a non-existent worker is a no-op
@@ -238,6 +239,199 @@ describe("WorkflowManager — drainAndRespawn (Phase 3 hot reload)", () => {
});
});
describe("WorkflowManager — drainWhenIdle (hot reload without interrupting in-flight)", () => {
  type Manager = ReturnType<typeof createWorkflowManager>;
  type MockWorker = (typeof mockChildren)[number];

  /** Create a manager whose only workflow is "my-wf" with the given concurrency and overflow "drop". */
  function managerWithConcurrency(concurrency: number): Manager {
    const logStore = makeLogStore();
    const config = makeWfConfig({ "my-wf": { concurrency, overflow: "drop" } });
    return createWorkflowManager("/nerve-root", config, logStore);
  }

  /** Start one "my-wf" run with the fixed options every test in this suite uses. */
  function launchRun(mgr: Manager, prompt: string): void {
    mgr.startWorkflow("my-wf", { prompt, maxRounds: 10, dryRun: false });
  }

  /** Read the `runId` field from the first argument of the `callIndex`-th `send` call on a worker. */
  function sentRunId(worker: MockWorker, callIndex: number): string {
    const sendMock = worker.send as ReturnType<typeof vi.fn>;
    return (sendMock.mock.calls[callIndex][0] as { runId: string }).runId;
  }

  /** Collect the recorded `send` calls whose first argument is an object with `type === "shutdown"`. */
  function shutdownCallsOf(worker: MockWorker) {
    const sendMock = worker.send as ReturnType<typeof vi.fn>;
    return sendMock.mock.calls.filter((args: unknown[]) => {
      const first = args[0];
      return (
        first !== null &&
        typeof first === "object" &&
        (first as Record<string, unknown>).type === "shutdown"
      );
    });
  }

  /** Emit a "completed" thread-event for `runId` on the worker's message channel. */
  function emitCompleted(worker: MockWorker, runId: string): void {
    worker.emit("message", {
      type: "thread-event",
      runId,
      eventType: "completed",
      payload: null,
    });
  }

  /** Call `stop()`, flush the fake timers, then await the stop promise. */
  async function stopAndFlush(mgr: Manager): Promise<void> {
    const stopping = mgr.stop();
    await vi.runAllTimersAsync();
    await stopping;
  }

  beforeEach(() => {
    mockChildren.length = 0;
    vi.useFakeTimers({ shouldAdvanceTime: true });
  });

  afterEach(() => {
    vi.useRealTimers();
    vi.clearAllMocks();
  });

  it("does not send shutdown while a thread is still active", () => {
    const mgr = managerWithConcurrency(1);
    launchRun(mgr, "test");
    const worker = mockChildren[0];

    mgr.drainWhenIdle("my-wf");

    expect(shutdownCallsOf(worker)).toHaveLength(0);
  });

  it("sends shutdown after the last active thread completes (deferred drain)", async () => {
    const mgr = managerWithConcurrency(1);
    launchRun(mgr, "test");
    const worker = mockChildren[0];
    const activeRunId = sentRunId(worker, 0);

    mgr.drainWhenIdle("my-wf");
    emitCompleted(worker, activeRunId);
    await vi.runAllTimersAsync();

    expect(worker.send).toHaveBeenCalledWith(expect.objectContaining({ type: "shutdown" }));
    // A second mock child must have been recorded after the drain.
    expect(mockChildren).toHaveLength(2);

    await stopAndFlush(mgr);
  });

  it("waits for all concurrent threads before draining", async () => {
    const mgr = managerWithConcurrency(2);
    launchRun(mgr, "a");
    launchRun(mgr, "b");
    const worker = mockChildren[0];
    const firstRunId = sentRunId(worker, 0);
    const secondRunId = sentRunId(worker, 1);

    mgr.drainWhenIdle("my-wf");

    // One of two runs finishing must not trigger the drain yet.
    emitCompleted(worker, firstRunId);
    await vi.runAllTimersAsync();
    expect(shutdownCallsOf(worker)).toHaveLength(0);

    // The second completion leaves no active run, so shutdown is expected now.
    emitCompleted(worker, secondRunId);
    await vi.runAllTimersAsync();
    expect(worker.send).toHaveBeenCalledWith(expect.objectContaining({ type: "shutdown" }));

    await stopAndFlush(mgr);
  });

  it("duplicate drainWhenIdle while busy only schedules one deferred drain", async () => {
    const mgr = managerWithConcurrency(1);
    launchRun(mgr, "test");
    const worker = mockChildren[0];
    const activeRunId = sentRunId(worker, 0);

    mgr.drainWhenIdle("my-wf");
    mgr.drainWhenIdle("my-wf");
    emitCompleted(worker, activeRunId);
    await vi.runAllTimersAsync();

    expect(shutdownCallsOf(worker)).toHaveLength(1);

    await stopAndFlush(mgr);
  });

  it("deferred drain runs after workflow-error clears the active thread", async () => {
    const mgr = managerWithConcurrency(1);
    launchRun(mgr, "test");
    const worker = mockChildren[0];
    const activeRunId = sentRunId(worker, 0);

    mgr.drainWhenIdle("my-wf");
    worker.emit("message", {
      type: "workflow-error",
      runId: activeRunId,
      error: "role failed",
      exitCode: 1,
    });
    await vi.runAllTimersAsync();

    expect(worker.send).toHaveBeenCalledWith(expect.objectContaining({ type: "shutdown" }));

    await stopAndFlush(mgr);
  });

  it("drains immediately when there is no in-flight thread", async () => {
    const mgr = managerWithConcurrency(1);
    launchRun(mgr, "once");
    const firstWorker = mockChildren[0];
    const finishedRunId = sentRunId(firstWorker, 0);

    // Finish the only run first so the manager is idle when the drain is requested.
    emitCompleted(firstWorker, finishedRunId);
    await vi.runAllTimersAsync();

    mgr.drainWhenIdle("my-wf");
    await vi.runAllTimersAsync();

    expect(firstWorker.send).toHaveBeenCalledWith(expect.objectContaining({ type: "shutdown" }));
    expect(mockChildren).toHaveLength(2);

    await stopAndFlush(mgr);
  });
});
describe("Kernel — workflow hot reload via file-watcher (Phase 3)", () => {
let nerveRoot: string;
+1 -4
View File
@@ -57,10 +57,7 @@ export function createKernelFileWatchHandlers(deps: KernelFileWatchDeps): Kernel
payload: null,
timestamp: Date.now(),
});
deps.workflowManager.drainAndRespawn(workflowName).catch((e) => {
const msg = e instanceof Error ? e.message : String(e);
process.stderr.write(`[kernel] drainAndRespawn error for "${workflowName}": ${msg}\n`);
});
deps.workflowManager.drainWhenIdle(workflowName);
}
function onConfigFileChange(): void {
+58
View File
@@ -57,6 +57,12 @@ export type WorkflowManager = {
* Waits up to `drainTimeoutMs` for threads to complete before force-killing.
*/
drainAndRespawn: (workflowName: string, drainTimeoutMs?: number) => Promise<void>;
/**
* Schedule a drain+respawn that waits for in-flight runs to finish first.
* If no runs are active, drains immediately. Otherwise marks a pending reload
* and drains automatically when the last active run completes.
*/
drainWhenIdle: (workflowName: string) => void;
/** Gracefully shut down all workflow workers. */
stop: () => Promise<void>;
};
@@ -222,6 +228,7 @@ export function createWorkflowManager(
const crashTimestamps = new Map<string, number[]>();
let stopped = false;
let config = initialConfig;
const pendingDrains = new Set<string>();
function getOrCreateState(workflowName: string): WorkflowState {
let state = states.get(workflowName);
@@ -328,6 +335,24 @@ export function createWorkflowManager(
}
}
/**
 * Run a previously deferred hot-reload once the workflow has no active runs.
 *
 * Does nothing unless `workflowName` is in `pendingDrains` and its tracked state
 * exists with an empty `active` set. The pending marker is removed before the
 * drain starts. A rejected `drainAndRespawn` is written to stderr, not rethrown.
 */
function maybeDeferredHotReloadDrain(workflowName: string): void {
  if (!pendingDrains.has(workflowName)) return;
  // A missing state yields `undefined`, which also fails the `=== 0` idle test.
  if (states.get(workflowName)?.active.size !== 0) return;

  pendingDrains.delete(workflowName);
  process.stderr.write(
    `[workflow-manager] all runs complete for "${workflowName}", executing deferred hot-reload drain\n`,
  );

  const reportFailure = (reason: unknown): void => {
    const errMsg = reason instanceof Error ? reason.message : String(reason);
    process.stderr.write(
      `[workflow-manager] deferred drainAndRespawn error for "${workflowName}": ${errMsg}\n`,
    );
  };
  drainAndRespawn(workflowName).catch(reportFailure);
}
function handleThreadEvent(workflowName: string, msg: ThreadEventMessage): void {
const state = states.get(workflowName);
if (state === undefined) return;
@@ -337,6 +362,7 @@ export function createWorkflowManager(
dequeueNext(workflowName);
const exitCode = extractExitCode(msg.payload);
logWorkflowEvent(workflowName, msg.runId, msg.eventType, msg.payload, exitCode);
maybeDeferredHotReloadDrain(workflowName);
}
}
@@ -428,6 +454,7 @@ export function createWorkflowManager(
state.active.clear();
workers.delete(workflowName);
pendingDrains.delete(workflowName);
if (stopped || workflowConfig(workflowName) === null) return;
@@ -484,6 +511,7 @@ export function createWorkflowManager(
dequeueNext(workflowName);
}
logWorkflowEvent(workflowName, msg.runId, "failed", { error: msg.error }, msg.exitCode);
maybeDeferredHotReloadDrain(workflowName);
return;
}
@@ -681,9 +709,38 @@ export function createWorkflowManager(
await waitForExit(entry.process, drainTimeoutMs);
// The exit handler (draining branch) will respawn the worker automatically
}
/**
 * Request a hot-reload drain that does not interrupt in-flight runs.
 *
 * - With no active runs (or no tracked state), any pending marker is cleared and
 *   `drainAndRespawn` is started right away; a rejection is logged to stderr.
 * - With active runs, the workflow is added to `pendingDrains` so the drain can
 *   be triggered later. A repeated request while one is already pending is only logged.
 */
function drainWhenIdle(workflowName: string): void {
  // A workflow without tracked state counts as having zero active runs.
  const activeRunCount = states.get(workflowName)?.active.size ?? 0;

  if (activeRunCount === 0) {
    pendingDrains.delete(workflowName);
    const reportFailure = (reason: unknown): void => {
      const msg = reason instanceof Error ? reason.message : String(reason);
      process.stderr.write(
        `[workflow-manager] drainAndRespawn error for "${workflowName}": ${msg}\n`,
      );
    };
    drainAndRespawn(workflowName).catch(reportFailure);
    return;
  }

  // Runs are still in flight: defer, unless a deferral is already recorded.
  const alreadyPending = pendingDrains.has(workflowName);
  if (alreadyPending) {
    process.stderr.write(
      `[workflow-manager] hot-reload already pending for "${workflowName}", skipping duplicate\n`,
    );
    return;
  }

  pendingDrains.add(workflowName);
  process.stderr.write(
    `[workflow-manager] deferring hot-reload for "${workflowName}" until ${activeRunCount} active run(s) complete\n`,
  );
}
async function stop(): Promise<void> {
stopped = true;
pendingDrains.clear();
const exitPromises: Promise<void>[] = [];
for (const entry of workers.values()) {
sendShutdown(entry.process, entry);
@@ -701,6 +758,7 @@ export function createWorkflowManager(
totalActiveCount,
updateConfig,
drainAndRespawn,
drainWhenIdle,
stop,
};
}