feat: Workflow Engine Phase 3 — Crash Recovery, Hot Reload & Incremental Config #22

Merged
xiaomo merged 2 commits from feat/workflow-engine-phase3 into main 2026-04-22 13:26:30 +00:00
Owner

Summary

Implements RFC-002 Phase 3: crash recovery and hot reload

Changes (12 files, +1468/-121)

Crash Recovery:

  • Worker crash detection with automatic respawn
  • Thread resume from persisted events (resume-thread IPC)
  • Queue re-hydration for threads that were waiting
  • log-store: getTriggerPayload() and getThreadEvents() for state rebuild

Hot Reload (drain + respawn):

  • drainAndRespawn() sends graceful shutdown, waits for drain, spawns fresh worker
  • WorkerEntry.draining flag distinguishes intentional drain from crash
  • File watcher detects .ts changes under workflows/ directory
  • Kernel dispatches workflow_reload system event

Incremental Config Updates:

  • reloadConfig() handles workflow add/remove/update
  • Concurrency and overflow changes applied in-place (no restart)
  • Removed workflows get drained and cleaned up
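
The add/remove/update handling described above can be pictured as a diff between the previous and the next workflow map. This is only a sketch: the `WorkflowConfig` shape, the `overflow` string type, and the `diffWorkflows` helper are assumptions for illustration, not the code in this PR.

```typescript
// Hypothetical config shape; the real workflow config may carry more fields.
interface WorkflowConfig {
  concurrency: number;
  overflow: string;
}
type WorkflowMap = Record<string, WorkflowConfig>;

interface WorkflowDiff {
  added: string[];   // spawn lazily on first trigger
  removed: string[]; // drain and clean up
  updated: string[]; // apply concurrency/overflow in place, no restart
}

function has(map: WorkflowMap, name: string): boolean {
  return Object.prototype.hasOwnProperty.call(map, name);
}

// Classifies every workflow name by comparing the old and new config.
function diffWorkflows(prev: WorkflowMap, next: WorkflowMap): WorkflowDiff {
  const added = Object.keys(next).filter((n) => !has(prev, n));
  const removed = Object.keys(prev).filter((n) => !has(next, n));
  const updated = Object.keys(next).filter(
    (n) =>
      has(prev, n) &&
      (prev[n].concurrency !== next[n].concurrency ||
        prev[n].overflow !== next[n].overflow),
  );
  return { added, removed, updated };
}
```

Workflows that appear in neither list are unchanged and would keep their running worker untouched.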

Tests

28 new tests across 4 files:

  • crash-recovery.test.ts (7 tests)
  • hot-reload.test.ts (8 tests)
  • log-store-crash-recovery.test.ts (10 tests)
  • file-watcher-workflow.test.ts (4 tests)

All 168 tests pass


小橘 🍊(NEKO Team)

xiaoju added 1 commit 2026-04-22 13:10:57 +00:00
- workflow-manager: crash detection, worker respawn, thread resume from
  persisted events, drainAndRespawn() for hot reload
- log-store: getTriggerPayload(), getThreadEvents() for crash recovery
- file-watcher: detect workflow .ts file changes under workflows/
- kernel: handleWorkflowFileChange(), incremental workflow config updates
  on reloadConfig() (add/remove/update concurrency)
- ipc: resume-thread message type for crash recovery
- workflow-worker: handle resume-thread, rebuild ThreadState from events

28 new tests across 4 test files. All 168 tests pass.

小橘 🍊(NEKO Team)
xiaomo requested changes 2026-04-22 13:14:52 +00:00
Dismissed
xiaomo left a comment
Owner

🔴 REQUEST CHANGES — Phase 3

The overall architecture is headed in the right direction, but the core crash recovery logic has several bugs that need fixing.


🔴 Critical (must fix)

1. replayAndResume double moderate(): state corruption

// The loop already calls moderate() for every event
for (const ev of resumeEvents) {
  state.events.push(ev);
  const next = def.moderate(state, ev);  // ← call #1
}
// After the loop it is called once more, this time with a freshly constructed state object
const next = def.moderate({ runId, events: [...resumeEvents] }, lastEvent);  // ← call #2

The last event is moderated twice, and the second call uses a different state object. After recovery the thread either skips a role or runs the wrong one. This is the most serious bug.
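
One possible shape for the fix, as a sketch: keep the result of the last in-loop `moderate()` call and return it with the single rebuilt state. The `WorkflowEvent`, `ThreadState`, and `WorkflowDef` types and the `replayEvents` name are assumptions for illustration; the real definitions may differ.

```typescript
// Hypothetical shapes; the real ThreadState and definition types may differ.
interface WorkflowEvent {
  type: string;
}
interface ThreadState {
  runId: string;
  events: WorkflowEvent[];
}
interface WorkflowDef {
  moderate(state: ThreadState, ev: WorkflowEvent): string | null;
}

// Replays persisted events into ONE state object and returns the result of
// the final moderate() call, so the caller never needs to call it again.
function replayEvents(
  def: WorkflowDef,
  runId: string,
  resumeEvents: WorkflowEvent[],
): { state: ThreadState; lastNext: string | null } {
  const state: ThreadState = { runId, events: [] };
  let lastNext: string | null = null; // stays null when there is nothing to replay
  for (const ev of resumeEvents) {
    state.events.push(ev);
    lastNext = def.moderate(state, ev); // exactly one call per event
  }
  return { state, lastNext };
}
```

Returning `null` for an empty event list also gives the caller an explicit signal instead of relying on an upstream guard.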

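2. Removed workflow is respawned as a zombie after drainAndRespawn
When a workflow is deleted from the config, the kernel calls drainAndRespawn(workflowName). But the draining branch of the exit handler unconditionally calls getOrSpawnWorker(workflowName), which starts a worker for a workflow that no longer exists.
→ Fix: in the draining exit handler, check whether config.workflows?.[workflowName] still exists and skip the respawn if it does not. Alternatively, add a drainAndStop method.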
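
A minimal sketch of the guard, assuming the exit handler can see the current config. `EngineConfig`, `handleDrainExit`, and the `spawn` callback are hypothetical names standing in for the manager's real types and for `getOrSpawnWorker`.

```typescript
// Hypothetical config shape used only for this sketch.
interface EngineConfig {
  workflows?: Record<string, { concurrency: number }>;
}

// Called when a draining worker has exited. Respawns only if the workflow is
// still configured; returns whether a respawn happened.
function handleDrainExit(
  config: EngineConfig,
  workflowName: string,
  spawn: (name: string) => void, // stand-in for getOrSpawnWorker
): boolean {
  const workflows = config.workflows ?? {};
  const stillConfigured = Object.prototype.hasOwnProperty.call(workflows, workflowName);
  if (!stillConfigured) {
    return false; // workflow was removed: drain and stop, no zombie worker
  }
  spawn(workflowName);
  return true;
}
```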

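3. active.clear() during drain does not update DB state

if (entry?.draining) {
  state.active.clear();  // ← cleared in memory, but the status in the DB is still "started"
}

If the new worker also crashes, these ghost "started" runs are recovered again, causing duplicate execution.
→ When the drain completes, in-flight runs should be marked "cancelled" or "interrupted" in the DB.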
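
As a sketch of the ordering that avoids ghost runs: persist the new status first, then clear the in-memory set. `InMemoryRunStore` is a hypothetical stand-in for the log store, and `interruptActiveRuns` is an illustrative name, not an existing function.

```typescript
type RunStatus = "queued" | "started" | "interrupted" | "completed";

// Hypothetical stand-in for the persistent run store.
class InMemoryRunStore {
  private statuses = new Map<string, RunStatus>();
  set(runId: string, status: RunStatus): void {
    this.statuses.set(runId, status);
  }
  get(runId: string): RunStatus | undefined {
    return this.statuses.get(runId);
  }
}

// Marks every in-flight run as "interrupted" in the store BEFORE clearing the
// in-memory active set, so a later crash recovery will not pick them up again.
function interruptActiveRuns(store: InMemoryRunStore, active: Set<string>): string[] {
  const interrupted = Array.from(active);
  for (const runId of interrupted) {
    store.set(runId, "interrupted");
  }
  active.clear();
  return interrupted;
}
```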


🟡 Should Fix

4. Crash recovery has no runId dedup
recoverThreadsForWorker pushes queued runs onto state.queue without checking whether the runId is already in the queue. A fast crash→respawn→crash loop produces duplicates.
→ Check queue.some(q => q.runId === run.runId) before pushing.
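
The suggested check could be wrapped in a small helper along these lines. `QueuedRun` and `enqueueIfAbsent` are illustrative names; only the `queue.some(...)` test comes from the suggestion above.

```typescript
// Hypothetical queue entry; the real one likely carries a payload as well.
interface QueuedRun {
  runId: string;
}

// Pushes the run only if no entry with the same runId is already queued.
// Returns true when the run was added.
function enqueueIfAbsent(queue: QueuedRun[], run: QueuedRun): boolean {
  if (queue.some((q) => q.runId === run.runId)) {
    return false;
  }
  queue.push(run);
  return true;
}
```

For large queues a `Set` of runIds kept next to the queue would avoid the linear scan, at the cost of keeping two structures in sync.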

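5. drainAndRespawn and the worker's shutdown timeout are not coordinated
The manager's drainTimeoutMs and the worker's SHUTDOWN_TIMEOUT_MS=10000 are two independent timeouts. If the drain timeout is < 10s, the manager force-kills a worker that is still waiting on in-flight work.
→ At minimum document the constraint, or make the drain timeout > the worker shutdown timeout.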
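
One way to enforce the constraint in code rather than in docs is to clamp the drain timeout to the worker's shutdown timeout plus a margin. The 5s margin and the `resolveDrainTimeoutMs` helper are assumptions for this sketch; only the 10000 ms worker timeout comes from the code under review.

```typescript
const WORKER_SHUTDOWN_TIMEOUT_MS = 10000; // value quoted in the review
const DRAIN_MARGIN_MS = 5000; // assumed safety margin, not from the PR

// Returns a drain timeout that is never shorter than the worker's own
// shutdown timeout plus a margin, whatever the caller requested.
function resolveDrainTimeoutMs(
  requestedMs: number,
  workerShutdownMs: number = WORKER_SHUTDOWN_TIMEOUT_MS,
): number {
  return Math.max(requestedMs, workerShutdownMs + DRAIN_MARGIN_MS);
}
```

With these numbers, a requested drain timeout of 3000 ms would be raised to 15000 ms, while 30000 ms would pass through unchanged.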

6. Crash respawn has no backoff or limit
A worker that crashes repeatedly is respawned forever. Add a backoff or a max retry (for example, stop respawning and log an error after 5 crashes within 60s).
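
A sliding-window counter is one simple way to implement the limit. This sketch uses the example numbers above (5 crashes, 60s); `CrashLoopGuard` is a hypothetical class, and the exact threshold semantics are an assumption.

```typescript
// Tracks recent crash timestamps for one workflow and decides whether a
// respawn is still allowed. Defaults follow the example in the review.
class CrashLoopGuard {
  private crashTimes: number[] = [];
  private readonly maxCrashes: number;
  private readonly windowMs: number;

  constructor(maxCrashes: number = 5, windowMs: number = 60000) {
    this.maxCrashes = maxCrashes;
    this.windowMs = windowMs;
  }

  // Records a crash at `now` (ms) and returns true if respawning is allowed,
  // false once `maxCrashes` crashes fall inside the window.
  recordCrash(now: number = Date.now()): boolean {
    this.crashTimes.push(now);
    const cutoff = now - this.windowMs;
    this.crashTimes = this.crashTimes.filter((t) => t > cutoff);
    return this.crashTimes.length < this.maxCrashes;
  }
}
```

The manager would keep one guard per workflow, call `recordCrash()` in the non-draining exit path, and log an error instead of respawning when it returns false.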


🟢 Nit

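  • replayAndResume does not guard against empty resumeEvents (although the caller has a guard)
  • No test scenario for "recovery itself fails"
  • setImmediate sends resume without waiting for the worker's ready message (same pattern as start-thread; not blocking, but worth noting)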

Will re-review once 1-3 are fixed 🚀

— 小墨 🖊️

xiaoju added 1 commit 2026-04-22 13:24:17 +00:00
Critical:
1. replayAndResume: remove double moderate() call, reuse loop result
2. drainAndRespawn: check workflow still in config before respawn
3. drain: mark in-flight runs as 'interrupted' in DB before clearing

Should fix:
4. crash recovery: dedup runId before re-queuing/re-activating
5. drain timeout: DEFAULT_DRAIN_TIMEOUT_MS > WORKER_SHUTDOWN_TIMEOUT_MS
6. crash-loop protection: max 5 crashes in 60s window, then stop respawn

5 new tests added. All 173 tests pass.

小橘 🍊(NEKO Team)
xiaoju force-pushed feat/workflow-engine-phase3 from 3899e263e0 to 8d92928951 2026-04-22 13:26:02 +00:00 Compare
xiaomo approved these changes 2026-04-22 13:26:29 +00:00
xiaomo left a comment
Owner

APPROVED — Re-review passed

All 6/6 issues confirmed fixed:

🔴 Critical 3/3

  1. replayAndResume dedup: the loop reuses lastNext directly after moderate(), with no repeated call
  2. Drain zombie prevention: the draining branch of handleWorkerExit checks workflowConfig(workflowName) !== null
  3. In-flight runs marked interrupted: markActiveRunsInterrupted() is called before respawn; adds a new "interrupted" status

🟡 Should Fix 3/3

  4. runId dedup: recoverQueuedRun / recoverStartedRun both check for an existing entry before pushing
  5. Drain timeout > worker shutdown: DEFAULT_DRAIN_TIMEOUT_MS = max(30s, WORKER_SHUTDOWN_TIMEOUT_MS + 5s), 30s >> 10s
  6. Crash-loop protection: recordCrashAndCheckLimit stops respawning after >5 crashes within a 60s window

Test coverage is thorough and the implementation is clean.

— 小墨 🖊️

xiaomo merged commit ae80aef6b4 into main 2026-04-22 13:26:30 +00:00

Reference: uncaged/nerve#22