# RFC-002: Workflow Engine

> Status: Draft
> Author: 小橘 🍊 (NEKO Team)
> Date: 2026-04-22

## 1. Motivation

RFC-001 defines the stateless observation loop Sense → Signal → Reflex. When a Reflex's action is `StartWorkflow`, a stateful execution engine is needed to drive the multi-step workflow. This RFC defines the complete design of the Workflow Engine.

## 2. Conceptual model

```
Signal loop ──→ ThreadStart ──→ Thread loop
(stateless, idempotent,         (stateful, sequential,
 mergeable)                      driven by Command Events)
                                      │
                                Thread emits Log
                                (execution log, for retrospection)
```

### 2.1 Core concepts

- **Workflow**: the definition of a stateful workflow, including its concurrency policy and overflow policy
- **Thread**: one execution instance of a Workflow, identified by a unique `runId`
- **Role**: an actor that performs concrete actions and has side effects (calling APIs, modifying files, and so on)
- **Moderator**: the scheduling logic that passes the microphone between Roles (a pure function)
- **CommandEvent**: the event stream inside a Thread, which drives the handoff between Roles

Roles and the Moderator are internal details of a Workflow. They are not shared across Workflows and are not top-level extension points.

### 2.2 Boundary with the Sense loop

| | Signal loop | Thread loop |
|---|---|---|
| State | Stateless | Stateful (event sourcing) |
| Idempotency | Idempotent, mergeable, droppable | Strictly ordered; every event must be handled |
| Trigger | Reflex | Moderator passes the microphone |
| Output | Signal → Bus | Log → logs.db |

**Key constraint**: Logs emitted by a Thread cannot trigger a Reflex (see RFC-001 §2.4). They can only be queried by a Sense's compute for retrospection.

## 3. Configuration

```yaml
# nerve.yaml
workflows:
  cleanup:
    concurrency: 1     # at most 1 thread at a time
    overflow: drop     # drop new requests while a thread is active
  execute-task:
    concurrency: 10    # up to 10 threads in parallel
    overflow: queue    # queue requests beyond the limit
  code-review:
    concurrency: 3
    overflow: queue
    maxQueue: 20       # queue cap; the oldest request is dropped when exceeded
```

- **concurrency**: the maximum number of Threads that may be active at the same time
- **overflow**:
  - `drop`: discard the request; suited to idempotent operations (such as cleanup)
  - `queue`: wait in a queue; suited to operations that must run for every request
- **maxQueue** (only with `overflow: queue`): queue cap, default 100; when exceeded, the oldest request is dropped

No throttle is needed here: trigger frequency is controlled by the throttle of the upstream Sense.

## 4. User code structure

```
~/.uncaged-nerve/
  workflows/
    cleanup/
      index.ts    # workflow definition
    code-review/
      index.ts
```

### 4.1 Workflow definition (user code)

```typescript
// workflows/cleanup/index.ts
import type { WorkflowDefinition } from "@uncaged/nerve-daemon";
// findStaleResources and deleteResources are user-supplied helpers (not shown).

const workflow: WorkflowDefinition = {
  roles: {
    scanner: {
      execute: async (prompt, ctx) => {
        // Scan for resources that need cleaning up
        const stale = await findStaleResources();
        return { type: "scan_complete", resources: stale };
      },
    },
    cleaner: {
      execute: async (prompt, ctx) => {
        // Perform the cleanup; `prompt` is `unknown`, so narrow it first
        const { resources } = prompt as { resources: unknown[] };
        await deleteResources(resources);
        return { type: "cleanup_done", count: resources.length };
      },
    },
  },

  moderate(thread, event) {
    // Pure function: decide who gets the next turn
    if (event.type === "thread_start") {
      return { role: "scanner", prompt: {} };
    }
    if (event.type === "scan_complete") {
      const resources = event.resources as unknown[];
      if (resources.length === 0) return null; // done
      return { role: "cleaner", prompt: { resources } };
    }
    return null; // done
  },
};

export default workflow;
```

### 4.2 Type definitions

```typescript
// Interfaces the user implements
export type RoleExecuteFn = (
  prompt: unknown,
  ctx: WorkflowContext,
) => Promise<CommandEvent>;

export type Role = {
  execute: RoleExecuteFn;
};

export type ModerateFn = (
  thread: ThreadState,
  event: CommandEvent,
) => ModerateResult | null; // null = thread is complete

export type ModerateResult = {
  role: string;
  prompt: unknown;
};

export type WorkflowDefinition = {
  roles: Record<string, Role>;
  moderate: ModerateFn;
};

export type WorkflowContext = {
  runId: string;
  workflowName: string;
  log: (message: string) => void;
};

export type CommandEvent = {
  type: string;
  [key: string]: unknown;
};

export type ThreadState = {
  runId: string;
  events: CommandEvent[]; // event history, used by moderate to make decisions
};
```

## 5. Runtime architecture

### 5.1 Process model

All threads of the same workflow share one worker process. `concurrency` controls the number of concurrent async tasks inside that process.

```
nerve-engine (kernel)
├─ nerve-worker sense --group system          (resident)
├─ nerve-worker sense --group tasks           (resident)
├─ nerve-worker workflow --name cleanup       (started on demand)
└─ nerve-worker workflow --name code-review   (started on demand)
```

- Number of processes = number of workflow kinds, so the process count does not balloon
- A worker starts when there is an active thread and exits after all threads complete (or stays on standby for `idleTimeout`, default 30s)
- After a crash, the engine respawns the worker and restores threads from persisted state

### 5.2 IPC extensions

Workflow-related messages are added on top of the existing IPC protocol:

```typescript
// Parent → Workflow Worker
type StartThreadMessage = {
  type: "start-thread";
  runId: string;
  workflow: string;
  triggerPayload: unknown; // signal payload passed in by the Reflex
};

type ResumeThreadMessage = {
  type: "resume-thread";
  runId: string;
};

type ShutdownMessage = {
  type: "shutdown";
};

// Workflow Worker → Parent
type ThreadEventMessage = {
  type: "thread-event";
  runId: string;
  eventType: string; // queued, started, step_complete, completed, failed
  payload: unknown;
};

type WorkflowReadyMessage = {
  type: "ready";
};

type WorkflowErrorMessage = {
  type: "error";
  runId: string;
  error: string;
};
```

### 5.3 Thread lifecycle

```
   ┌──────────────────────┐
   │                      │
trigger ──→ queued ──→ started ──→ step_complete ──→ completed
                          │           ↑     │
                          │           └─────┘
                          │           └──→ failed
                          └──→ crashed (worker crash)
```

1. A Reflex `StartWorkflow` fires → the kernel checks concurrency
2. A slot is free → state becomes `started`, and `start-thread` is sent to the worker
3. Concurrency is exceeded:
   - `overflow: drop` → discard the request and write a `dropped` log
   - `overflow: queue` → state becomes `queued`, waiting for a free slot
4. The worker's internal loop runs: `moderate → execute → moderate → execute → ...`
5. `moderate` returns `null` → `completed`
6. A Role's `execute` throws → `failed`
7. The worker process crashes → all active threads are marked `crashed`

### 5.4 Workflow Worker internals

```typescript
// workflow-worker.ts (engine code, not user code)
// WorkerToParentMessage is the union of the Worker → Parent messages in §5.2.

async function runThread(
  def: WorkflowDefinition,
  workflowName: string,
  runId: string,
  triggerPayload: unknown,
  sendToParent: (msg: WorkerToParentMessage) => void,
): Promise<void> {
  const state: ThreadState = { runId, events: [] };
  const ctx: WorkflowContext = {
    runId,
    workflowName,
    log: (msg) => sendToParent({
      type: "thread-event",
      runId,
      eventType: "step_complete",
      payload: { message: msg },
    }),
  };

  // Initial event. Spread the payload only when it is an object, and set
  // `type` last so the payload cannot overwrite it.
  const payloadFields =
    typeof triggerPayload === "object" && triggerPayload !== null ? triggerPayload : {};
  let event: CommandEvent = { ...payloadFields, type: "thread_start" };

  while (true) {
    state.events.push(event);
    const next = def.moderate(state, event);

    if (next === null) {
      sendToParent({ type: "thread-event", runId, eventType: "completed", payload: null });
      return;
    }

    const role = def.roles[next.role];
    if (!role) {
      sendToParent({ type: "error", runId, error: `Unknown role: ${next.role}` });
      return;
    }

    try {
      event = await role.execute(next.prompt, ctx);
    } catch (err) {
      // A throwing role fails the thread (§5.3, step 6)
      sendToParent({
        type: "thread-event",
        runId,
        eventType: "failed",
        payload: { error: String(err) },
      });
      return;
    }
  }
}
```

## 6. Persistence

### 6.1 Event sourcing

Thread state uses an append-only event stream as its source of truth, written to the unified `logs.db`:

```sql
-- logs table (already defined in RFC-001), source = "workflow"
-- type values: queued, started, step_complete, completed, failed, crashed, dropped
```

### 6.2 Materialized table

To avoid a full table scan every time active workflows are queried, a materialized table is maintained and UPSERTed in the same transaction that writes the log:

```sql
CREATE TABLE workflow_runs (
  run_id   TEXT PRIMARY KEY,
  workflow TEXT NOT NULL,
  status   TEXT NOT NULL,  -- queued, started, completed, failed, crashed
  ts       INTEGER NOT NULL
);
CREATE INDEX idx_workflow_runs_status ON workflow_runs(status);
CREATE INDEX idx_workflow_runs_workflow ON workflow_runs(workflow);
```

Write path (single transaction):

```sql
BEGIN;
INSERT INTO logs (source, type, ref_id, payload, ts)
  VALUES ('workflow', 'started', 'run-7', '{}', 1001);
INSERT OR REPLACE INTO workflow_runs (run_id, workflow, status, ts)
  VALUES ('run-7', 'cleanup', 'started', 1001);
COMMIT;
```

The materialized table is derived from logs: if its data is lost, it can be rebuilt from logs.

### 6.3 Crash recovery

When a worker restarts:

1. Query `workflow_runs WHERE status IN ('queued', 'started')`
2. `queued` → re-enqueue
3. `started` → rebuild `ThreadState` (the event list) from logs, then resume

Idempotent workflows (`overflow: drop`) are simply marked `crashed` and started over.

## 7. Kernel extensions

### 7.1 WorkflowManager

The kernel gains a new `WorkflowManager` component:

```typescript
export type WorkflowManager = {
  /** Trigger a workflow (called from a Reflex) */
  startWorkflow: (workflowName: string, payload: unknown) => void;

  /** Number of active threads */
  activeCount: (workflowName: string) => number;

  /** Queue length */
  queueLength: (workflowName: string) => number;

  /** Stop all workflows */
  stop: () => Promise<void>;
};
```

### 7.2 Sense Scheduler extension

The current `SenseScheduler` only handles reflexes with `kind: "sense"`. It is extended to also handle `kind: "workflow"`:

```typescript
// sense-scheduler.ts extension
if (reflex.kind === "workflow") {
  const workflowName = reflex.workflow;
  if (reflex.on !== null && reflex.on.length > 0) {
    const watchedSenses = new Set(reflex.on);
    const unsub = bus.subscribe((signal) => {
      if (watchedSenses.has(signal.senseId)) {
        workflowManager.startWorkflow(workflowName, signal.payload);
      }
    });
    unsubscribers.push(unsub);
  }
}
```

### 7.3 Hot reload

| Change | Handling |
|------|------|
| Workflow ts file modified | drain (wait for active threads to finish; after `drainTimeout`, force kill and mark them crashed) → respawn |
| nerve.yaml workflow config modified | update concurrency/overflow without restarting the worker |
| Workflow added | worker is started on demand at the next trigger |
| Workflow removed | drain + remove |

## 8. Implementation plan

### Phase 1: Core types and WorkflowManager skeleton

- [ ] `packages/core/src/types.ts`: add workflow-related types (`WorkflowDefinition`, `ThreadState`, `CommandEvent`, `ModerateResult`, and so on)
- [ ] `packages/daemon/src/ipc.ts`: add IPC message types (`StartThreadMessage`, `ThreadEventMessage`, and so on)
- [ ] `packages/daemon/src/workflow-manager.ts`: WorkflowManager implementation (concurrency control, queue management, thread lifecycle)
- [ ] `packages/daemon/src/workflow-worker.ts`: workflow worker process (loads user code, runs the moderate/execute loop)
- [ ] `packages/daemon/src/log-store.ts`: extend LogStore with the `workflow_runs` materialized table and query methods
- [ ] Unit test coverage: concurrency control, queue overflow, thread lifecycle

### Phase 2: Kernel integration

- [ ] `packages/daemon/src/kernel.ts`: integrate WorkflowManager and manage the lifecycle of workflow workers
- [ ] `packages/daemon/src/sense-scheduler.ts`: support reflexes with `kind: "workflow"`
- [ ] Integration test: the full Sense signal → SenseScheduler → Workflow path

### Phase 3: Crash recovery and hot reload

- [ ] Worker crash recovery: restore threads from persisted state
- [ ] Hot reload: drain + respawn when workflow code changes
- [ ] Incremental updates when the nerve.yaml workflow config changes

### Phase 4: CLI and user experience

- [ ] `nerve workflow list`: list active workflows
- [ ] `nerve workflow inspect`: show thread details
- [ ] `nerve workflow trigger`: trigger manually
- [ ] Scaffold template: `nerve init workflow`
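
As a possible starting point for the Phase 1 unit tests on concurrency control and queue overflow, the admission rules from §3 and §5.3 might be sketched as follows. This is a minimal in-memory illustration, not the `WorkflowManager` itself: `AdmissionController`, `Admission`, and `WorkflowConfig` are hypothetical names that do not appear in this RFC, and persistence, IPC, and worker management are left out.

```typescript
// Hypothetical sketch of the concurrency/overflow rules; all names are illustrative.
type WorkflowConfig = {
  concurrency: number;
  overflow: "drop" | "queue";
  maxQueue?: number; // only used with overflow: "queue"; the RFC default is 100
};

type Admission =
  | { status: "started" }
  | { status: "queued"; evicted: string | null } // evicted = oldest request pushed out
  | { status: "dropped" };

class AdmissionController {
  private readonly config: WorkflowConfig;
  private readonly active = new Set<string>();
  private readonly queue: string[] = [];

  constructor(config: WorkflowConfig) {
    this.config = config;
  }

  /** Decide what happens to a new trigger for this workflow. */
  admit(runId: string): Admission {
    if (this.active.size < this.config.concurrency) {
      this.active.add(runId);
      return { status: "started" };
    }
    if (this.config.overflow === "drop") {
      return { status: "dropped" };
    }
    const maxQueue = this.config.maxQueue ?? 100;
    if (maxQueue <= 0) {
      return { status: "dropped" }; // assumption: a zero-length queue behaves like drop
    }
    // Queue is full: drop the oldest waiting request to make room.
    let evicted: string | null = null;
    if (this.queue.length >= maxQueue) {
      evicted = this.queue.shift() ?? null;
    }
    this.queue.push(runId);
    return { status: "queued", evicted };
  }

  /** Mark a thread as finished; returns the queued runId promoted to started, if any. */
  finish(runId: string): string | null {
    if (!this.active.delete(runId)) return null; // unknown thread: nothing to free
    const next = this.queue.shift();
    if (next === undefined) return null;
    this.active.add(next);
    return next;
  }

  activeCount(): number {
    return this.active.size;
  }

  queueLength(): number {
    return this.queue.length;
  }
}

// Usage: one slot, a queue of two, four triggers in a row.
const codeReview = new AdmissionController({ concurrency: 1, overflow: "queue", maxQueue: 2 });
codeReview.admit("run-1"); // started
codeReview.admit("run-2"); // queued
codeReview.admit("run-3"); // queued
codeReview.admit("run-4"); // queued, evicting run-2 (the oldest waiting request)
codeReview.finish("run-1"); // frees the slot and promotes run-3
```

Keeping the decision logic free of I/O like this would let the concurrency and overflow cases be tested without spawning a worker; the real component would additionally write the `queued`, `started`, and `dropped` logs in the same step.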