docs: add RFC-002 Workflow Engine
Defines the workflow execution engine design: - Thread lifecycle with event sourcing - Concurrency control (drop/queue overflow) - WorkflowManager, workflow worker process model - IPC extension, crash recovery, hot reload - 4-phase implementation plan 小橘 <xiaoju@shazhou.work>
This commit is contained in:
@@ -0,0 +1,410 @@
|
||||
# RFC-002: Workflow Engine
|
||||
|
||||
> Status: Draft
|
||||
> Author: 小橘 🍊(NEKO Team)
|
||||
> Date: 2026-04-22
|
||||
|
||||
## 1. 动机
|
||||
|
||||
RFC-001 定义了 Sense → Signal → Reflex 的无状态观测循环。当 Reflex 的 action 为 `StartWorkflow` 时,需要一个有状态的执行引擎来驱动多步骤工作流。本 RFC 定义 Workflow Engine 的完整设计。
|
||||
|
||||
## 2. 概念模型
|
||||
|
||||
```
|
||||
Signal 循环 ──→ ThreadStart ──→ Thread 循环
|
||||
(无状态,幂等,可合并) (有状态,顺序,Command Event 驱动)
|
||||
│
|
||||
Thread 产出 Log
|
||||
(执行日志,供 retrospection)
|
||||
```
|
||||
|
||||
### 2.1 核心概念
|
||||
|
||||
- **Workflow**:一个有状态工作流的定义,包含并发策略和溢出策略
|
||||
- **Thread**:Workflow 的一次执行实例,有唯一 `runId`
|
||||
- **Role**:执行具体动作的角色,有副作用(调 API、改文件等)
|
||||
- **Moderator**:在 Role 之间递话筒的调度逻辑(纯函数)
|
||||
- **CommandEvent**:Thread 内部的事件流,驱动 Role 之间的流转
|
||||
|
||||
Role 和 Moderator 是 Workflow 的内部细节,不跨 Workflow 共享,不作为顶层扩展点。
|
||||
|
||||
### 2.2 与 Sense 循环的边界
|
||||
|
||||
| | Signal 循环 | Thread 循环 |
|
||||
|---|---|---|
|
||||
| 状态 | 无状态 | 有状态(事件溯源) |
|
||||
| 幂等性 | 幂等、可合并、可丢弃 | 严格顺序、每个 event 必须响应 |
|
||||
| 触发 | Reflex | Moderator 递话筒 |
|
||||
| 产出 | Signal → Bus | Log → logs.db |
|
||||
|
||||
**关键约束**:Thread 产出的 Log 不能触发 Reflex(见 RFC-001 §2.4),只能被 Sense 的 compute 查询用于 retrospection。
|
||||
|
||||
## 3. 配置
|
||||
|
||||
```yaml
|
||||
# nerve.yaml
|
||||
workflows:
|
||||
cleanup:
|
||||
concurrency: 1 # 同时最多 1 个 thread
|
||||
overflow: drop # 已有活跃 thread 时丢弃新请求
|
||||
execute-task:
|
||||
concurrency: 10 # 可并行 10 个 thread
|
||||
overflow: queue # 超出时排队
|
||||
code-review:
|
||||
concurrency: 3
|
||||
overflow: queue
|
||||
maxQueue: 20 # 队列上限,超出丢弃最旧请求
|
||||
```
|
||||
|
||||
- **concurrency**:同时允许的最大活跃 Thread 数
|
||||
- **overflow**:
|
||||
- `drop`:丢弃,适用于幂等操作(如 cleanup)
|
||||
- `queue`:排队等待,适用于每次需要执行的操作
|
||||
- **maxQueue**(仅 `overflow: queue`):队列上限,默认 100,超出丢弃最旧请求
|
||||
|
||||
不需要 throttle——触发频率由上游 Sense 的 throttle 控制。
|
||||
|
||||
## 4. 用户代码结构
|
||||
|
||||
```
|
||||
~/.uncaged-nerve/
|
||||
workflows/
|
||||
cleanup/
|
||||
index.ts # workflow 定义
|
||||
code-review/
|
||||
index.ts
|
||||
```
|
||||
|
||||
### 4.1 Workflow 定义(用户代码)
|
||||
|
||||
```typescript
|
||||
// workflows/cleanup/index.ts
|
||||
import type { WorkflowDefinition } from "@uncaged/nerve-daemon";
|
||||
|
||||
const workflow: WorkflowDefinition = {
|
||||
roles: {
|
||||
scanner: {
|
||||
execute: async (prompt, ctx) => {
|
||||
// 扫描需要清理的资源
|
||||
const stale = await findStaleResources();
|
||||
return { type: "scan_complete", resources: stale };
|
||||
},
|
||||
},
|
||||
cleaner: {
|
||||
execute: async (prompt, ctx) => {
|
||||
// 执行清理
|
||||
await deleteResources(prompt.resources);
|
||||
return { type: "cleanup_done", count: prompt.resources.length };
|
||||
},
|
||||
},
|
||||
},
|
||||
|
||||
moderate(thread, event) {
|
||||
// 纯函数:决定下一步交给谁
|
||||
if (event.type === "thread_start") {
|
||||
return { role: "scanner", prompt: {} };
|
||||
}
|
||||
if (event.type === "scan_complete") {
|
||||
if (event.resources.length === 0) return null; // 结束
|
||||
return { role: "cleaner", prompt: { resources: event.resources } };
|
||||
}
|
||||
return null; // 结束
|
||||
},
|
||||
};
|
||||
|
||||
export default workflow;
|
||||
```
|
||||
|
||||
### 4.2 类型定义
|
||||
|
||||
```typescript
|
||||
// 用户需要实现的接口
|
||||
export type RoleExecuteFn = (
|
||||
prompt: unknown,
|
||||
ctx: WorkflowContext,
|
||||
) => Promise<CommandEvent>;
|
||||
|
||||
export type Role = {
|
||||
execute: RoleExecuteFn;
|
||||
};
|
||||
|
||||
export type ModerateFn = (
|
||||
thread: ThreadState,
|
||||
event: CommandEvent,
|
||||
) => ModerateResult | null; // null = thread 完成
|
||||
|
||||
export type ModerateResult = {
|
||||
role: string;
|
||||
prompt: unknown;
|
||||
};
|
||||
|
||||
export type WorkflowDefinition = {
|
||||
roles: Record<string, Role>;
|
||||
moderate: ModerateFn;
|
||||
};
|
||||
|
||||
export type WorkflowContext = {
|
||||
runId: string;
|
||||
workflowName: string;
|
||||
log: (message: string) => void;
|
||||
};
|
||||
|
||||
export type CommandEvent = {
|
||||
type: string;
|
||||
[key: string]: unknown;
|
||||
};
|
||||
|
||||
export type ThreadState = {
|
||||
runId: string;
|
||||
events: CommandEvent[]; // 历史事件,供 moderate 做决策
|
||||
};
|
||||
```
|
||||
|
||||
## 5. 运行时架构
|
||||
|
||||
### 5.1 进程模型
|
||||
|
||||
同一个 workflow 的所有 thread 共享一个 worker 进程。`concurrency` 控制进程内的并发 async task 数。
|
||||
|
||||
```
|
||||
nerve-engine (kernel)
|
||||
├─ nerve-worker sense --group system (永驻)
|
||||
├─ nerve-worker sense --group tasks (永驻)
|
||||
├─ nerve-worker workflow --name cleanup (按需启动)
|
||||
└─ nerve-worker workflow --name code-review (按需启动)
|
||||
```
|
||||
|
||||
- 进程数 = workflow 种类数,不会膨胀
|
||||
- 有活跃 thread 时启动,所有 thread 完成后退出(或保持待命 `idleTimeout`,默认 30s)
|
||||
- 崩溃后 engine respawn worker,从持久化状态恢复 thread
|
||||
|
||||
### 5.2 IPC 扩展
|
||||
|
||||
在现有 IPC 协议基础上扩展 workflow 相关消息:
|
||||
|
||||
```typescript
|
||||
// Parent → Workflow Worker
|
||||
type StartThreadMessage = {
|
||||
type: "start-thread";
|
||||
runId: string;
|
||||
workflow: string;
|
||||
triggerPayload: unknown; // Reflex 传入的 signal payload
|
||||
};
|
||||
|
||||
type ResumeThreadMessage = {
|
||||
type: "resume-thread";
|
||||
runId: string;
|
||||
};
|
||||
|
||||
type ShutdownMessage = {
|
||||
type: "shutdown";
|
||||
};
|
||||
|
||||
// Workflow Worker → Parent
|
||||
type ThreadEventMessage = {
|
||||
type: "thread-event";
|
||||
runId: string;
|
||||
eventType: string; // queued, started, step_complete, completed, failed
|
||||
payload: unknown;
|
||||
};
|
||||
|
||||
type WorkflowReadyMessage = {
|
||||
type: "ready";
|
||||
};
|
||||
|
||||
type WorkflowErrorMessage = {
|
||||
type: "error";
|
||||
runId: string;
|
||||
error: string;
|
||||
};
|
||||
```
|
||||
|
||||
### 5.3 Thread 生命周期
|
||||
|
||||
```
|
||||
┌──────────────────────────────────────────┐
|
||||
│ │
|
||||
trigger ──→ queued ──→ started ──→ step_complete ──→ completed
|
||||
│ ↑ │
|
||||
│ └─────┘
|
||||
│
|
||||
└──→ failed
|
||||
└──→ crashed (worker 崩溃)
|
||||
```
|
||||
|
||||
1. Reflex `StartWorkflow` 触发 → kernel 检查 concurrency
|
||||
2. 有空位 → 状态 `started`,发 `start-thread` 给 worker
|
||||
3. 超出 concurrency:
|
||||
- `overflow: drop` → 丢弃,写 `dropped` log
|
||||
- `overflow: queue` → 状态 `queued`,等空位
|
||||
4. Worker 内部循环:`moderate → execute → moderate → execute → ...`
|
||||
5. `moderate` 返回 `null` → `completed`
|
||||
6. Role `execute` 抛异常 → `failed`
|
||||
7. Worker 进程崩溃 → 所有活跃 thread 标记 `crashed`
|
||||
|
||||
### 5.4 Workflow Worker 内部实现
|
||||
|
||||
```typescript
|
||||
// workflow-worker.ts(engine 代码,不是用户代码)
|
||||
async function runThread(
|
||||
def: WorkflowDefinition,
|
||||
runId: string,
|
||||
triggerPayload: unknown,
|
||||
sendToParent: (msg: WorkerToParentMessage) => void,
|
||||
): Promise<void> {
|
||||
const state: ThreadState = { runId, events: [] };
|
||||
const ctx: WorkflowContext = {
|
||||
runId,
|
||||
workflowName,
|
||||
log: (msg) => sendToParent({
|
||||
type: "thread-event", runId, eventType: "step_complete", payload: { message: msg },
|
||||
}),
|
||||
};
|
||||
|
||||
// 初始事件
|
||||
let event: CommandEvent = { type: "thread_start", ...triggerPayload };
|
||||
|
||||
while (true) {
|
||||
state.events.push(event);
|
||||
const next = def.moderate(state, event);
|
||||
if (next === null) {
|
||||
sendToParent({ type: "thread-event", runId, eventType: "completed", payload: null });
|
||||
return;
|
||||
}
|
||||
const role = def.roles[next.role];
|
||||
if (!role) {
|
||||
sendToParent({ type: "error", runId, error: `Unknown role: ${next.role}` });
|
||||
return;
|
||||
}
|
||||
event = await role.execute(next.prompt, ctx);
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
## 6. 持久化
|
||||
|
||||
### 6.1 事件溯源
|
||||
|
||||
Thread 状态以 append-only 事件流为 source of truth,写入统一的 `logs.db`:
|
||||
|
||||
```sql
|
||||
-- logs 表(已存在于 RFC-001),source = "workflow"
|
||||
-- type 取值:queued, started, step_complete, completed, failed, crashed, dropped
|
||||
```
|
||||
|
||||
### 6.2 物化表
|
||||
|
||||
为避免每次查活跃 workflow 都扫描全表,维护一张物化表,在写 log 的同一事务中 UPSERT:
|
||||
|
||||
```sql
|
||||
CREATE TABLE workflow_runs (
|
||||
run_id TEXT PRIMARY KEY,
|
||||
workflow TEXT NOT NULL,
|
||||
status TEXT NOT NULL, -- queued, started, completed, failed, crashed
|
||||
ts INTEGER NOT NULL
|
||||
);
|
||||
|
||||
CREATE INDEX idx_workflow_runs_status ON workflow_runs(status);
|
||||
CREATE INDEX idx_workflow_runs_workflow ON workflow_runs(workflow);
|
||||
```
|
||||
|
||||
写入流程(同一事务):
|
||||
|
||||
```sql
|
||||
BEGIN;
|
||||
INSERT INTO logs (source, type, ref_id, payload, ts)
|
||||
VALUES ('workflow', 'started', 'run-7', '{}', 1001);
|
||||
INSERT OR REPLACE INTO workflow_runs (run_id, workflow, status, ts)
|
||||
VALUES ('run-7', 'cleanup', 'started', 1001);
|
||||
COMMIT;
|
||||
```
|
||||
|
||||
物化表是 logs 的派生数据——数据丢失时可从 logs 重建。
|
||||
|
||||
### 6.3 崩溃恢复
|
||||
|
||||
Worker 重启时:
|
||||
|
||||
1. 查 `workflow_runs WHERE status IN ('queued', 'started')`
|
||||
2. `queued` → 重新排队
|
||||
3. `started` → 从 logs 重建 `ThreadState`(事件列表),resume
|
||||
|
||||
幂等 workflow(`overflow: drop`)直接标记 `crashed` 重新来。
|
||||
|
||||
## 7. Kernel 扩展
|
||||
|
||||
### 7.1 WorkflowManager
|
||||
|
||||
Kernel 新增 `WorkflowManager` 组件:
|
||||
|
||||
```typescript
|
||||
export type WorkflowManager = {
|
||||
/** 触发一个 workflow(来自 Reflex) */
|
||||
startWorkflow: (workflowName: string, payload: unknown) => void;
|
||||
/** 获取活跃 thread 数 */
|
||||
activeCount: (workflowName: string) => number;
|
||||
/** 获取队列长度 */
|
||||
queueLength: (workflowName: string) => number;
|
||||
/** 停止所有 workflow */
|
||||
stop: () => Promise<void>;
|
||||
};
|
||||
```
|
||||
|
||||
### 7.2 Reflex Scheduler 扩展
|
||||
|
||||
当前 `ReflexScheduler` 只处理 `kind: "sense"` 的 reflex。扩展为同时处理 `kind: "workflow"`:
|
||||
|
||||
```typescript
|
||||
// reflex-scheduler.ts 扩展
|
||||
if (reflex.kind === "workflow") {
|
||||
const workflowName = reflex.workflow;
|
||||
if (reflex.on !== null && reflex.on.length > 0) {
|
||||
const watchedSenses = new Set(reflex.on);
|
||||
const unsub = bus.subscribe((signal) => {
|
||||
if (watchedSenses.has(signal.senseId)) {
|
||||
workflowManager.startWorkflow(workflowName, signal.payload);
|
||||
}
|
||||
});
|
||||
unsubscribers.push(unsub);
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### 7.3 热更新
|
||||
|
||||
| 变化 | 处理 |
|
||||
|------|------|
|
||||
| workflow ts 文件修改 | drain(等活跃 thread 完成,`drainTimeout` 后 force kill + 标记 crashed)→ respawn |
|
||||
| nerve.yaml workflow 配置修改 | 更新 concurrency/overflow,不重启 worker |
|
||||
| 新增 workflow | 下次触发时按需启动 worker |
|
||||
| 删除 workflow | drain + 移除 |
|
||||
|
||||
## 8. 实现计划
|
||||
|
||||
### Phase 1:核心类型与 WorkflowManager 骨架
|
||||
|
||||
- [ ] `packages/core/src/types.ts` — 扩展 workflow 相关类型(`WorkflowDefinition`、`ThreadState`、`CommandEvent`、`ModerateResult` 等)
|
||||
- [ ] `packages/daemon/src/ipc.ts` — 扩展 IPC 消息类型(`StartThreadMessage`、`ThreadEventMessage` 等)
|
||||
- [ ] `packages/daemon/src/workflow-manager.ts` — WorkflowManager 实现(并发控制、队列管理、thread 生命周期)
|
||||
- [ ] `packages/daemon/src/workflow-worker.ts` — Workflow worker 进程(加载用户代码、运行 moderate/execute 循环)
|
||||
- [ ] `packages/daemon/src/log-store.ts` — 扩展 LogStore,添加 `workflow_runs` 物化表 + 查询方法
|
||||
- [ ] 单元测试覆盖:并发控制、队列溢出、thread 生命周期
|
||||
|
||||
### Phase 2:Kernel 集成
|
||||
|
||||
- [ ] `packages/daemon/src/kernel.ts` — 集成 WorkflowManager,处理 workflow worker 的生命周期
|
||||
- [ ] `packages/daemon/src/reflex-scheduler.ts` — 扩展支持 `kind: "workflow"` 的 reflex
|
||||
- [ ] 集成测试:Sense signal → Reflex → Workflow 全链路
|
||||
|
||||
### Phase 3:崩溃恢复与热更新
|
||||
|
||||
- [ ] Worker crash recovery — 从持久化状态恢复 thread
|
||||
- [ ] 热更新 — workflow 代码变更时 drain + respawn
|
||||
- [ ] nerve.yaml workflow 配置变更的增量更新
|
||||
|
||||
### Phase 4:CLI 与用户体验
|
||||
|
||||
- [ ] `nerve workflow list` — 查看活跃 workflow
|
||||
- [ ] `nerve workflow inspect <runId>` — 查看 thread 详情
|
||||
- [ ] `nerve workflow trigger <name>` — 手动触发
|
||||
- [ ] scaffold 模板 — `nerve init workflow <name>`
|
||||
Reference in New Issue
Block a user