fix: address review feedback (星月) for Phase 3
1. sendToWorker: IPC send failure now marks thread as failed + dequeues next 2. crashLimitBlocked Set: prevents new startWorkflow from bypassing crash limit 3. "respawning" log skipped when crash limit is active 4. logWorkflowEvent payload: unknown | null (project convention, not ?:)
This commit is contained in:
@@ -82,6 +82,7 @@ export function createWorkflowManager(
|
||||
const trackedWorkflows = new Set<string>();
|
||||
const hotReloadEvicting = new Set<string>();
|
||||
const crashRecoveryPending = new Set<string>();
|
||||
const crashLimitBlocked = new Set<string>();
|
||||
let stopped = false;
|
||||
let config = initialConfig;
|
||||
const pendingDrains = new Set<string>();
|
||||
@@ -90,7 +91,7 @@ export function createWorkflowManager(
|
||||
workflowName: string,
|
||||
runId: string,
|
||||
eventType: string,
|
||||
payload?: unknown,
|
||||
payload: unknown | null = null,
|
||||
exitCode: number | null = null,
|
||||
): void {
|
||||
appendWorkflowRunLog(logStore, workflowName, runId, eventType, payload, exitCode);
|
||||
@@ -155,6 +156,7 @@ export function createWorkflowManager(
|
||||
onCrashLimitReached: (workflowName) => {
|
||||
crashRecoveryPending.delete(workflowName);
|
||||
trackedWorkflows.delete(workflowName);
|
||||
crashLimitBlocked.add(workflowName);
|
||||
process.stderr.write(
|
||||
`[workflow-manager] worker for "${workflowName}" exceeded crash limit (${String(WORKFLOW_WORKER_RESPAWN.maxCrashes)} in ${String(WORKFLOW_WORKER_RESPAWN.windowMs)}ms) — stopping respawn\n`,
|
||||
);
|
||||
@@ -181,17 +183,36 @@ export function createWorkflowManager(
|
||||
|
||||
/** IPC send — matches legacy pool: no-op when IPC is disconnected; cold-start via WorkerRuntime.send. */
|
||||
function sendToWorker(workflowName: string, msg: unknown): void {
|
||||
if (crashLimitBlocked.has(workflowName)) {
|
||||
return;
|
||||
}
|
||||
trackedWorkflows.add(workflowName);
|
||||
if (runtime.hasDisconnectedChild(workflowName)) {
|
||||
return;
|
||||
}
|
||||
if (!runtime.trySendSync(workflowName, msg)) {
|
||||
void runtime.send(workflowName, msg).catch(() => {
|
||||
// IPC channel may close between scheduling and send
|
||||
// IPC channel closed — mark any thread from this message as failed
|
||||
if (isStartThreadMsg(msg)) {
|
||||
const state = states.get(workflowName);
|
||||
if (state?.active.has(msg.runId)) {
|
||||
state.active.delete(msg.runId);
|
||||
logWorkflowEvent(workflowName, msg.runId, "failed", { error: "IPC channel closed" }, 1);
|
||||
dequeueNext(workflowName);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
function isStartThreadMsg(msg: unknown): msg is StartThreadMessage {
|
||||
return (
|
||||
msg !== null &&
|
||||
typeof msg === "object" &&
|
||||
(msg as Record<string, unknown>).type === "start-thread"
|
||||
);
|
||||
}
|
||||
|
||||
function dispatchThread(
|
||||
workflowName: string,
|
||||
runId: string,
|
||||
@@ -277,7 +298,7 @@ export function createWorkflowManager(
|
||||
state.active.clear();
|
||||
pendingDrains.delete(workflowName);
|
||||
|
||||
if (!stopped && workflowConfig(workflowName) !== null) {
|
||||
if (!stopped && !crashLimitBlocked.has(workflowName) && workflowConfig(workflowName) !== null) {
|
||||
process.stderr.write(
|
||||
`[workflow-manager] respawning worker for "${workflowName}" after crash\n`,
|
||||
);
|
||||
|
||||
Reference in New Issue
Block a user