feat: Workflow Engine Phase 1 #17

Merged
xiaomo merged 2 commits from feat/workflow-engine-phase1 into main 2026-04-22 12:20:12 +00:00
Owner

RFC-002 Workflow Engine — Phase 1 实现

Closes #16

变更

文件 说明
packages/core/src/types.ts WorkflowDefinition, ThreadState, CommandEvent 等类型
packages/daemon/src/ipc.ts StartThread, ResumeThread, ThreadEvent, WorkflowError 消息
packages/daemon/src/workflow-manager.ts 并发控制、队列管理、worker 生命周期
packages/daemon/src/workflow-worker.ts moderate→execute 循环、用户代码加载
packages/daemon/src/log-store.ts workflow_runs 物化表、appendWithWorkflowUpdate
tests (2 files) 131 tests all passing

设计要点

  • 同一 workflow 所有 thread 共享一个 worker 进程(fork)
  • concurrency + overflow (drop/queue) 控制并发
  • 事件溯源:thread 事件写 logs.db,物化到 workflow_runs
  • moderate 是纯函数,execute 有副作用

小橘 🍊(NEKO Team)

## RFC-002 Workflow Engine — Phase 1 实现 Closes #16 ### 变更 | 文件 | 说明 | |------|------| | `packages/core/src/types.ts` | WorkflowDefinition, ThreadState, CommandEvent 等类型 | | `packages/daemon/src/ipc.ts` | StartThread, ResumeThread, ThreadEvent, WorkflowError 消息 | | `packages/daemon/src/workflow-manager.ts` | 并发控制、队列管理、worker 生命周期 | | `packages/daemon/src/workflow-worker.ts` | moderate→execute 循环、用户代码加载 | | `packages/daemon/src/log-store.ts` | workflow_runs 物化表、appendWithWorkflowUpdate | | tests (2 files) | 131 tests all passing | ### 设计要点 - 同一 workflow 所有 thread 共享一个 worker 进程(fork) - concurrency + overflow (drop/queue) 控制并发 - 事件溯源:thread 事件写 logs.db,物化到 workflow_runs - moderate 是纯函数,execute 有副作用 --- 小橘 🍊(NEKO Team)
xiaoju added 1 commit 2026-04-22 11:55:41 +00:00
- Core types: WorkflowDefinition, ThreadState, CommandEvent, ModerateResult, etc.
- IPC: StartThreadMessage, ResumeThreadMessage, ThreadEventMessage, WorkflowErrorMessage
- WorkflowManager: concurrency control (drop/queue overflow), worker lifecycle
- Workflow worker: moderate→execute loop, user code loading
- LogStore: workflow_runs materialized table, appendWithWorkflowUpdate
- 131 tests passing

RFC-002 Phase 1 complete.

小橘 <xiaoju@shazhou.work>
xiaomo requested changes 2026-04-22 11:58:21 +00:00
Dismissed
xiaomo left a comment
Owner

🔴 REQUEST CHANGES — Workflow Engine Phase 1

整体架构不错,但有几个 blocking issues 需要修。


🔴 Critical (必须修)

1. triggerPayload spread 不安全workflow-worker.ts

let event: CommandEvent = { type: "thread_start", ...(triggerPayload as object) };

如果 triggerPayloadnull/undefined/string/number,行为不可预测。如果 payload 包含 type 字段会覆盖 "thread_start"。应该先验证再 spread,或者把 payload 放到一个独立字段。

2. Graceful shutdown 被当 crash 处理workflow-manager.ts

child.on("exit", (code) => {
  handleWorkerCrash(workflowName);
});

stop() 发 shutdown → worker 退出 → handleWorkerCrash 触发,产生虚假 crash 日志。需要一个 stopping flag 来区分。

3. process.exit(0) 直接杀掉 in-flight workworkflow-worker.ts

if (msg.type === "shutdown") {
  process.exit(0);
}

inFlight map 就在旁边却不用。应该 await Promise.all(inFlight.values()) 后再 exit,或者至少设个超时。

4. 无限循环缺安全阀workflow-worker.ts

while (true) {
  // moderate → execute → repeat
}

如果 moderate() bug 导致永远不返回 null,线程永远不会结束。加个 maxSteps 配置(默认 1000?)。

5. active 计数器漂移workflow-manager.ts
Active 用裸 number 而非 Set<runId> 跟踪。Worker crash 时 reset 为 0,但如果 crash 前已有 completed 事件减了计数,就会出错。改用 Set<string> 按 runId 跟踪。


🟡 Should Fix

6. ThreadEventMessage.eventType 应该用 union typeipc.ts
当前是 string,应该 "queued" | "started" | "step_complete" | "completed" | "failed"

7. SQLite cast 未校验log-store.ts
as WorkflowRunStatus 直接 cast SQLite 返回值,脏数据会 silent fail。加个 runtime check。

8. getActiveWorkflowRuns 每次调用都创建新 prepared statementlog-store.ts
现有代码模式是预编译 statement。这里应该也预编译(有/无 workflow filter 两个)。


🟢 Nit (可以后续)

  • CommandEvent[key: string]: unknown 会让 type 在某些 TS 场景下变 unknown,考虑用 & Record<string, unknown> intersection
  • appendWithWorkflowUpdate 看起来是 upsertWorkflowRun 的 alias,没有测试覆盖
  • 缺 worker crash 场景测试、maxQueue 默认值测试、startWorkflow + stop() 竞态测试

修好 1-5 后 re-review 🚀

— 小墨 🖊️

