docs: Baton v2 — 简化数据模型 + Queue 事件驱动
核心改动:
- Baton 数据模型大幅简化:核心就是一段 prompt
- 工具从 allowlist 改为 hints(建议而非限制)
- 调度方案改为 Queue-first(天然匹配事件模型)
- 去掉 goal/context/tools/constraints 等冗余字段
- 删除 A/B/C 分阶段方案,直接用 Queue
小橘 🍊 (NEKO Team)
This commit is contained in:
parent
b2c04d49d6
commit
5d0770b10e
@ -29,32 +29,23 @@ tags: [baton, uncaged, architecture, serverless, task-scheduling]
|
||||
|
||||
### Baton(接力棒)
|
||||
|
||||
一个 Baton 是一个**完整的、自包含的任务描述**。任何一个无状态的 worker 拿到它就能开始工作:
|
||||
一个 Baton 是一个**自包含的任务描述**。它的核心就是一段 prompt——用自然语言完整描述了"要做什么"。
|
||||
|
||||
```typescript
|
||||
interface Baton {
|
||||
id: string // "bt_abc123"
|
||||
|
||||
// ── 任务定义 ──
|
||||
goal: string // 要完成什么
|
||||
context: Record<string, any> // 上下文信息(用户、来源、任何相关数据)
|
||||
tools?: string[] // 工具白名单(空 = 全部可用)
|
||||
prompt?: string // 针对这个任务的额外指令
|
||||
constraints?: {
|
||||
max_rounds?: number // agentic loop 最大轮数
|
||||
timeout_hint?: number // 建议执行时间(秒),worker 据此决定是否 breakdown
|
||||
}
|
||||
|
||||
// ── 任务树 ──
|
||||
parent_id?: string // 父 Baton(null = 根任务)
|
||||
children?: string[] // 子 Baton ID 列表(breakdown 时填入)
|
||||
depth: number // 递归深度(根 = 0)
|
||||
|
||||
|
||||
// ── 核心:任务就是一段 prompt ──
|
||||
prompt: string // 完整的任务描述(目标、上下文、约束,全在里面)
|
||||
hints?: string[] // 建议的工具名(帮 worker 快速 ramp up,不是限制)
|
||||
|
||||
// ── 状态 ──
|
||||
status: 'pending' | 'running' | 'completed' | 'failed' | 'spawned'
|
||||
result?: any // 执行结果(completed 时)
|
||||
error?: string // 错误信息(failed 时)
|
||||
|
||||
result?: string // 执行结果
|
||||
error?: string // 错误信息
|
||||
|
||||
// ── 元数据 ──
|
||||
created_at: number
|
||||
updated_at: number
|
||||
@ -63,24 +54,34 @@ interface Baton {
|
||||
}
|
||||
```
|
||||
|
||||
为什么这么简单?
|
||||
|
||||
**因为 worker 就是一个 LLM agentic loop。** LLM 最擅长理解的就是自然语言。把任务硬拆成 `goal + context + tools + constraints + max_rounds + timeout_hint`,是在用结构化字段**模拟自然语言已经能表达的东西**。
|
||||
|
||||
一段好的 prompt 里可以包含一切:
|
||||
|
||||
> "查询北京当前天气。这是用户 Scott 在 Telegram 上的请求。可以试试 cap_weather 工具。如果没有现成的天气工具,从 Sigil 搜一个或者创建一个。"
|
||||
|
||||
目标、上下文、工具建议、备选方案——全在一段话里。自然、完整、不需要额外的 schema。
|
||||
|
||||
**工具是建议,不是围栏。** `hints` 里列出的工具名帮 worker 快速找到起点,但 worker 作为一个完整的 agent,完全有能力自己通过 Sigil query 发现和加载更多工具。建议是 ramp up 的加速器,不是权限的边界。
|
||||
|
||||
### Worker
|
||||
|
||||
Worker 是**无状态的执行器**。它不知道自己是"主 agent"还是"子 agent"——这个区别不存在。它只知道:
|
||||
|
||||
1. 拿到一个 Baton
|
||||
2. 执行它
|
||||
2. 读 prompt,干活
|
||||
3. 报告结果
|
||||
|
||||
```typescript
|
||||
async function executeBaton(baton: Baton): Promise<void> {
|
||||
// 唯一的决策:我能在时间窗口内完成吗?
|
||||
if (shouldBreakdown(baton)) {
|
||||
// 太大了 → 拆分
|
||||
const children = await planBreakdown(baton)
|
||||
await spawnChildren(baton.id, children)
|
||||
await updateStatus(baton.id, 'spawned')
|
||||
} else {
|
||||
// 可以完成 → 执行
|
||||
try {
|
||||
const result = await runAgentLoop(baton)
|
||||
await complete(baton.id, result)
|
||||
@ -99,112 +100,131 @@ async function executeBaton(baton: Baton): Promise<void> {
|
||||
|------|------|----------|
|
||||
| **completed** | 任务完成,result 填入 | 触发 parent 的 children check |
|
||||
| **failed** | 任务失败,error 填入 | 触发 parent 的错误处理 |
|
||||
| **spawned** | 任务太大,已拆分成子 Baton | 子 Baton 进入 pending,等待调度 |
|
||||
| **spawned** | 任务太大,已拆分成子 Baton | 子 Baton 入队,等待调度 |
|
||||
|
||||
第三种是递归的——子 Baton 也可以再拆分。
|
||||
第三种是递归的——子 Baton 也可以再拆分。形成一棵任务树,叶子节点是实际执行,非叶子节点是协调。
|
||||
|
||||
### 事件驱动调度
|
||||
### Breakdown 决策
|
||||
|
||||
**没有轮询,没有长连接。** 全靠事件:
|
||||
Worker 怎么判断"我能在当前执行窗口内完成吗"?
|
||||
|
||||
```
|
||||
Baton 状态变更 → 事件 → 调度器检查 → 触发下一步
|
||||
```
|
||||
**直接问 LLM。**
|
||||
|
||||
核心调度逻辑:
|
||||
Worker 的 system prompt 里包含执行窗口的信息:
|
||||
|
||||
> "你有一个有限的执行窗口。如果你认为当前任务无法在窗口内完成,请把它拆分成可以独立完成的子任务。每个子任务应该是自包含的——另一个 worker 拿到它就能独立执行。"
|
||||
|
||||
LLM 天然擅长判断任务复杂度。"查天气"→ 一轮就搞定,直接做。"写一篇竞品分析报告"→ 需要搜索 + 对比 + 整理 + 写作,拆分。
|
||||
|
||||
**递归的自然退出条件**:当任务小到一个 worker 能在窗口内完成时,递归就停了。不需要预设"最多拆几层"——**任务的复杂度决定递归的深度。**
|
||||
|
||||
## 事件驱动调度
|
||||
|
||||
### 核心机制:Queue
|
||||
|
||||
Baton 的调度通过 **CF Queues** 实现。每一次状态变更就是一个事件,事件通过队列传递:
|
||||
|
||||
```typescript
|
||||
async function onBatonStatusChange(batonId: string, newStatus: string) {
|
||||
const baton = await loadBaton(batonId)
|
||||
|
||||
if (newStatus === 'completed' || newStatus === 'failed') {
|
||||
if (baton.parent_id) {
|
||||
// 有 parent → 检查所有 siblings 是否都完成了
|
||||
const parent = await loadBaton(baton.parent_id)
|
||||
const children = await loadChildren(parent.id)
|
||||
const allDone = children.every(c => c.status === 'completed' || c.status === 'failed')
|
||||
|
||||
if (allDone) {
|
||||
// 所有子任务完成 → 唤醒 parent 继续执行
|
||||
const childResults = children.map(c => ({ id: c.id, goal: c.goal, result: c.result, error: c.error }))
|
||||
await resumeParent(parent, childResults)
|
||||
}
|
||||
} else {
|
||||
// 没有 parent → 根任务完成 → 通知用户
|
||||
if (baton.notify) {
|
||||
await notifyUser(baton)
|
||||
}
|
||||
// 创建新 Baton → 入队
|
||||
await env.BATON_QUEUE.send({ batonId: child.id, event: 'created' })
|
||||
|
||||
// Baton 完成 → 入队通知 parent
|
||||
await env.BATON_QUEUE.send({ batonId: baton.parent_id, event: 'child_completed', childId: baton.id })
|
||||
```
|
||||
|
||||
Queue Consumer 是事件循环的核心:
|
||||
|
||||
```typescript
|
||||
async queue(batch: MessageBatch<BatonEvent>, env: Env) {
|
||||
for (const msg of batch.messages) {
|
||||
const { batonId, event } = msg.body
|
||||
|
||||
switch (event) {
|
||||
case 'created':
|
||||
// 新 Baton → 派发执行
|
||||
await executeBaton(await loadBaton(batonId, env), env)
|
||||
break
|
||||
|
||||
case 'child_completed':
|
||||
case 'child_failed':
|
||||
// 子任务完成 → 检查是否所有 children 都完成了
|
||||
const parent = await loadBaton(batonId, env)
|
||||
const children = await loadChildren(batonId, env)
|
||||
const allDone = children.every(c =>
|
||||
c.status === 'completed' || c.status === 'failed'
|
||||
)
|
||||
|
||||
if (allDone) {
|
||||
// 所有子任务完成 → 唤醒 parent,带上子任务结果
|
||||
const results = children.map(c => ({
|
||||
goal: c.prompt.slice(0, 100),
|
||||
result: c.result,
|
||||
error: c.error,
|
||||
}))
|
||||
await resumeParent(parent, results, env)
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if (newStatus === 'pending') {
|
||||
// 新任务 → 派发给 worker 执行
|
||||
await dispatchToWorker(baton)
|
||||
|
||||
msg.ack()
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### 为什么是 Queue
|
||||
|
||||
| | waitUntil 自调用 | **Queue** | Durable Objects |
|
||||
|--|--|--|--|
|
||||
| 事件驱动 | ❌ 在模拟 | **✅ 天然** | ✅ |
|
||||
| 重试 | ❌ 需手写 | **✅ 内置** | ✅ |
|
||||
| 并发控制 | ❌ 无 | **✅ batch + concurrency** | ✅ |
|
||||
| 死信处理 | ❌ 无 | **✅ DLQ** | ❌ 需手写 |
|
||||
| 复杂度 | 低 | **低** | 高 |
|
||||
| 解耦 | ❌ 调度和执行耦合 | **✅ 完全解耦** | ✅ |
|
||||
|
||||
Queue 和 Baton 的事件驱动模型是**天然匹配**的。状态变更 = 事件 = 消息。用 HTTP 自调用来模拟事件是 workaround,Queue 才是正解。
|
||||
|
||||
### 事件接力图
|
||||
|
||||
```
|
||||
用户消息
|
||||
│
|
||||
▼
|
||||
┌─────────────────────┐
|
||||
│ 请求 1: Worker │
|
||||
│ 拿到 bt_root │
|
||||
│ → 太大,breakdown │
|
||||
│ → spawn bt_a, bt_b │
|
||||
│ → 退出 │
|
||||
└─────────────────────┘
|
||||
│ │
|
||||
▼ ▼
|
||||
┌──────────┐ ┌──────────┐
|
||||
│ 请求 2 │ │ 请求 3 │ ← 并发!
|
||||
│ 执行 bt_a │ │ 执行 bt_b │
|
||||
│ → completed│ │ → completed│
|
||||
└──────────┘ └──────────┘
|
||||
│ │
|
||||
└───────┬────────┘
|
||||
▼
|
||||
┌─────────────────────┐
|
||||
│ 请求 4: Worker │
|
||||
│ bt_root 被唤醒 │
|
||||
│ → 汇总 bt_a + bt_b │
|
||||
│ → completed │
|
||||
│ → 通知用户 │
|
||||
└─────────────────────┘
|
||||
┌─────────────────────────┐
|
||||
│ Worker A │
|
||||
│ 拿到 bt_root │
|
||||
│ → 太大,breakdown │
|
||||
│ → 写入 bt_a, bt_b 到 D1 │
|
||||
│ → 入队 {bt_a, created} │
|
||||
│ → 入队 {bt_b, created} │
|
||||
│ → 退出 │
|
||||
└─────────────────────────┘
|
||||
│
|
||||
┌────┴────┐
|
||||
▼ ▼ Queue Consumer 并发消费
|
||||
┌────────┐ ┌────────┐
|
||||
│Worker B│ │Worker C│
|
||||
│执行 bt_a│ │执行 bt_b│
|
||||
│→ done │ │→ done │
|
||||
│→ 入队 │ │→ 入队 │
|
||||
│ {root, │ │ {root, │
|
||||
│ child_ │ │ child_ │
|
||||
│ done} │ │ done} │
|
||||
└────────┘ └────────┘
|
||||
│ │
|
||||
└────┬────┘
|
||||
▼
|
||||
┌─────────────────────────┐
|
||||
│ Worker D │
|
||||
│ bt_root 被唤醒 │
|
||||
│ → 读取 bt_a + bt_b 结果 │
|
||||
│ → 汇总 → completed │
|
||||
│ → 通知用户 │
|
||||
│ → 退出 │
|
||||
└─────────────────────────┘
|
||||
```
|
||||
|
||||
每个请求都是短暂的。没有任何一个 Worker 需要跑超过执行窗口。但整个任务可以跨越任意长的时间、任意深的递归。
|
||||
|
||||
**用事件接力代替长进程。**
|
||||
|
||||
### Breakdown 决策
|
||||
|
||||
Worker 怎么判断"我能完成吗"?两个信号:
|
||||
|
||||
**1. 时间预估(硬约束)**
|
||||
|
||||
```typescript
|
||||
function shouldBreakdown(baton: Baton): boolean {
|
||||
const timeHint = baton.constraints?.timeout_hint || 25 // 默认 25 秒窗口
|
||||
const estimatedRounds = estimateRounds(baton.goal, baton.tools)
|
||||
const avgRoundTime = 5 // 每轮 LLM 调用约 5 秒
|
||||
|
||||
return estimatedRounds * avgRoundTime > timeHint
|
||||
}
|
||||
```
|
||||
|
||||
**2. LLM 判断(软约束)**
|
||||
|
||||
也可以直接问 LLM:
|
||||
|
||||
> "你有约 25 秒的执行窗口。以下任务能在窗口内完成吗?如果不能,请拆分成可以独立完成的子任务。"
|
||||
|
||||
LLM 天然擅长判断任务复杂度。如果它认为"查天气"一轮就搞定,就直接做;如果认为"写一篇分析报告"需要搜索 + 整理 + 写作,就拆分。
|
||||
|
||||
**递归的自然退出条件**:任务小到一个 worker 能在时间窗口内完成时,递归就停了。不需要预设"最多拆几层"——复杂度决定深度。
|
||||
每个 Worker 都是短暂的。没有长进程。但整个任务树可以任意深、任意宽。
|
||||
|
||||
## 存储层
|
||||
|
||||
@ -215,28 +235,24 @@ CREATE TABLE batons (
|
||||
id TEXT PRIMARY KEY,
|
||||
parent_id TEXT,
|
||||
depth INTEGER DEFAULT 0,
|
||||
|
||||
-- 任务定义
|
||||
goal TEXT NOT NULL,
|
||||
context TEXT, -- JSON
|
||||
tools TEXT, -- JSON array
|
||||
prompt TEXT,
|
||||
max_rounds INTEGER DEFAULT 6,
|
||||
timeout_hint INTEGER DEFAULT 25,
|
||||
|
||||
|
||||
-- 核心
|
||||
prompt TEXT NOT NULL,
|
||||
hints TEXT, -- JSON array,建议工具名
|
||||
|
||||
-- 状态
|
||||
status TEXT DEFAULT 'pending', -- pending/running/completed/failed/spawned
|
||||
result TEXT, -- JSON
|
||||
status TEXT DEFAULT 'pending',
|
||||
result TEXT,
|
||||
error TEXT,
|
||||
|
||||
|
||||
-- 通知
|
||||
channel TEXT, -- telegram / api / a2a
|
||||
channel TEXT,
|
||||
notify INTEGER DEFAULT 0,
|
||||
|
||||
|
||||
-- 时间
|
||||
created_at INTEGER NOT NULL,
|
||||
updated_at INTEGER NOT NULL,
|
||||
|
||||
|
||||
FOREIGN KEY (parent_id) REFERENCES batons(id)
|
||||
);
|
||||
|
||||
@ -247,88 +263,35 @@ CREATE INDEX idx_batons_status ON batons(status);
|
||||
### 为什么是 D1 而不是 KV
|
||||
|
||||
- **强一致性** — 状态机需要 read-then-write 原子性,KV 的 60 秒最终一致性会导致 race condition
|
||||
- **SQL 查询** — "查找某个 parent 下所有 children 的状态"是高频操作,D1 原生支持
|
||||
- **SQL 查询** — "查找某个 parent 下所有 children 的状态"是高频操作,SQL 原生支持
|
||||
- **事务** — 更新 Baton 状态 + 检查 siblings 需要在同一个事务里
|
||||
|
||||
## 调度机制
|
||||
|
||||
### CF Workers 实现
|
||||
|
||||
在 CF Workers 环境下,调度有几种方式:
|
||||
|
||||
**方案 A:自调用 + waitUntil(推荐起步)**
|
||||
|
||||
```typescript
|
||||
// 创建子 Baton 后,通过 waitUntil 异步触发执行
|
||||
for (const child of children) {
|
||||
ctx.waitUntil(
|
||||
fetch(`https://doudou.shazhou.work/baton/${child.id}/run`, {
|
||||
method: 'POST',
|
||||
headers: { 'Authorization': `Bearer ${token}` },
|
||||
})
|
||||
)
|
||||
}
|
||||
```
|
||||
|
||||
- ✅ 零额外基础设施
|
||||
- ✅ 天然并发(多个 waitUntil 并行)
|
||||
- ⚠️ 需要 Custom Domain(避免同 account Worker 互调限制)
|
||||
|
||||
**方案 B:Queue + Consumer**
|
||||
|
||||
```typescript
|
||||
// Baton 状态变更时推入 Queue
|
||||
await env.BATON_QUEUE.send({ batonId: child.id, action: 'execute' })
|
||||
```
|
||||
|
||||
CF Queues 的 Consumer 天然是事件驱动的,重试、死信队列都有。
|
||||
|
||||
- ✅ 真正的异步,解耦调度和执行
|
||||
- ✅ 内置重试和错误处理
|
||||
- ⚠️ 需要配置 Queue binding
|
||||
|
||||
**方案 C:Durable Objects 协调器(远期)**
|
||||
|
||||
一个 DO 实例管理整棵任务树的状态。
|
||||
|
||||
- ✅ 强一致 + 实时事件
|
||||
- ⚠️ 复杂度高,起步不需要
|
||||
|
||||
### 建议路径
|
||||
|
||||
**Phase 1**:方案 A(自调用 + waitUntil)。验证 Baton 模型本身是否 work。
|
||||
|
||||
**Phase 2**:方案 B(Queue)。当任务量上来需要更可靠的调度时引入。
|
||||
|
||||
**Phase 3**:方案 C(DO)。当需要实时状态推送、复杂协调逻辑时考虑。
|
||||
|
||||
## 与 Uncaged 集成
|
||||
|
||||
### 新端点
|
||||
|
||||
```
|
||||
POST /baton → 创建 Baton(外部触发)
|
||||
POST /baton/:id/run → 执行 Baton(内部调度)
|
||||
POST /baton → 创建 Baton(外部触发,入队执行)
|
||||
GET /baton/:id → 查询状态
|
||||
GET /baton/:id/tree → 查询完整任务树
|
||||
```
|
||||
|
||||
### 新内置 Tool
|
||||
|
||||
LLM 在 agentic loop 中可以调用 `spawn_task` 创建并发子任务:
|
||||
|
||||
```typescript
|
||||
// 主 agent 的 agentic loop 中可用
|
||||
{
|
||||
name: "spawn_task",
|
||||
description: "创建一个并发子任务。任务会被独立执行,完成后结果自动汇总。",
|
||||
description: "创建一个并发子任务。会被独立执行,完成后结果自动汇总回来。",
|
||||
parameters: {
|
||||
goal: { type: "string", description: "子任务目标" },
|
||||
tools: { type: "array", description: "工具白名单(可选)" },
|
||||
context: { type: "object", description: "额外上下文(可选)" },
|
||||
prompt: { type: "string", description: "完整的任务描述" },
|
||||
hints: { type: "array", description: "建议使用的工具(可选,仅供参考)" },
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
LLM 可以在 agentic loop 中调用 `spawn_task` 创建并发子任务。当所有子任务完成后,结果自动注入到主 agent 的下一轮对话中。
|
||||
当所有 spawn 的子任务完成后,结果自动注入到主 agent 的下一轮对话中。
|
||||
|
||||
### 用户体验
|
||||
|
||||
@ -338,13 +301,9 @@ LLM 可以在 agentic loop 中调用 `spawn_task` 创建并发子任务。当所
|
||||
2. 豆豆说"让我想想…"(或直接开始回复)
|
||||
3. 一段时间后,收到完整的回复
|
||||
|
||||
如果任务很快(单个 Baton 直接完成),体验和现在一样。如果任务复杂(breakdown 了好几层),用户只是等得稍微久一点,但最终收到的是一个汇总好的完整回答。
|
||||
如果任务简单(直接完成),体验和现在一样。如果任务复杂(breakdown 了好几层),只是等得稍微久一点,但最终收到的是汇总好的完整回答。
|
||||
|
||||
可选的透明度增强:
|
||||
|
||||
- 豆豆可以先发一条"我正在并行处理 3 个子任务…"
|
||||
- 子任务完成时逐个推送进展
|
||||
- 这由根 Baton 的 `notify` 策略控制
|
||||
可选:豆豆可以先发"我正在并行处理 3 个子任务…",逐步推送进展。这由根 Baton 的 `notify` 策略控制。
|
||||
|
||||
## 与 Sigil 的关系
|
||||
|
||||
@ -362,11 +321,13 @@ Sigil + Baton = **agent 既不需要预装所有工具,也不需要一口气
|
||||
|
||||
## 设计原则
|
||||
|
||||
1. **任务是一等公民,agent 不是** — Baton 是主语,worker 是动词。没有"子 agent"的概念。
|
||||
2. **无状态 worker** — 任何 worker 拿到任何 Baton 都能执行。不依赖特定实例。
|
||||
3. **事件驱动** — 没有轮询,没有长连接。状态变更触发下一步。
|
||||
4. **递归 breakdown 自然收敛** — 时间窗口是唯一约束,任务复杂度决定递归深度。
|
||||
5. **用户无感** — Baton 是内部实现,用户只看到"发消息 → 收到回复"。
|
||||
1. **Baton 就是一段 prompt** — 不要用结构化字段模拟自然语言已经能表达的东西。
|
||||
2. **工具是建议,不是围栏** — hints 帮 worker 快速 ramp up,worker 可以自由发现更多工具。
|
||||
3. **任务是一等公民,agent 不是** — Baton 是主语,worker 是动词。
|
||||
4. **无状态 worker** — 任何 worker 拿到任何 Baton 都能执行。
|
||||
5. **事件驱动** — Queue 天然就是事件总线。状态变更 = 消息。
|
||||
6. **递归 breakdown 自然收敛** — 任务复杂度决定递归深度,不需要硬编码层数限制。
|
||||
7. **用户无感** — Baton 是内部机制,用户只看到"发消息 → 收到回复"。
|
||||
|
||||
---
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user