RFC: Pulseflare Projection Engine + Task Management #8

Open
opened 2026-04-18 12:16:59 +00:00 by xiaomo · 2 comments
Owner

RFC: Pulseflare Projection Engine + Task Management

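Background

Pulseflare v2 is already a passive event store plus a projection API. It now needs:

  1. Real projection fold capability (the current implementation only returns the latest event)
  2. An event-sourced task management system to replace OGraph tasks

Design

Projection Definition

Projection defs are stored in D1, with JSONata as the expression language (following the OGraph approach; compatible with CF Workers).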

interface ProjectionDef {
  name: string;                    // e.g. "task-pool"
  initial_value: any;              // e.g. []
  sources: ProjectionSource[];
  cache_enabled: boolean;          // whether the incremental cache is enabled
}

interface ProjectionSource {
  kind: string;                    // matches the event kind, e.g. "task.created"
  key_prefix?: string;             // optional; matches an event key prefix, e.g. "task:pulse:"
  expression: string;              // JSONata expression, context: {state, event}
}

Fold logic: (state, event) → newState, applied one event at a time in occurred_at ASC order.
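
To make the fold rule concrete, here is a minimal sketch rather than the actual engine. The `PulseEvent` shape, the `foldEvents` name, and the tie-break on `id` are assumptions for illustration. The JSONata evaluator is injected as a plain function so the sketch stays dependency-free; in the RFC that role would be played by the jsonata package.

```typescript
// Hypothetical event shape; the RFC only implies kind, key, meta and a timestamp.
interface PulseEvent {
  id: number;
  kind: string;
  key: string;
  occurredAt: number;
  meta: Record<string, unknown>;
}

interface ProjectionSource {
  kind: string;
  key_prefix?: string;
  expression: string;
}

// Stand-in for a JSONata evaluator: (expression, {state, event}) -> new state.
type Evaluate = (
  expression: string,
  ctx: { state: unknown; event: PulseEvent },
) => unknown;

function foldEvents(
  initial: unknown,
  sources: ProjectionSource[],
  events: PulseEvent[],
  evaluate: Evaluate,
): unknown {
  // Sort a copy by occurredAt ascending; ties fall back to id (an assumption).
  const ordered = [...events].sort(
    (a, b) => a.occurredAt - b.occurredAt || a.id - b.id,
  );
  let state = initial;
  for (const event of ordered) {
    // A source matches on kind and, when present, on the key prefix.
    const source = sources.find(
      (s) => s.kind === event.kind && event.key.startsWith(s.key_prefix ?? ""),
    );
    // Events with no matching source leave the state untouched.
    if (source) state = evaluate(source.expression, { state, event });
  }
  return state;
}
```

Injecting the evaluator also makes the fold easy to unit-test with a stub before any JSONata expressions are wired in.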

Task Pool example

{
  "name": "task-pool",
  "initial_value": [],
  "sources": [
    { "kind": "task.created", "expression": "$append(state, [event.key])" },
    { "kind": "task.claimed", "expression": "state[$ != $$.event.key]" },
    { "kind": "task.cancelled", "expression": "state[$ != $$.event.key]" },
    { "kind": "task.completed", "expression": "state[$ != $$.event.key]" }
  ],
  "cache_enabled": true
}

To filter by project, add key_prefix: "task:pulse:" to each source.

Task Detail example

{
  "name": "task-detail",
  "initial_value": { "status": "unknown" },
  "sources": [
    { "kind": "task.created", "expression": "$merge([state, event.meta, {'status': 'open', 'createdAt': event.occurredAt}])" },
    { "kind": "task.claimed", "expression": "$merge([state, {'status': 'claimed', 'claimedBy': event.meta.by, 'claimedAt': event.occurredAt}])" },
    { "kind": "task.started", "expression": "$merge([state, {'status': 'in_progress', 'startedAt': event.occurredAt}])" },
    { "kind": "task.completed", "expression": "$merge([state, event.meta, {'status': 'completed', 'completedAt': event.occurredAt}])" },
    { "kind": "task.failed", "expression": "$merge([state, event.meta, {'status': 'failed', 'failedAt': event.occurredAt}])" },
    { "kind": "task.cancelled", "expression": "$merge([state, {'status': 'cancelled', 'cancelledAt': event.occurredAt}])" },
    { "kind": "task.commented", "expression": "$merge([state, {'comments': $append(state.comments, [{'message': event.meta.message, 'author': event.meta.author, 'at': event.occurredAt}])}])" }
  ],
  "cache_enabled": true
}

Incremental cache

The projections table already exists; extend it to:

CREATE TABLE IF NOT EXISTS projections (
  name       TEXT NOT NULL,
  key        TEXT NOT NULL DEFAULT '',   -- empty = global, non-empty = per-key instance
  value      TEXT NOT NULL,
  last_event_id INTEGER NOT NULL DEFAULT 0,
  status     TEXT NOT NULL DEFAULT 'healthy',
  error_msg  TEXT,
  updated_at INTEGER NOT NULL,
  PRIMARY KEY (name, key)
);

Folding resumes from the events after last_event_id, so it does not need a full scan every time.
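
As a rough illustration of resuming from `last_event_id`, the sketch below folds only the events that are newer than the cached cursor. The names `CachedProjection`, `StoredEvent`, and `foldIncremental` are hypothetical. It assumes events are supplied in ascending `id` order and that `id` order agrees with `occurred_at` order, which the RFC does not state.

```typescript
// Hypothetical in-memory view of one row of the projections table.
interface CachedProjection {
  value: unknown;
  lastEventId: number;
}

// Hypothetical minimal event shape; only `id` matters for the cursor.
interface StoredEvent {
  id: number;
  kind: string;
  key: string;
}

type Step = (state: unknown, event: StoredEvent) => unknown;

function foldIncremental(
  cached: CachedProjection,
  events: StoredEvent[],
  step: Step,
): CachedProjection {
  let value = cached.value;
  let lastEventId = cached.lastEventId;
  for (const event of events) {
    // Skip anything already folded into the cached value.
    if (event.id <= lastEventId) continue;
    value = step(value, event);
    lastEventId = event.id;
  }
  // The caller would persist this pair back to the projections row.
  return { value, lastEventId };
}
```

If late-arriving events can carry an older `occurred_at` than events already folded, a cursor on `id` alone would apply them out of order, so that case would need a separate decision (for example, a full refold).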

Task event conventions

| kind | key format | meta |
|------|------------|------|
| task.created | task:{project}:{id} | {title, description, requester, priority?, tags?} |
| task.claimed | task:{project}:{id} | {by} |
| task.released | task:{project}:{id} | {by, reason?} |
| task.started | task:{project}:{id} | {} |
| task.completed | task:{project}:{id} | {result?, artifacts?} |
| task.failed | task:{project}:{id} | {error, retryable?} |
| task.cancelled | task:{project}:{id} | {reason?} |
| task.commented | task:{project}:{id} | {message, author} |

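Claim optimistic locking

No server-side atomic operation is added. After emitting a claim, the agent reads the projection; the first claim event wins and later ones are ignored (fold logic: if the task is already claimed, state is left unchanged).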
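
The first-claim-wins rule can be sketched as a fold step that only acts on an open task. This is an illustration under assumptions: the `TaskState` and `ClaimEvent` shapes and the `applyClaim` and `wonClaim` names are hypothetical. In JSONata the same idea could be written with the conditional operator, for example `state.status = 'open' ? $merge([...]) : state`.

```typescript
// Hypothetical shapes, loosely following the task-detail projection.
interface TaskState {
  status: string;
  claimedBy?: string;
}

interface ClaimEvent {
  kind: "task.claimed";
  key: string;
  meta: { by: string };
}

// Fold step: a claim only takes effect while the task is still open.
function applyClaim(state: TaskState, event: ClaimEvent): TaskState {
  if (state.status !== "open") return state; // later claims are ignored
  return { ...state, status: "claimed", claimedBy: event.meta.by };
}

// After emitting a claim, an agent reads the projection to see whether it won.
function wonClaim(state: TaskState, agent: string): boolean {
  return state.status === "claimed" && state.claimedBy === agent;
}
```

With this step, two agents that both emit `task.claimed` see the same projection afterwards, and only the one whose event was folded first finds its own name in `claimedBy`.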

API

New:

  • POST /projections/defs: register a projection def
  • GET /projections/defs: list all defs
  • GET /projections/defs/:name: view a def
  • DELETE /projections/defs/:name: delete a def

Changed:

  • GET /projections/:name: global projection (folds all matching events)
  • GET /projections/:name/:key: projection instantiated per key (folds the events for that key)

New in the store layer:

  • queryEvents({kinds?, keyPrefix?, since?, sinceId?, limit?}): query by multiple kinds plus a key prefix
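
One possible shape for `queryEvents` is a small builder that turns the options into a parameterized SQLite statement. This is a sketch under assumptions, not the store's real code. The `events` table and its `id`, `kind`, `key`, and `occurred_at` columns are hypothetical, as are the inclusive `since` comparison and the default limit of 1000.

```typescript
interface EventQuery {
  kinds?: string[];
  keyPrefix?: string;
  since?: number;   // assumed to be an inclusive occurred_at lower bound
  sinceId?: number; // assumed to be an exclusive id lower bound
  limit?: number;
}

function buildEventQuery(q: EventQuery): { sql: string; params: (string | number)[] } {
  const where: string[] = [];
  const params: (string | number)[] = [];
  if (q.kinds && q.kinds.length > 0) {
    // One placeholder per kind, so values are never spliced into the SQL text.
    where.push(`kind IN (${q.kinds.map(() => "?").join(", ")})`);
    params.push(...q.kinds);
  }
  if (q.keyPrefix !== undefined) {
    // Escape LIKE wildcards so the prefix is matched literally.
    where.push("key LIKE ? ESCAPE '\\'");
    params.push(q.keyPrefix.replace(/[\\%_]/g, (c) => "\\" + c) + "%");
  }
  if (q.since !== undefined) {
    where.push("occurred_at >= ?");
    params.push(q.since);
  }
  if (q.sinceId !== undefined) {
    where.push("id > ?");
    params.push(q.sinceId);
  }
  const clause = where.length > 0 ? ` WHERE ${where.join(" AND ")}` : "";
  params.push(q.limit ?? 1000); // hypothetical default page size
  return {
    sql: `SELECT * FROM events${clause} ORDER BY occurred_at ASC, id ASC LIMIT ?`,
    params,
  };
}
```

The resulting `sql` and `params` pair is meant to be handed to whatever prepared-statement API the store already uses.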

Dependencies

  • jsonata npm package (~150KB, no native dependencies, compatible with CF Workers)

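Implementation steps

  1. store-d1.ts: add a multi-condition queryEvents method; extend the projections table schema
  2. projection-engine.ts: new file with the JSONata fold logic and the incremental cache
  3. index.ts: add projection def CRUD and rework the GET /projections routes
  4. schema.sql: update the projections table and the projection_defs table
  5. Verification: register the task-pool def → push task events → query the projection and confirm the pool is correct
  6. Deployment: wrangler deploy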

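Out of scope

  • No materialized-view dual writes (volume is small at first, so real-time fold is enough)
  • No Sigil / tick / reaction (that belongs to Uncaged)
  • No object layer (keys are enough)
  • No authentication (to be added later)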
Owner

Review Comments — 小橘 🍊

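1. Two kinds of projection: LazyProjection + GuardProjection

| | LazyProjection | GuardProjection |
|---|---|---|
| Definition | expression: (state, event) → newState | check + transition |
| When computed | fold at query time | synchronous fold at write time |
| Consistency | eventually consistent | strongly consistent |
| Readable | ✅ | ✅ (always up to date) |
| Gatekeeping | ❌ | ✅ (check fails → write rejected) |

Guard is a superset of Lazy.

2. Guard resolves claim races

No optimistic lock is needed. The write path folds and checks synchronously, and D1's single writer serializes writes, so there is no race window: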

POST /events { kind: task.claimed, key, meta: { by: xiaoju } }
  → GuardProjection fold → check: status === open?
  → ✅ write + transition | ❌ reject with 400

3. task-lifecycle GuardProjection example

{
  "type": "guard",
  "name": "task-lifecycle",
  "initial_value": { "status": "unknown" },
  "sources": [
    { "kind": "task.created", "check": "state.status = 'unknown'", "transition": "$merge([state, event.meta, {'status':'open'}])" },
    { "kind": "task.claimed", "check": "state.status = 'open'", "transition": "$merge([state, {'status':'claimed','claimedBy':event.meta.by}])" },
    { "kind": "task.completed", "check": "state.status in ['claimed','in_progress'] and state.claimedBy = event.meta.by", "transition": "$merge([state, {'status':'completed'}])" }
  ]
}
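
A guard source like the ones above could be evaluated roughly as follows: run `check` against the current state and apply `transition` only when the check yields `true`. This is a sketch under assumptions. `applyGuard`, `GuardEvent`, and `GuardResult` are hypothetical names, the evaluator is injected as a stand-in for JSONata, and letting event kinds without a guard source pass through is a guess rather than something the review specifies.

```typescript
interface GuardEvent {
  kind: string;
  key: string;
  meta: Record<string, unknown>;
}

interface GuardSource {
  kind: string;
  check: string;
  transition: string;
}

// Stand-in for a JSONata evaluator over the context {state, event}.
type Evaluate = (
  expression: string,
  ctx: { state: unknown; event: GuardEvent },
) => unknown;

interface GuardResult {
  ok: boolean;
  state: unknown;  // new state on success, unchanged state on rejection
  reason?: string; // set only when the check fails
}

function applyGuard(
  sources: GuardSource[],
  state: unknown,
  event: GuardEvent,
  evaluate: Evaluate,
): GuardResult {
  const source = sources.find((s) => s.kind === event.kind);
  // Assumption: kinds without a guard source are not gated.
  if (!source) return { ok: true, state };
  const ctx = { state, event };
  // Only a strict boolean true passes; undefined or truthy values do not.
  if (evaluate(source.check, ctx) !== true) {
    return { ok: false, state, reason: `check failed for ${event.kind}: ${source.check}` };
  }
  return { ok: true, state: evaluate(source.transition, ctx) };
}
```

Requiring a strict `true` from `check` means an expression that evaluates to nothing, for instance because of a misspelled field, rejects the write instead of silently allowing it.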

4. Other comments

  • queryEvents: needs a composite index on (kind, key)
  • JSONata safety: limit expression length and add a fold timeout (to prevent DoS)
  • Unified read API: GET /projections/:name/:key works for both kinds
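
The two safety measures could look roughly like this: a length check when a def is registered, and a time budget checked between fold steps. Everything here is an assumption for illustration, including the 2048-character limit and the `checkExpressionLength` and `foldWithBudget` names. A deadline checked between steps cannot interrupt a single runaway expression, so it is only a partial mitigation.

```typescript
const MAX_EXPRESSION_LENGTH = 2048; // hypothetical limit

// Reject oversized expressions at registration time.
function checkExpressionLength(
  expression: string,
  max: number = MAX_EXPRESSION_LENGTH,
): void {
  if (expression.length > max) {
    throw new RangeError(`expression is ${expression.length} chars; limit is ${max}`);
  }
}

// Fold with a wall-clock budget; `now` is injectable so tests can use a fake clock.
function foldWithBudget<S, E>(
  initial: S,
  events: E[],
  step: (state: S, event: E) => S,
  budgetMs: number,
  now: () => number = Date.now,
): { state: S; completed: boolean } {
  const deadline = now() + budgetMs;
  let state = initial;
  for (const event of events) {
    // Stop before starting another step once the budget is spent.
    if (now() > deadline) return { state, completed: false };
    state = step(state, event);
  }
  return { state, completed: true };
}
```

A caller that gets `completed: false` could mark the projection row with a non-healthy `status` and an `error_msg` instead of returning a partial value.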

5. Updated write flow

POST /events
  → identify the caller (API Key → agent)
  → find the matching GuardProjections → fold + check
  → all pass → write the event + update guard state
  → any fail → 400
  → asynchronously invalidate LazyProjection caches
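
The write flow above can be sketched end to end with in-memory stand-ins for the stores. This is illustrative only. `WriteDeps`, `handlePostEvent`, the 401 for an unknown API key, and the 201 on success are assumptions, and cache invalidation is done inline here, whereas the flow describes it as asynchronous.

```typescript
interface IncomingEvent {
  kind: string;
  key: string;
  meta: Record<string, unknown>;
}

// A guard folds one event into its state and reports whether the check passed.
type Guard = (state: unknown, event: IncomingEvent) => { ok: boolean; next: unknown };

// Hypothetical in-memory stand-ins for the real stores.
interface WriteDeps {
  agents: Map<string, string>;      // API key -> agent name
  guards: Map<string, Guard>;       // guard name -> check + transition
  guardState: Map<string, unknown>; // "<guard>/<event key>" -> guard state
  log: IncomingEvent[];             // the event store
  lazyCache: Map<string, unknown>;  // cached LazyProjection values
}

function handlePostEvent(
  apiKey: string,
  event: IncomingEvent,
  deps: WriteDeps,
): { status: number } {
  // 1. Identify the caller (the 401 is an assumption).
  if (!deps.agents.has(apiKey)) return { status: 401 };

  // 2. Run every guard first and collect state updates without applying them.
  const pending: Array<[string, unknown]> = [];
  for (const [name, guard] of Array.from(deps.guards.entries())) {
    const slot = `${name}/${event.key}`;
    const result = guard(deps.guardState.get(slot), event);
    if (!result.ok) return { status: 400 }; // any failure rejects the write
    pending.push([slot, result.next]);
  }

  // 3. All guards passed: write the event and commit the guard state.
  deps.log.push(event);
  for (const [slot, next] of pending) deps.guardState.set(slot, next);

  // 4. Invalidate lazy caches (inline in this sketch).
  deps.lazyCache.clear();
  return { status: 201 };
}
```

Collecting guard updates in `pending` before committing keeps a rejected event from leaving partial guard state behind. In a real store the event write and the guard-state update would also need to be atomic.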
Owner

GuardProjection has been split out into a separate issue, #9, to be implemented first as a core PulseStore capability. The Pulseflare projection engine in #8 will build on #9.

Priority: #9 > #8

— 小橘 🍊

This repo is archived. You cannot comment on issues.
Reference: uncaged/pulse#8