From 5d0770b10ee1ad749d5f6d9c669fe4f789bdb87f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B0=8F=E6=A9=98?= Date: Sat, 4 Apr 2026 14:18:48 +0000 Subject: [PATCH] =?UTF-8?q?docs:=20Baton=20v2=20=E2=80=94=20=E7=AE=80?= =?UTF-8?q?=E5=8C=96=E6=95=B0=E6=8D=AE=E6=A8=A1=E5=9E=8B=20+=20Queue=20?= =?UTF-8?q?=E4=BA=8B=E4=BB=B6=E9=A9=B1=E5=8A=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 核心改动: - Baton 数据模型大幅简化:核心就是一段 prompt - 工具从 allowlist 改为 hints(建议而非限制) - 调度方案改为 Queue-first(天然匹配事件模型) - 去掉 goal/context/tools/constraints 等冗余字段 - 删除 A/B/C 分阶段方案,直接用 Queue 小橘 🍊 (NEKO Team) --- docs/shared/baton-task-relay.md | 349 ++++++++++++++------------------ 1 file changed, 155 insertions(+), 194 deletions(-) diff --git a/docs/shared/baton-task-relay.md b/docs/shared/baton-task-relay.md index 412eb21..419497a 100644 --- a/docs/shared/baton-task-relay.md +++ b/docs/shared/baton-task-relay.md @@ -29,32 +29,23 @@ tags: [baton, uncaged, architecture, serverless, task-scheduling] ### Baton(接力棒) -一个 Baton 是一个**完整的、自包含的任务描述**。任何一个无状态的 worker 拿到它就能开始工作: +一个 Baton 是一个**自包含的任务描述**。它的核心就是一段 prompt——用自然语言完整描述了"要做什么"。 ```typescript interface Baton { id: string // "bt_abc123" - - // ── 任务定义 ── - goal: string // 要完成什么 - context: Record // 上下文信息(用户、来源、任何相关数据) - tools?: string[] // 工具白名单(空 = 全部可用) - prompt?: string // 针对这个任务的额外指令 - constraints?: { - max_rounds?: number // agentic loop 最大轮数 - timeout_hint?: number // 建议执行时间(秒),worker 据此决定是否 breakdown - } - - // ── 任务树 ── parent_id?: string // 父 Baton(null = 根任务) - children?: string[] // 子 Baton ID 列表(breakdown 时填入) depth: number // 递归深度(根 = 0) - + + // ── 核心:任务就是一段 prompt ── + prompt: string // 完整的任务描述(目标、上下文、约束,全在里面) + hints?: string[] // 建议的工具名(帮 worker 快速 ramp up,不是限制) + // ── 状态 ── status: 'pending' | 'running' | 'completed' | 'failed' | 'spawned' - result?: any // 执行结果(completed 时) - error?: string // 错误信息(failed 时) - + result?: string // 执行结果 + error?: string // 错误信息 + // ── 元数据 ── created_at: number updated_at: number @@ -63,24 +54,34 @@ interface Baton { } ``` +为什么这么简单? + +**因为 worker 就是一个 LLM agentic loop。** LLM 最擅长理解的就是自然语言。把任务硬拆成 `goal + context + tools + constraints + max_rounds + timeout_hint`,是在用结构化字段**模拟自然语言已经能表达的东西**。 + +一段好的 prompt 里可以包含一切: + +> "查询北京当前天气。这是用户 Scott 在 Telegram 上的请求。可以试试 cap_weather 工具。如果没有现成的天气工具,从 Sigil 搜一个或者创建一个。" + +目标、上下文、工具建议、备选方案——全在一段话里。自然、完整、不需要额外的 schema。 + +**工具是建议,不是围栏。** `hints` 里列出的工具名帮 worker 快速找到起点,但 worker 作为一个完整的 agent,完全有能力自己通过 Sigil query 发现和加载更多工具。建议是 ramp up 的加速器,不是权限的边界。 + ### Worker Worker 是**无状态的执行器**。它不知道自己是"主 agent"还是"子 agent"——这个区别不存在。它只知道: 1. 拿到一个 Baton -2. 执行它 +2. 读 prompt,干活 3. 报告结果 ```typescript async function executeBaton(baton: Baton): Promise { // 唯一的决策:我能在时间窗口内完成吗? if (shouldBreakdown(baton)) { - // 太大了 → 拆分 const children = await planBreakdown(baton) await spawnChildren(baton.id, children) await updateStatus(baton.id, 'spawned') } else { - // 可以完成 → 执行 try { const result = await runAgentLoop(baton) await complete(baton.id, result) @@ -99,112 +100,131 @@ async function executeBaton(baton: Baton): Promise { |------|------|----------| | **completed** | 任务完成,result 填入 | 触发 parent 的 children check | | **failed** | 任务失败,error 填入 | 触发 parent 的错误处理 | -| **spawned** | 任务太大,已拆分成子 Baton | 子 Baton 进入 pending,等待调度 | +| **spawned** | 任务太大,已拆分成子 Baton | 子 Baton 入队,等待调度 | -第三种是递归的——子 Baton 也可以再拆分。 +第三种是递归的——子 Baton 也可以再拆分。形成一棵任务树,叶子节点是实际执行,非叶子节点是协调。 -### 事件驱动调度 +### Breakdown 决策 -**没有轮询,没有长连接。** 全靠事件: +Worker 怎么判断"我能在当前执行窗口内完成吗"? -``` -Baton 状态变更 → 事件 → 调度器检查 → 触发下一步 -``` +**直接问 LLM。** -核心调度逻辑: +Worker 的 system prompt 里包含执行窗口的信息: + +> "你有一个有限的执行窗口。如果你认为当前任务无法在窗口内完成,请把它拆分成可以独立完成的子任务。每个子任务应该是自包含的——另一个 worker 拿到它就能独立执行。" + +LLM 天然擅长判断任务复杂度。"查天气"→ 一轮就搞定,直接做。"写一篇竞品分析报告"→ 需要搜索 + 对比 + 整理 + 写作,拆分。 + +**递归的自然退出条件**:当任务小到一个 worker 能在窗口内完成时,递归就停了。不需要预设"最多拆几层"——**任务的复杂度决定递归的深度。** + +## 事件驱动调度 + +### 核心机制:Queue + +Baton 的调度通过 **CF Queues** 实现。每一次状态变更就是一个事件,事件通过队列传递: ```typescript -async function onBatonStatusChange(batonId: string, newStatus: string) { - const baton = await loadBaton(batonId) - - if (newStatus === 'completed' || newStatus === 'failed') { - if (baton.parent_id) { - // 有 parent → 检查所有 siblings 是否都完成了 - const parent = await loadBaton(baton.parent_id) - const children = await loadChildren(parent.id) - const allDone = children.every(c => c.status === 'completed' || c.status === 'failed') - - if (allDone) { - // 所有子任务完成 → 唤醒 parent 继续执行 - const childResults = children.map(c => ({ id: c.id, goal: c.goal, result: c.result, error: c.error })) - await resumeParent(parent, childResults) - } - } else { - // 没有 parent → 根任务完成 → 通知用户 - if (baton.notify) { - await notifyUser(baton) - } +// 创建新 Baton → 入队 +await env.BATON_QUEUE.send({ batonId: child.id, event: 'created' }) + +// Baton 完成 → 入队通知 parent +await env.BATON_QUEUE.send({ batonId: baton.parent_id, event: 'child_completed', childId: baton.id }) +``` + +Queue Consumer 是事件循环的核心: + +```typescript +async queue(batch: MessageBatch, env: Env) { + for (const msg of batch.messages) { + const { batonId, event } = msg.body + + switch (event) { + case 'created': + // 新 Baton → 派发执行 + await executeBaton(await loadBaton(batonId, env), env) + break + + case 'child_completed': + case 'child_failed': + // 子任务完成 → 检查是否所有 children 都完成了 + const parent = await loadBaton(batonId, env) + const children = await loadChildren(batonId, env) + const allDone = children.every(c => + c.status === 'completed' || c.status === 'failed' + ) + + if (allDone) { + // 所有子任务完成 → 唤醒 parent,带上子任务结果 + const results = children.map(c => ({ + goal: c.prompt.slice(0, 100), + result: c.result, + error: c.error, + })) + await resumeParent(parent, results, env) + } + break } - } - - if (newStatus === 'pending') { - // 新任务 → 派发给 worker 执行 - await dispatchToWorker(baton) + + msg.ack() } } ``` +### 为什么是 Queue + +| | waitUntil 自调用 | **Queue** | Durable Objects | +|--|--|--|--| +| 事件驱动 | ❌ 在模拟 | **✅ 天然** | ✅ | +| 重试 | ❌ 需手写 | **✅ 内置** | ✅ | +| 并发控制 | ❌ 无 | **✅ batch + concurrency** | ✅ | +| 死信处理 | ❌ 无 | **✅ DLQ** | ❌ 需手写 | +| 复杂度 | 低 | **低** | 高 | +| 解耦 | ❌ 调度和执行耦合 | **✅ 完全解耦** | ✅ | + +Queue 和 Baton 的事件驱动模型是**天然匹配**的。状态变更 = 事件 = 消息。用 HTTP 自调用来模拟事件是 workaround,Queue 才是正解。 + ### 事件接力图 ``` 用户消息 │ ▼ -┌─────────────────────┐ -│ 请求 1: Worker │ -│ 拿到 bt_root │ -│ → 太大,breakdown │ -│ → spawn bt_a, bt_b │ -│ → 退出 │ -└─────────────────────┘ - │ │ - ▼ ▼ -┌──────────┐ ┌──────────┐ -│ 请求 2 │ │ 请求 3 │ ← 并发! -│ 执行 bt_a │ │ 执行 bt_b │ -│ → completed│ │ → completed│ -└──────────┘ └──────────┘ - │ │ - └───────┬────────┘ - ▼ -┌─────────────────────┐ -│ 请求 4: Worker │ -│ bt_root 被唤醒 │ -│ → 汇总 bt_a + bt_b │ -│ → completed │ -│ → 通知用户 │ -└─────────────────────┘ +┌─────────────────────────┐ +│ Worker A │ +│ 拿到 bt_root │ +│ → 太大,breakdown │ +│ → 写入 bt_a, bt_b 到 D1 │ +│ → 入队 {bt_a, created} │ +│ → 入队 {bt_b, created} │ +│ → 退出 │ +└─────────────────────────┘ + │ + ┌────┴────┐ + ▼ ▼ Queue Consumer 并发消费 +┌────────┐ ┌────────┐ +│Worker B│ │Worker C│ +│执行 bt_a│ │执行 bt_b│ +│→ done │ │→ done │ +│→ 入队 │ │→ 入队 │ +│ {root, │ │ {root, │ +│ child_ │ │ child_ │ +│ done} │ │ done} │ +└────────┘ └────────┘ + │ │ + └────┬────┘ + ▼ +┌─────────────────────────┐ +│ Worker D │ +│ bt_root 被唤醒 │ +│ → 读取 bt_a + bt_b 结果 │ +│ → 汇总 → completed │ +│ → 通知用户 │ +│ → 退出 │ +└─────────────────────────┘ ``` -每个请求都是短暂的。没有任何一个 Worker 需要跑超过执行窗口。但整个任务可以跨越任意长的时间、任意深的递归。 - -**用事件接力代替长进程。** - -### Breakdown 决策 - -Worker 怎么判断"我能完成吗"?两个信号: - -**1. 时间预估(硬约束)** - -```typescript -function shouldBreakdown(baton: Baton): boolean { - const timeHint = baton.constraints?.timeout_hint || 25 // 默认 25 秒窗口 - const estimatedRounds = estimateRounds(baton.goal, baton.tools) - const avgRoundTime = 5 // 每轮 LLM 调用约 5 秒 - - return estimatedRounds * avgRoundTime > timeHint -} -``` - -**2. LLM 判断(软约束)** - -也可以直接问 LLM: - -> "你有约 25 秒的执行窗口。以下任务能在窗口内完成吗?如果不能,请拆分成可以独立完成的子任务。" - -LLM 天然擅长判断任务复杂度。如果它认为"查天气"一轮就搞定,就直接做;如果认为"写一篇分析报告"需要搜索 + 整理 + 写作,就拆分。 - -**递归的自然退出条件**:任务小到一个 worker 能在时间窗口内完成时,递归就停了。不需要预设"最多拆几层"——复杂度决定深度。 +每个 Worker 都是短暂的。没有长进程。但整个任务树可以任意深、任意宽。 ## 存储层 @@ -215,28 +235,24 @@ CREATE TABLE batons ( id TEXT PRIMARY KEY, parent_id TEXT, depth INTEGER DEFAULT 0, - - -- 任务定义 - goal TEXT NOT NULL, - context TEXT, -- JSON - tools TEXT, -- JSON array - prompt TEXT, - max_rounds INTEGER DEFAULT 6, - timeout_hint INTEGER DEFAULT 25, - + + -- 核心 + prompt TEXT NOT NULL, + hints TEXT, -- JSON array,建议工具名 + -- 状态 - status TEXT DEFAULT 'pending', -- pending/running/completed/failed/spawned - result TEXT, -- JSON + status TEXT DEFAULT 'pending', + result TEXT, error TEXT, - + -- 通知 - channel TEXT, -- telegram / api / a2a + channel TEXT, notify INTEGER DEFAULT 0, - + -- 时间 created_at INTEGER NOT NULL, updated_at INTEGER NOT NULL, - + FOREIGN KEY (parent_id) REFERENCES batons(id) ); @@ -247,88 +263,35 @@ CREATE INDEX idx_batons_status ON batons(status); ### 为什么是 D1 而不是 KV - **强一致性** — 状态机需要 read-then-write 原子性,KV 的 60 秒最终一致性会导致 race condition -- **SQL 查询** — "查找某个 parent 下所有 children 的状态"是高频操作,D1 原生支持 +- **SQL 查询** — "查找某个 parent 下所有 children 的状态"是高频操作,SQL 原生支持 - **事务** — 更新 Baton 状态 + 检查 siblings 需要在同一个事务里 -## 调度机制 - -### CF Workers 实现 - -在 CF Workers 环境下,调度有几种方式: - -**方案 A:自调用 + waitUntil(推荐起步)** - -```typescript -// 创建子 Baton 后,通过 waitUntil 异步触发执行 -for (const child of children) { - ctx.waitUntil( - fetch(`https://doudou.shazhou.work/baton/${child.id}/run`, { - method: 'POST', - headers: { 'Authorization': `Bearer ${token}` }, - }) - ) -} -``` - -- ✅ 零额外基础设施 -- ✅ 天然并发(多个 waitUntil 并行) -- ⚠️ 需要 Custom Domain(避免同 account Worker 互调限制) - -**方案 B:Queue + Consumer** - -```typescript -// Baton 状态变更时推入 Queue -await env.BATON_QUEUE.send({ batonId: child.id, action: 'execute' }) -``` - -CF Queues 的 Consumer 天然是事件驱动的,重试、死信队列都有。 - -- ✅ 真正的异步,解耦调度和执行 -- ✅ 内置重试和错误处理 -- ⚠️ 需要配置 Queue binding - -**方案 C:Durable Objects 协调器(远期)** - -一个 DO 实例管理整棵任务树的状态。 - -- ✅ 强一致 + 实时事件 -- ⚠️ 复杂度高,起步不需要 - -### 建议路径 - -**Phase 1**:方案 A(自调用 + waitUntil)。验证 Baton 模型本身是否 work。 - -**Phase 2**:方案 B(Queue)。当任务量上来需要更可靠的调度时引入。 - -**Phase 3**:方案 C(DO)。当需要实时状态推送、复杂协调逻辑时考虑。 - ## 与 Uncaged 集成 ### 新端点 ``` -POST /baton → 创建 Baton(外部触发) -POST /baton/:id/run → 执行 Baton(内部调度) +POST /baton → 创建 Baton(外部触发,入队执行) GET /baton/:id → 查询状态 GET /baton/:id/tree → 查询完整任务树 ``` ### 新内置 Tool +LLM 在 agentic loop 中可以调用 `spawn_task` 创建并发子任务: + ```typescript -// 主 agent 的 agentic loop 中可用 { name: "spawn_task", - description: "创建一个并发子任务。任务会被独立执行,完成后结果自动汇总。", + description: "创建一个并发子任务。会被独立执行,完成后结果自动汇总回来。", parameters: { - goal: { type: "string", description: "子任务目标" }, - tools: { type: "array", description: "工具白名单(可选)" }, - context: { type: "object", description: "额外上下文(可选)" }, + prompt: { type: "string", description: "完整的任务描述" }, + hints: { type: "array", description: "建议使用的工具(可选,仅供参考)" }, } } ``` -LLM 可以在 agentic loop 中调用 `spawn_task` 创建并发子任务。当所有子任务完成后,结果自动注入到主 agent 的下一轮对话中。 +当所有 spawn 的子任务完成后,结果自动注入到主 agent 的下一轮对话中。 ### 用户体验 @@ -338,13 +301,9 @@ LLM 可以在 agentic loop 中调用 `spawn_task` 创建并发子任务。当所 2. 豆豆说"让我想想…"(或直接开始回复) 3. 一段时间后,收到完整的回复 -如果任务很快(单个 Baton 直接完成),体验和现在一样。如果任务复杂(breakdown 了好几层),用户只是等得稍微久一点,但最终收到的是一个汇总好的完整回答。 +如果任务简单(直接完成),体验和现在一样。如果任务复杂(breakdown 了好几层),只是等得稍微久一点,但最终收到的是汇总好的完整回答。 -可选的透明度增强: - -- 豆豆可以先发一条"我正在并行处理 3 个子任务…" -- 子任务完成时逐个推送进展 -- 这由根 Baton 的 `notify` 策略控制 +可选:豆豆可以先发"我正在并行处理 3 个子任务…",逐步推送进展。这由根 Baton 的 `notify` 策略控制。 ## 与 Sigil 的关系 @@ -362,11 +321,13 @@ Sigil + Baton = **agent 既不需要预装所有工具,也不需要一口气 ## 设计原则 -1. **任务是一等公民,agent 不是** — Baton 是主语,worker 是动词。没有"子 agent"的概念。 -2. **无状态 worker** — 任何 worker 拿到任何 Baton 都能执行。不依赖特定实例。 -3. **事件驱动** — 没有轮询,没有长连接。状态变更触发下一步。 -4. **递归 breakdown 自然收敛** — 时间窗口是唯一约束,任务复杂度决定递归深度。 -5. **用户无感** — Baton 是内部实现,用户只看到"发消息 → 收到回复"。 +1. **Baton 就是一段 prompt** — 不要用结构化字段模拟自然语言已经能表达的东西。 +2. **工具是建议,不是围栏** — hints 帮 worker 快速 ramp up,worker 可以自由发现更多工具。 +3. **任务是一等公民,agent 不是** — Baton 是主语,worker 是动词。 +4. **无状态 worker** — 任何 worker 拿到任何 Baton 都能执行。 +5. **事件驱动** — Queue 天然就是事件总线。状态变更 = 消息。 +6. **递归 breakdown 自然收敛** — 任务复杂度决定递归深度,不需要硬编码层数限制。 +7. **用户无感** — Baton 是内部机制,用户只看到"发消息 → 收到回复"。 ---