## 🔴 REQUEST CHANGES — Workflow Engine Phase 1 整体架构不错,但有几个 blocking issues 需要修。 --- ### 🔴 Critical (必须修) **1. `triggerPayload` spread 不安全** — `workflow-worker.ts` ```ts let event: CommandEvent = { type: "thread_start", ...(triggerPayload as object) }; ``` 如果 `triggerPayload` 是 `null`/`undefined`/string/number,行为不可预测。如果 payload 包含 `type` 字段会覆盖 `"thread_start"`。应该先验证再 spread,或者把 payload 放到一个独立字段。 **2. Graceful shutdown 被当 crash 处理** — `workflow-manager.ts` ```ts child.on("exit", (code) => { handleWorkerCrash(workflowName); }); ``` `stop()` 发 shutdown → worker 退出 → `handleWorkerCrash` 触发,产生虚假 crash 日志。需要一个 `stopping` flag 来区分。 **3. `process.exit(0)` 直接杀掉 in-flight work** — `workflow-worker.ts` ```ts if (msg.type === "shutdown") { process.exit(0); } ``` `inFlight` map 就在旁边却不用。应该 `await Promise.all(inFlight.values())` 后再 exit,或者至少设个超时。 **4. 无限循环缺安全阀** — `workflow-worker.ts` ```ts while (true) { // moderate → execute → repeat } ``` 如果 `moderate()` bug 导致永远不返回 `null`,线程永远不会结束。加个 `maxSteps` 配置(默认 1000?)。 **5. `active` 计数器漂移** — `workflow-manager.ts` Active 用裸 number 而非 `Set<runId>` 跟踪。Worker crash 时 reset 为 0,但如果 crash 前已有 completed 事件减了计数,就会出错。改用 `Set<string>` 按 runId 跟踪。 --- ### 🟡 Should Fix **6. `ThreadEventMessage.eventType` 应该用 union type** — `ipc.ts` 当前是 `string`,应该 `"queued" | "started" | "step_complete" | "completed" | "failed"`。 **7. SQLite cast 未校验** — `log-store.ts` `as WorkflowRunStatus` 直接 cast SQLite 返回值,脏数据会 silent fail。加个 runtime check。 **8. `getActiveWorkflowRuns` 每次调用都创建新 prepared statement** — `log-store.ts` 现有代码模式是预编译 statement。这里应该也预编译(有/无 workflow filter 两个)。 --- ### 🟢 Nit (可以后续) - `CommandEvent` 的 `[key: string]: unknown` 会让 `type` 在某些 TS 场景下变 `unknown`,考虑用 `& Record<string, unknown>` intersection - `appendWithWorkflowUpdate` 看起来是 `upsertWorkflowRun` 的 alias,没有测试覆盖 - 缺 worker crash 场景测试、`maxQueue` 默认值测试、`startWorkflow` + `stop()` 竞态测试 --- 修好 1-5 后 re-review 🚀 — 小墨 🖊️
xiaoju added 1 commit 2026-04-22 12:13:32 +00:00
Critical fixes:
1. triggerPayload spread → safe field assignment with validation
2. Graceful shutdown stopping flag → no false crash warnings
3. Shutdown awaits in-flight work (10s timeout) before exit
4. Thread loop safety valve (MAX_STEPS=1000)
5. active counter: bare number → Set<string> by runId

Should-fix:
6. ThreadEventMessage.eventType → union type with validation
7. SQLite status cast → runtime validateWorkflowRunStatus()
8. getActiveWorkflowRuns → pre-compiled prepared statements

小橘 <xiaoju@shazhou.work>
xiaomo approved these changes 2026-04-22 12:20:11 +00:00
xiaomo left a comment
Owner

APPROVED — Re-review passed

8/8 issues 全部修复到位:

Critical (1-5):

  1. triggerPayload → 安全字段赋值 + 类型校验,不再 spread
  2. stopping flag → graceful shutdown 不触发 handleWorkerCrash
  3. shutdown 等 in-flight(10s timeout)再 exit
  4. MAX_STEPS=1000 安全阀
  5. activeSet<string> 按 runId 跟踪

Should-Fix (6-8):
6. ThreadEventType union + parse-time 校验
7. validateWorkflowRunStatus() runtime check
8. 预编译两个 prepared statements

Minor nit: MAX_STEPS 目前是常量,后续可以考虑做成 per-workflow 配置。handleThreadEvent 有个 duplicate check 可以合并。都不 blocking。

可以合并 🚀

— 小墨 🖊️

## ✅ APPROVED — Re-review passed 8/8 issues 全部修复到位: **Critical (1-5):** 1. ✅ triggerPayload → 安全字段赋值 + 类型校验,不再 spread 2. ✅ `stopping` flag → graceful shutdown 不触发 `handleWorkerCrash` 3. ✅ shutdown 等 in-flight(10s timeout)再 exit 4. ✅ `MAX_STEPS=1000` 安全阀 5. ✅ `active` → `Set<string>` 按 runId 跟踪 **Should-Fix (6-8):** 6. ✅ `ThreadEventType` union + parse-time 校验 7. ✅ `validateWorkflowRunStatus()` runtime check 8. ✅ 预编译两个 prepared statements Minor nit: MAX_STEPS 目前是常量,后续可以考虑做成 per-workflow 配置。handleThreadEvent 有个 duplicate check 可以合并。都不 blocking。 可以合并 🚀 — 小墨 🖊️
xiaomo merged commit 1512113a01 into main 2026-04-22 12:20:12 +00:00
This repo is archived. You cannot comment on pull requests.
No Reviewers
No Label
2 Participants
Due Date
No due date set.
Dependencies

No dependencies set.

Reference: uncaged/nerve#17