
RFC-002: Workflow Engine

Status: Draft | Author: 小橘 🍊 (NEKO Team) | Date: 2026-04-22

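1. Motivation

RFC-001 defines the stateless Sense → Signal → Reflex observation loop. When a Reflex's action is StartWorkflow, a stateful execution engine is needed to drive the multi-step workflow. This RFC defines the complete design of the Workflow Engine.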

2. Conceptual Model

Signal loop ──→ ThreadStart ──→ Thread loop
(stateless, idempotent, mergeable)  (stateful, sequential, driven by Command Events)
                                    │
                               Thread emits Log
                              (execution log, for retrospection)

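2.1 Core Concepts

  • Workflow: the definition of a stateful workflow, including its concurrency policy and overflow policy
  • Thread: one execution instance of a Workflow, with a unique runId
  • Role: an actor that performs concrete actions and has side effects (calling APIs, modifying files, and so on)
  • Moderator: the scheduling logic that passes the microphone between Roles (a pure function)
  • CommandEvent: the event stream inside a Thread, which drives the hand-off between Roles

Role and Moderator are internal details of a Workflow: they are not shared across Workflows and are not top-level extension points.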

2.2 Boundary with the Sense Loop

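| | Signal loop | Thread loop |
|---|---|---|
| State | Stateless | Stateful (event sourcing) |
| Idempotency | Idempotent, mergeable, droppable | Strictly ordered; every event must be handled |
| Trigger | Reflex | Moderator passes the microphone |
| Output | Signal → Bus | Log → logs.db |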

Key constraint: a Log produced by a Thread must not trigger a Reflex (see RFC-001 §2.4); it can only be queried by a Sense's compute for retrospection.

3. Configuration

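# nerve.yaml
workflows:
  cleanup:
    concurrency: 1          # at most 1 thread at a time
    overflow: drop           # drop new requests while a thread is active
  execute-task:
    concurrency: 10          # up to 10 threads in parallel
    overflow: queue           # queue requests beyond the limit
  code-review:
    concurrency: 3
    overflow: queue
    maxQueue: 20             # queue limit; the oldest request is dropped when exceeded

  • concurrency: the maximum number of Threads that may be active at the same time
  • overflow
    • drop: discard the request; suited to idempotent operations (such as cleanup)
    • queue: wait in a queue; suited to operations that must run every time
  • maxQueue (only with overflow: queue): queue limit, default 100; the oldest request is dropped when exceeded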

No throttle is needed here: trigger frequency is controlled by the throttle of the upstream Sense.
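The concurrency and overflow rules above can be read as a small admission decision. The following is a minimal sketch under that reading, not the engine's implementation; the names WorkflowConfig, Admission, and admit are hypothetical and do not appear in this RFC.

```typescript
// Hypothetical names: WorkflowConfig, Admission, and admit are illustrative only.
type WorkflowConfig = {
  concurrency: number;
  overflow: "drop" | "queue";
  maxQueue?: number; // only meaningful with overflow: "queue"; defaults to 100
};

type Admission = "start" | "enqueue" | "enqueue-evict-oldest" | "drop";

// Decide what happens to a new request given the current load of one workflow.
function admit(cfg: WorkflowConfig, active: number, queued: number): Admission {
  if (active < cfg.concurrency) return "start";
  if (cfg.overflow === "drop") return "drop";
  const maxQueue = cfg.maxQueue ?? 100;
  // A full queue drops its oldest request to make room for the new one.
  return queued < maxQueue ? "enqueue" : "enqueue-evict-oldest";
}

// The same settings as the cleanup and code-review entries in nerve.yaml.
const cleanup: WorkflowConfig = { concurrency: 1, overflow: "drop" };
const codeReview: WorkflowConfig = { concurrency: 3, overflow: "queue", maxQueue: 20 };
```

Keeping the decision a pure function of (config, active, queued) makes the overflow behavior easy to unit-test separately from process management.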

4. User Code Layout

~/.uncaged-nerve/
  workflows/
    cleanup/
      index.ts               # workflow definition
    code-review/
      index.ts

4.1 Workflow Definition (User Code)

// workflows/cleanup/index.ts
import type { WorkflowDefinition } from "@uncaged/nerve-daemon";

// findStaleResources and deleteResources are user-supplied helpers (not shown here).

const workflow: WorkflowDefinition = {
  roles: {
    scanner: {
      execute: async (prompt, ctx) => {
        // Scan for resources that need to be cleaned up
        const stale = await findStaleResources();
        return { type: "scan_complete", resources: stale };
      },
    },
    cleaner: {
      execute: async (prompt, ctx) => {
        // Perform the cleanup; prompt is typed unknown, so narrow it first
        const { resources } = prompt as { resources: unknown[] };
        await deleteResources(resources);
        return { type: "cleanup_done", count: resources.length };
      },
    },
  },

  moderate(thread, event) {
    // Pure function: decide which role gets the next turn
    if (event.type === "thread_start") {
      return { role: "scanner", prompt: {} };
    }
    if (event.type === "scan_complete") {
      // CommandEvent fields are unknown, so narrow before reading length
      const resources = event.resources as unknown[];
      if (resources.length === 0) return null; // finished
      return { role: "cleaner", prompt: { resources } };
    }
    return null; // finished
  },
};

export default workflow;
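Because moderate is meant to be a pure function, its routing can be checked without running any role. The sketch below repeats the cleanup routing with local copies of the types from §4.2 so that it runs on its own; the sample resource name "tmp-a" is made up for illustration.

```typescript
// Local copies of the §4.2 types so the sketch is self-contained.
type CommandEvent = { type: string; [key: string]: unknown };
type ThreadState = { runId: string; events: CommandEvent[] };
type ModerateResult = { role: string; prompt: unknown };

// Same routing logic as the cleanup workflow's moderate.
function moderate(_thread: ThreadState, event: CommandEvent): ModerateResult | null {
  if (event.type === "thread_start") return { role: "scanner", prompt: {} };
  if (event.type === "scan_complete") {
    const resources = event.resources as unknown[];
    if (resources.length === 0) return null; // nothing to clean: finish
    return { role: "cleaner", prompt: { resources } };
  }
  return null; // any other event ends the thread
}

// Feed events in by hand and inspect the routing decisions.
const thread: ThreadState = { runId: "run-1", events: [] };
const first = moderate(thread, { type: "thread_start" });
const afterEmptyScan = moderate(thread, { type: "scan_complete", resources: [] });
const afterScan = moderate(thread, { type: "scan_complete", resources: ["tmp-a"] });
const afterCleanup = moderate(thread, { type: "cleanup_done", count: 1 });
```

Here first routes to scanner, afterScan routes to cleaner, and the other two results are null, which ends the thread.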

4.2 Type Definitions

// Interfaces the user needs to implement
export type RoleExecuteFn = (
  prompt: unknown,
  ctx: WorkflowContext,
) => Promise<CommandEvent>;

export type Role = {
  execute: RoleExecuteFn;
};

export type ModerateFn = (
  thread: ThreadState,
  event: CommandEvent,
) => ModerateResult | null;  // null = thread finished

export type ModerateResult = {
  role: string;
  prompt: unknown;
};

export type WorkflowDefinition = {
  roles: Record<string, Role>;
  moderate: ModerateFn;
};

export type WorkflowContext = {
  runId: string;
  workflowName: string;
  log: (message: string) => void;
};

export type CommandEvent = {
  type: string;
  [key: string]: unknown;
};

export type ThreadState = {
  runId: string;
  events: CommandEvent[];   // event history, used by moderate to make decisions
};

5. Runtime Architecture

5.1 Process Model

All threads of the same workflow share one worker process. concurrency controls the number of concurrent async tasks inside that process.

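nerve-engine (kernel)
  ├─ nerve-worker sense --group system          (resident)
  ├─ nerve-worker sense --group tasks           (resident)
  ├─ nerve-worker workflow --name cleanup       (started on demand)
  └─ nerve-worker workflow --name code-review   (started on demand)

  • Number of processes = number of workflow kinds, so it does not balloon
  • A worker starts when there is an active thread and exits once all threads have finished (or stays on standby for idleTimeout, default 30s)
  • After a crash, the engine respawns the worker and restores threads from persisted state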

5.2 IPC Extensions

Workflow-related messages are added on top of the existing IPC protocol:

// Parent → Workflow Worker
type StartThreadMessage = {
  type: "start-thread";
  runId: string;
  workflow: string;
  triggerPayload: unknown;   // signal payload passed in by the Reflex
};

type ResumeThreadMessage = {
  type: "resume-thread";
  runId: string;
};

type ShutdownMessage = {
  type: "shutdown";
};

// Workflow Worker → Parent
type ThreadEventMessage = {
  type: "thread-event";
  runId: string;
  eventType: string;         // queued, started, step_complete, completed, failed
  payload: unknown;
};

type WorkflowReadyMessage = {
  type: "ready";
};

type WorkflowErrorMessage = {
  type: "error";
  runId: string;
  error: string;
};
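On the parent side, the three worker-to-parent messages can be handled as a discriminated union on type. The sketch below assumes WorkerToParentMessage is the union of those three types (this RFC refers to that name in §5.4 without spelling it out); describeMessage is a hypothetical helper.

```typescript
type ThreadEventMessage = {
  type: "thread-event";
  runId: string;
  eventType: string;
  payload: unknown;
};
type WorkflowReadyMessage = { type: "ready" };
type WorkflowErrorMessage = { type: "error"; runId: string; error: string };

// Assumed union; the RFC references WorkerToParentMessage without defining it.
type WorkerToParentMessage =
  | ThreadEventMessage
  | WorkflowReadyMessage
  | WorkflowErrorMessage;

// Hypothetical helper: turn a worker message into a one-line description.
function describeMessage(msg: WorkerToParentMessage): string {
  switch (msg.type) {
    case "thread-event":
      return `thread ${msg.runId}: ${msg.eventType}`;
    case "ready":
      return "worker ready";
    case "error":
      return `thread ${msg.runId} error: ${msg.error}`;
    default: {
      // Compile-time exhaustiveness check: adding a message type breaks this line.
      const unreachable: never = msg;
      return unreachable;
    }
  }
}
```

The never assignment in the default branch means a message type added later without a matching case fails at compile time rather than being silently ignored.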

5.3 Thread Lifecycle

           ┌──────────────────────────────────────────┐
           │                                          │
  trigger ──→ queued ──→ started ──→ step_complete ──→ completed
                                        │       ↑     │
                                        │       └─────┘
                                        │
                                        └──→ failed
                                        └──→ crashed (worker crash)

  1. Reflex StartWorkflow fires → the kernel checks concurrency
  2. A slot is free → status started; start-thread is sent to the worker
  3. concurrency exceeded:
    • overflow: drop → discard the request and write a dropped log
    • overflow: queue → status queued; wait for a free slot
  4. Worker inner loop: moderate → execute → moderate → execute → ...
  5. moderate returns null → completed
  6. A Role's execute throws → failed
  7. The worker process crashes → all active threads are marked crashed
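The lifecycle steps above can be captured as a transition table that a manager could use to reject invalid status changes. This is only a sketch: it treats step_complete as a log event rather than a run status, and the names RunStatus, transitions, and canTransition are hypothetical.

```typescript
// Run statuses only; step_complete is treated here as a log event, not a status.
type RunStatus = "queued" | "started" | "completed" | "failed" | "crashed";

// Allowed next statuses for each status; terminal statuses have none.
const transitions: Record<RunStatus, readonly RunStatus[]> = {
  queued: ["started"],
  started: ["completed", "failed", "crashed"],
  completed: [],
  failed: [],
  crashed: [],
};

// True when moving a run from one status to another matches the lifecycle.
function canTransition(from: RunStatus, to: RunStatus): boolean {
  return transitions[from].includes(to);
}
```

How crash recovery re-enters this table (for example, whether a crashed thread may return to started) is left open here, since the lifecycle list does not spell it out.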

5.4 Workflow Worker Internals

// workflow-worker.ts (engine code, not user code)
async function runThread(
  def: WorkflowDefinition,
  workflowName: string,
  runId: string,
  triggerPayload: unknown,
  sendToParent: (msg: WorkerToParentMessage) => void,
): Promise<void> {
  const state: ThreadState = { runId, events: [] };
  const ctx: WorkflowContext = {
    runId,
    workflowName,
    log: (msg) => sendToParent({
      type: "thread-event", runId, eventType: "step_complete", payload: { message: msg },
    }),
  };

  // Initial event. triggerPayload is unknown, so spread it only when it is an object;
  // type goes last so the payload cannot overwrite it.
  const extra = typeof triggerPayload === "object" && triggerPayload !== null ? triggerPayload : {};
  let event: CommandEvent = { ...extra, type: "thread_start" };

  while (true) {
    state.events.push(event);
    const next = def.moderate(state, event);
    if (next === null) {
      sendToParent({ type: "thread-event", runId, eventType: "completed", payload: null });
      return;
    }
    const role = def.roles[next.role];
    if (!role) {
      sendToParent({ type: "error", runId, error: `Unknown role: ${next.role}` });
      return;
    }
    try {
      event = await role.execute(next.prompt, ctx);
    } catch (err) {
      // A throwing role fails the thread (§5.3, step 6)
      sendToParent({ type: "thread-event", runId, eventType: "failed", payload: { error: String(err) } });
      return;
    }
  }
}

6. Persistence

6.1 Event Sourcing

Thread state uses an append-only event stream as its source of truth, written to the shared logs.db:

-- logs table (already defined in RFC-001), source = "workflow"
-- type values: queued, started, step_complete, completed, failed, crashed, dropped

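6.2 Materialized Table

To avoid a full-table scan every time active workflows are queried, a materialized table is maintained and UPSERTed in the same transaction that writes the log: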

CREATE TABLE workflow_runs (
  run_id    TEXT PRIMARY KEY,
  workflow  TEXT NOT NULL,
  status    TEXT NOT NULL,       -- queued, started, completed, failed, crashed
  ts        INTEGER NOT NULL
);

CREATE INDEX idx_workflow_runs_status ON workflow_runs(status);
CREATE INDEX idx_workflow_runs_workflow ON workflow_runs(workflow);

Write path (single transaction):

BEGIN;
INSERT INTO logs (source, type, ref_id, payload, ts)
  VALUES ('workflow', 'started', 'run-7', '{}', 1001);
INSERT OR REPLACE INTO workflow_runs (run_id, workflow, status, ts)
  VALUES ('run-7', 'cleanup', 'started', 1001);
COMMIT;

The materialized table is data derived from logs: if it is lost, it can be rebuilt from logs.
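Rebuilding amounts to replaying the workflow log rows in timestamp order and keeping the last status seen per run. The sketch below does this in memory; LogRow, RunRow, and rebuildRuns are hypothetical names. The logs columns shown above do not include the workflow name, so the sketch leaves that column out; where a real rebuild would get it from is not specified here.

```typescript
// Hypothetical row shapes mirroring the columns used in the INSERT statements above.
type LogRow = { source: string; type: string; refId: string; ts: number };
type RunRow = { runId: string; status: string; ts: number };

// Only these log types are run statuses; step_complete and dropped are skipped.
const RUN_STATUSES = new Set(["queued", "started", "completed", "failed", "crashed"]);

function rebuildRuns(logs: LogRow[]): Map<string, RunRow> {
  const runs = new Map<string, RunRow>();
  // Replay in timestamp order so the last status-bearing event wins.
  const ordered = [...logs].sort((a, b) => a.ts - b.ts);
  for (const row of ordered) {
    if (row.source !== "workflow" || !RUN_STATUSES.has(row.type)) continue;
    runs.set(row.refId, { runId: row.refId, status: row.type, ts: row.ts });
  }
  return runs;
}

// Sample rows (made up for illustration).
const rebuilt = rebuildRuns([
  { source: "workflow", type: "started", refId: "run-7", ts: 1001 },
  { source: "workflow", type: "step_complete", refId: "run-7", ts: 1002 },
  { source: "workflow", type: "completed", refId: "run-7", ts: 1003 },
  { source: "workflow", type: "queued", refId: "run-8", ts: 1004 },
  { source: "sense", type: "started", refId: "x", ts: 1005 },
]);
```

With these sample rows, run-7 ends up completed, run-8 stays queued, and the non-workflow row is ignored.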

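6.3 Crash Recovery

When a worker restarts:

  1. Query workflow_runs WHERE status IN ('queued', 'started')
  2. queued → put back in the queue
  3. started → rebuild ThreadState (the event list) from logs, then resume

Idempotent workflows (overflow: drop) are simply marked crashed and run again from scratch.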
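The recovery rules above reduce to a per-run decision. The following sketch expresses that decision as a pure function; ActiveRun, RecoveryAction, and planRecovery are hypothetical names, and the overflow lookup stands in for reading nerve.yaml.

```typescript
type Overflow = "drop" | "queue";
// A row returned by the status IN ('queued', 'started') query.
type ActiveRun = { runId: string; workflow: string; status: "queued" | "started" };
type RecoveryAction = "requeue" | "resume" | "mark-crashed";

// Decide how to recover one run after a worker restart.
function planRecovery(
  run: ActiveRun,
  overflowOf: (workflow: string) => Overflow,
): RecoveryAction {
  // Idempotent workflows are not resumed; they are marked crashed and run again later.
  if (overflowOf(run.workflow) === "drop") return "mark-crashed";
  return run.status === "queued" ? "requeue" : "resume";
}

// Stand-in for the overflow settings in nerve.yaml.
const policies: Record<string, Overflow> = { cleanup: "drop", "code-review": "queue" };
const overflowOf = (workflow: string): Overflow => policies[workflow] ?? "queue";
```

A resume action would then rebuild ThreadState from logs and send resume-thread to the worker, as described in step 3.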

7. Kernel Extensions

7.1 WorkflowManager

The kernel gains a new WorkflowManager component:

export type WorkflowManager = {
  /** Trigger a workflow (called from a Reflex) */
  startWorkflow: (workflowName: string, payload: unknown) => void;
  /** Number of active threads */
  activeCount: (workflowName: string) => number;
  /** Current queue length */
  queueLength: (workflowName: string) => number;
  /** Stop all workflows */
  stop: () => Promise<void>;
};
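To make the counting behavior concrete, here is a minimal in-memory sketch of the bookkeeping behind startWorkflow, activeCount, and queueLength. It is not the planned implementation: createManager, onStart, and threadFinished are hypothetical, stop is omitted, and onStart stands in for sending start-thread to a worker.

```typescript
type Overflow = "drop" | "queue";
type Config = { concurrency: number; overflow: Overflow; maxQueue?: number };

// Hypothetical factory; onStart stands in for sending start-thread to a worker.
function createManager(
  configs: Record<string, Config>,
  onStart: (workflowName: string, payload: unknown) => void,
) {
  const active = new Map<string, number>();
  const queues = new Map<string, unknown[]>();
  const queueOf = (wf: string): unknown[] => {
    let q = queues.get(wf);
    if (!q) { q = []; queues.set(wf, q); }
    return q;
  };
  const launch = (wf: string, payload: unknown): void => {
    active.set(wf, (active.get(wf) ?? 0) + 1);
    onStart(wf, payload);
  };

  return {
    startWorkflow(wf: string, payload: unknown): void {
      const cfg = configs[wf];
      if (!cfg) throw new Error(`Unknown workflow: ${wf}`);
      if ((active.get(wf) ?? 0) < cfg.concurrency) { launch(wf, payload); return; }
      if (cfg.overflow === "drop") return; // a real manager would write a dropped log
      const q = queueOf(wf);
      if (q.length >= (cfg.maxQueue ?? 100)) q.shift(); // evict the oldest request
      q.push(payload);
    },
    // Not part of the WorkflowManager type: called when a thread reaches a terminal state.
    threadFinished(wf: string): void {
      active.set(wf, Math.max(0, (active.get(wf) ?? 0) - 1));
      const q = queueOf(wf);
      if (q.length > 0) launch(wf, q.shift());
    },
    activeCount: (wf: string): number => active.get(wf) ?? 0,
    queueLength: (wf: string): number => queues.get(wf)?.length ?? 0,
  };
}
```

Freeing a slot immediately starts the oldest queued request, so the queue drains in arrival order.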

7.2 Sense Scheduler Extension

SenseScheduler currently handles only reflexes with kind: "sense". It is extended to also handle kind: "workflow":

// sense-scheduler.ts extension
if (reflex.kind === "workflow") {
  const workflowName = reflex.workflow;
  if (reflex.on !== null && reflex.on.length > 0) {
    const watchedSenses = new Set(reflex.on);
    const unsub = bus.subscribe((signal) => {
      if (watchedSenses.has(signal.senseId)) {
        workflowManager.startWorkflow(workflowName, signal.payload);
      }
    });
    unsubscribers.push(unsub);
  }
}

7.3 Hot Reload

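| Change | Handling |
|---|---|
| workflow ts file modified | drain (wait for active threads to finish; after drainTimeout, force kill and mark crashed) → respawn |
| nerve.yaml workflow config modified | update concurrency/overflow without restarting the worker |
| workflow added | worker is started on demand at the next trigger |
| workflow removed | drain + remove |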

8. Implementation Plan

Phase 1: Core Types and WorkflowManager Skeleton

  • packages/core/src/types.ts: add workflow-related types (WorkflowDefinition, ThreadState, CommandEvent, ModerateResult, etc.)
  • packages/daemon/src/ipc.ts: add IPC message types (StartThreadMessage, ThreadEventMessage, etc.)
  • packages/daemon/src/workflow-manager.ts: WorkflowManager implementation (concurrency control, queue management, thread lifecycle)
  • packages/daemon/src/workflow-worker.ts: workflow worker process (loads user code, runs the moderate/execute loop)
  • packages/daemon/src/log-store.ts: extend LogStore with the workflow_runs materialized table and query methods
  • Unit test coverage: concurrency control, queue overflow, thread lifecycle

Phase 2: Kernel Integration

  • packages/daemon/src/kernel.ts: integrate WorkflowManager and manage the lifecycle of workflow workers
  • packages/daemon/src/sense-scheduler.ts: add support for reflexes with kind: "workflow"
  • Integration test: Sense signal → SenseScheduler → Workflow, end to end

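Phase 3: Crash Recovery and Hot Reload

  • Worker crash recovery: restore threads from persisted state
  • Hot reload: drain + respawn when workflow code changes
  • Incremental updates when the workflow configuration in nerve.yaml changes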

Phase 4: CLI and User Experience

  • nerve workflow list: list active workflows
  • nerve workflow inspect <runId>: show thread details
  • nerve workflow trigger <name>: trigger manually
  • scaffold template: nerve init workflow <name>