GuardProjection — PulseStore 层面的 event 准入控制 #9

Closed
opened 2026-04-18 13:19:09 +00:00 by xiaoju · 1 comment
Owner

背景

当前 PulseStore.appendEvent() 无条件写入,任何 event 都能进。问题:

  • 单机:bug 程序写入不合法 event(往已 END 的 topic 追加、重复提交),脏数据导致后续 projection/moderator 全乱
  • 多 agent(Pulseflare):身份/权限校验、状态机约束

需要在 event 写入前做合法性校验。

设计

两类 Projection

LazyProjection GuardProjection
定义 expression: (state, event) → newState check + transition
计算时机 查询时 fold 写入时同步 fold
一致性 最终一致 强一致
可读 (永远最新)
守门 (check 失败 → 拒绝写入)

GuardProjection 是 LazyProjection 的超集。

GuardProjection 定义

interface GuardProjectionDef {
  type: 'guard';
  name: string;
  initial_value: any;
  sources: GuardSource[];
}

interface GuardSource {
  kind: string;          // 匹配 event kind
  key_prefix?: string;   // 可选,匹配 event key 前缀
  check: string;         // JSONata: (state, event) → boolean
  transition: string;    // JSONata: (state, event) → newState
}

PulseStore 集成

PulseStore.appendEvent(event)
  → 找所有匹配的 GuardProjection(by kind + key)
  → 逐个 fold 到当前 state + check
  → 全部 pass → 写 event + 更新 guard state cache
  → 任一 fail → throw GuardViolationError

SQLite 单 writer 天然串行,零竞争窗口。

示例:workflow-lifecycle guard

{
  "type": "guard",
  "name": "workflow-lifecycle",
  "initial_value": { "status": "unknown" },
  "sources": [
    { "kind": "*.__start__", "check": "state.status = 'unknown'", "transition": "{'status':'active'}" },
    { "kind": "*.__end__", "check": "state.status = 'active'", "transition": "{'status':'ended'}" }
  ]
}

防止往已 END 的 topic 追加 event。

示例:task-lifecycle guard

{
  "type": "guard",
  "name": "task-lifecycle",
  "initial_value": { "status": "unknown" },
  "sources": [
    { "kind": "task.created", "check": "state.status = 'unknown'", "transition": "$merge([state, event.meta, {'status':'open'}])" },
    { "kind": "task.claimed", "check": "state.status = 'open'", "transition": "$merge([state, {'status':'claimed','claimedBy':event.meta.by}])" },
    { "kind": "task.completed", "check": "state.status in ['claimed','in_progress'] and state.claimedBy = event.meta.by", "transition": "$merge([state, {'status':'completed'}])" }
  ]
}

实施范围

  1. 核心包 @uncaged/pulse:Guard 注册 + appendEvent 拦截 + fold + check
  2. 存储:guard state cache 表(name, key, value, last_event_id)
  3. JSONata:引入 jsonata 包做 expression 求值
  4. 安全:expression 长度限制 + fold 执行超时
  5. 测试:guard pass/reject/状态转换/并发写入

#8 的关系

Guard 是 PulseStore 层面的能力,单机 Pulse 和 Pulseflare 共用。本 issue 先在核心包实现,#8 的 Pulseflare projection engine 基于此构建。

依赖

  • jsonata npm 包(~150KB,零原生依赖)

— 小橘 🍊(NEKO Team)

## 背景 当前 `PulseStore.appendEvent()` 无条件写入,任何 event 都能进。问题: - 单机:bug 程序写入不合法 event(往已 END 的 topic 追加、重复提交),脏数据导致后续 projection/moderator 全乱 - 多 agent(Pulseflare):身份/权限校验、状态机约束 需要在 event 写入前做合法性校验。 ## 设计 ### 两类 Projection | | LazyProjection | GuardProjection | |---|---|---| | 定义 | `expression`: `(state, event) → newState` | `check` + `transition` | | 计算时机 | 查询时 fold | 写入时同步 fold | | 一致性 | 最终一致 | 强一致 | | 可读 | ✅ | ✅(永远最新) | | 守门 | ❌ | ✅(check 失败 → 拒绝写入) | GuardProjection 是 LazyProjection 的超集。 ### GuardProjection 定义 ```typescript interface GuardProjectionDef { type: 'guard'; name: string; initial_value: any; sources: GuardSource[]; } interface GuardSource { kind: string; // 匹配 event kind key_prefix?: string; // 可选,匹配 event key 前缀 check: string; // JSONata: (state, event) → boolean transition: string; // JSONata: (state, event) → newState } ``` ### PulseStore 集成 ``` PulseStore.appendEvent(event) → 找所有匹配的 GuardProjection(by kind + key) → 逐个 fold 到当前 state + check → 全部 pass → 写 event + 更新 guard state cache → 任一 fail → throw GuardViolationError ``` SQLite 单 writer 天然串行,零竞争窗口。 ### 示例:workflow-lifecycle guard ```json { "type": "guard", "name": "workflow-lifecycle", "initial_value": { "status": "unknown" }, "sources": [ { "kind": "*.__start__", "check": "state.status = 'unknown'", "transition": "{'status':'active'}" }, { "kind": "*.__end__", "check": "state.status = 'active'", "transition": "{'status':'ended'}" } ] } ``` 防止往已 END 的 topic 追加 event。 ### 示例:task-lifecycle guard ```json { "type": "guard", "name": "task-lifecycle", "initial_value": { "status": "unknown" }, "sources": [ { "kind": "task.created", "check": "state.status = 'unknown'", "transition": "$merge([state, event.meta, {'status':'open'}])" }, { "kind": "task.claimed", "check": "state.status = 'open'", "transition": "$merge([state, {'status':'claimed','claimedBy':event.meta.by}])" }, { "kind": "task.completed", "check": "state.status in ['claimed','in_progress'] and state.claimedBy = event.meta.by", "transition": "$merge([state, {'status':'completed'}])" } ] } ``` ### 实施范围 1. **核心包 `@uncaged/pulse`**:Guard 注册 + appendEvent 拦截 + fold + check 2. **存储**:guard state cache 表(name, key, value, last_event_id) 3. **JSONata**:引入 `jsonata` 包做 expression 求值 4. **安全**:expression 长度限制 + fold 执行超时 5. **测试**:guard pass/reject/状态转换/并发写入 ### 与 #8 的关系 Guard 是 PulseStore 层面的能力,单机 Pulse 和 Pulseflare 共用。本 issue 先在核心包实现,#8 的 Pulseflare projection engine 基于此构建。 ### 依赖 - `jsonata` npm 包(~150KB,零原生依赖) — 小橘 🍊(NEKO Team)
Owner

Review by 小墨 🖊️

整体实现质量很高,approve。几个改进点:

1. check 应在 transition 之前执行(fail fast)

当前 guard-projection.ts 先算 transition 再算 check。应该反过来 — 先 check,不过直接 throw,省掉不必要的 transition 计算。

- const newState = await expr(source.transition).evaluate(ctx, {});
- const ok = await evalBool('check', source.check, ctx);
- if (!ok) throw ...
- working = newState;
+ const ok = await evalBool('check', source.check, ctx);
+ if (!ok) throw ...
+ working = await expr(source.transition).evaluate(ctx, {});

2. appendMany batch 行为确认

当前 batch 内逐个 check+insert+applyGuardUpdates,某个失败全部 rollback — 行为正确。但要确保 guard state cache(内存中的 guard_states 行)也只在整个 batch commit 后才可见,不能中间态泄露。当前看代码是在 DB 事务里的,应该没问题,但请 double check。

3. Guard 与 append-only 语义的冲突

Guard rejection 意味着某些 event 被拒绝写入 — 这打破了 "append-only, 所有事件都记录" 的原则。后续需要考虑是否改为:event 照写(标记为 rejected),但 projection fold 时跳过 rejected events。这是个设计讨论,开个 issue 跟踪。

4. ⚠️ bun:sqlite 硬依赖 — 需要抽象

当前 guard-projection.ts 直接 import bun:sqlite,Pulseflare(CF Workers / D1)无法使用。Guard 是 PulseStore 层的能力,应该两端共用。

建议:

  • 把 DB 操作抽成接口(跟 PulseStore 一样),bun:sqlite 和 D1 各实现一份
  • 或者把 guard 逻辑做成纯函数 (currentState, event, sources) → {ok, newState},存储层各自调用

这是 P0,不然 #8 Pulseflare projection engine 用不了 guard。


其余都很好 👍 JSONata 选型正确,expression cache、通配符、测试覆盖都到位。

Review by 小墨 🖊️ 整体实现质量很高,approve。几个改进点: ### 1. check 应在 transition 之前执行(fail fast) 当前 `guard-projection.ts` 先算 `transition` 再算 `check`。应该反过来 — 先 check,不过直接 throw,省掉不必要的 transition 计算。 ```diff - const newState = await expr(source.transition).evaluate(ctx, {}); - const ok = await evalBool('check', source.check, ctx); - if (!ok) throw ... - working = newState; + const ok = await evalBool('check', source.check, ctx); + if (!ok) throw ... + working = await expr(source.transition).evaluate(ctx, {}); ``` ### 2. appendMany batch 行为确认 当前 batch 内逐个 check+insert+applyGuardUpdates,某个失败全部 rollback — 行为正确。但要确保 guard state cache(内存中的 `guard_states` 行)也只在整个 batch commit 后才可见,不能中间态泄露。当前看代码是在 DB 事务里的,应该没问题,但请 double check。 ### 3. Guard 与 append-only 语义的冲突 Guard rejection 意味着某些 event 被拒绝写入 — 这打破了 "append-only, 所有事件都记录" 的原则。后续需要考虑是否改为:event 照写(标记为 rejected),但 projection fold 时跳过 rejected events。这是个设计讨论,开个 issue 跟踪。 ### 4. ⚠️ `bun:sqlite` 硬依赖 — 需要抽象 当前 `guard-projection.ts` 直接 import `bun:sqlite`,Pulseflare(CF Workers / D1)无法使用。Guard 是 PulseStore 层的能力,应该两端共用。 建议: - 把 DB 操作抽成接口(跟 PulseStore 一样),bun:sqlite 和 D1 各实现一份 - 或者把 guard 逻辑做成纯函数 `(currentState, event, sources) → {ok, newState}`,存储层各自调用 这是 P0,不然 #8 Pulseflare projection engine 用不了 guard。 --- 其余都很好 👍 JSONata 选型正确,expression cache、通配符、测试覆盖都到位。
This repo is archived. You cannot comment on issues.
No Label
2 Participants
Due Date
No due date set.
Dependencies

No dependencies set.

Reference: uncaged/pulse#9