20 KiB
Pulse Runtime 无关化分析报告
目标:把
@uncaged/pulse从 Bun-only 改造为支持 Bun + Cloudflare Workers 两个 runtime。 分析日期:2026-04-18
1. 依赖地图
1.1 store.ts
| 维度 | 详情 |
|---|---|
| Bun-specific | import { Database } from 'bun:sqlite' — SQLite 数据库(createStore / createScopedStore 的核心依赖) |
| Node.js API | node:crypto(createHash)、node:fs(existsSync / mkdirSync / readdirSync / readFileSync / writeFileSync)、node:path(dirname / join) |
| 内部依赖 | ./defs.js(initDefsSchema)、./projection-engine.js(PROJECTIONS_SCHEMA) |
| 对外暴露 | EventRecord、ObjectInstance、PulseStore(接口)、ScopedStore(接口)、CreateStoreOptions、CreateScopedStoreOptions、createStore()、createScopedStore() |
关键观察:
PulseStore和ScopedStore已经是纯接口(interface),与具体实现解耦,这是最大的优势。ScopedStore.scopeDatabase(name)返回Database(bun:sqlite类型),是唯一泄漏进接口的 Bun 类型,需要处理。- CAS(内容寻址存储)完全基于本地文件系统(
objects/目录),CF Workers 无文件系统。 createHash('sha256')来自node:crypto,CF Workers 有crypto.subtle(Web Crypto API)可替代。
1.2 defs.ts
| 维度 | 详情 |
|---|---|
| Bun-specific | import type { Database } from 'bun:sqlite' — 所有函数签名都以 Database 为参数 |
| Node.js API | node:crypto(createHash,用于计算 def 的 hash) |
| 外部库 | jsonata(JSONata 表达式引擎,纯 JS,runtime 无关) |
| 内部依赖 | 无(叶子节点) |
| 对外暴露 | ObjectDef、EventDef、ProjectionDef、ValidationResult、initDefsSchema(db)、registerObjectDef()、getObjectDef()、registerEventDef()、getEventDef()、listEventDefs()、registerProjectionDef()、getProjectionDef()、listProjectionDefs()、validateExpression() |
关键观察:
- 所有公共函数都接受
Database参数(bun:sqlite),无法直接在 CF Workers 中使用。 Database实例承担了 SQL 执行角色,需要抽象为一个通用的"数据库访问接口"。jsonata是纯 JS 库,兼容性良好。
1.3 projection-engine.ts
| 维度 | 详情 |
|---|---|
| Bun-specific | import type { Database } from 'bun:sqlite'(Expression 来自 jsonata,非 Bun) |
| Node.js API | 无 |
| 外部库 | jsonata(runtime 无关) |
| 内部依赖 | ./defs.js(getProjectionDef、listProjectionDefs) |
| 对外暴露 | ProjectionState、PROJECTIONS_SCHEMA(DDL 字符串)、clearExpressionCache()、getProjectionState()、foldProjection()、foldAllProjections()、resetProjections() |
关键观察:
- 和
defs.ts相同模式:所有函数接受Database参数。 PROJECTIONS_SCHEMA是 SQL DDL 字符串,CF D1 可直接使用。- 折叠逻辑(JSONata 表达式求值)是纯 JS,runtime 无关。
1.4 gc.ts
| 维度 | 详情 |
|---|---|
| Bun-specific | 无 |
| Node.js API | node:fs(readdirSync、unlinkSync)、node:path(join) |
| 内部依赖 | ./store.js(PulseStore 接口) |
| 对外暴露 | GcTier、GcConfig、GcResult、DEFAULT_GC_CONFIG、gcVitals()、gcOrphanObjects()、runGc()、createGcTrigger() |
关键观察:
gcOrphanObjects()直接操作文件系统扫描objects/目录,CF Workers 无此能力。gcVitals()只依赖PulseStore接口(runtime 无关)。- CF 版本的 CAS 清理需要另一种机制(R2 bucket list + delete,或 TTL)。
1.5 llm-client.ts
| 维度 | 详情 |
|---|---|
| Bun-specific | 无 |
| Node.js API | 无 |
| 内部依赖 | 无(叶子节点) |
| 对外暴露 | LlmMessage、LlmTool、LlmResponse、LlmClient、createOpenAiLlmClient() |
关键观察:
- 完全 runtime 无关,使用标准
fetch+AbortController+setTimeout。 - 可以直接放入
pulse/核心包,无需任何改动。
1.6 persona.ts
| 维度 | 详情 |
|---|---|
| Bun-specific | 无 |
| Node.js API | 无 |
| 内部依赖 | ./store.js(PulseStore 接口)、./task-events.js(类型) |
| 对外暴露 | buildPersonasFromEvents() |
关键观察:
- 完全 runtime 无关,只依赖
PulseStore接口。
1.7 task-events.ts
| 维度 | 详情 |
|---|---|
| Bun-specific | 无 |
| Node.js API | 无 |
| 内部依赖 | 无(纯类型定义文件) |
| 对外暴露 | 所有 *Meta、*Data、*State 类型 |
关键观察:
- 完全 runtime 无关,纯类型定义,直接留在核心包。
1.8 watcher.ts
| 维度 | 详情 |
|---|---|
| Bun-specific | 无 |
| Node.js API | 无 |
| 内部依赖 | ./store.js(EventRecord、PulseStore 接口) |
| 对外暴露 | VitalWithData、WakeCondition、WatcherDef、WatcherHandle、startWatcher() |
关键观察:
- 使用
setTimeout(Web 标准),完全 runtime 无关。 startWatcher()不依赖文件系统,只通过PulseStore接口写数据。- CF Workers 中需要注意:Watcher 使用无限循环 +
setTimeout,不符合 Workers 的请求-响应模型,但在 Durable Objects 中可以运行。
1.9 index.ts
| 维度 | 详情 |
|---|---|
| Bun-specific | 间接(通过 ./store.js、./gc.js) |
| Node.js API | 间接(通过 ./store.js、./gc.js) |
| 内部依赖 | ./gc.js、./projection-engine.js、./store.js(类型)、./watcher.js、./watchers/pending-tasks-projection.js |
| 对外暴露 | 所有公共 API(见源码 export 列表) |
关键观察:
index.ts本身逻辑(runPulse、runPulseV2、composeRules等)完全基于PulseStore接口,runtime 无关。objectsDir参数暴露了文件系统假设,CF 版本需要替代方案。
内部依赖图(核心文件)
index.ts
├── gc.ts → store.ts (PulseStore 接口)
│ → node:fs, node:path (gcOrphanObjects)
├── projection-engine.ts → defs.ts → bun:sqlite (Database 类型)
│ → node:crypto
├── store.ts → bun:sqlite (Database 实现)
│ → node:crypto, node:fs, node:path
│ → defs.ts, projection-engine.ts
├── watcher.ts → store.ts (PulseStore 接口) ✓
├── llm-client.ts (独立,无内部依赖) ✓
├── persona.ts → store.ts (PulseStore 接口), task-events.ts ✓
├── task-events.ts (纯类型) ✓
└── watchers/
└── pending-tasks-projection.ts → store.ts (PulseStore 接口), task-events.ts ✓
2. Store 接口提取方案
2.1 当前 PulseStore 公共方法签名
interface PulseStore {
appendEvent(event: Omit<EventRecord, 'id'>): EventRecord;
appendEvents(events: Omit<EventRecord, 'id'>[]): EventRecord[];
createObject(opts: { objectType: string; externalId?: string; codeRev: string }): number;
getObjectInstance(id: number): ObjectInstance | null;
queryObjectsByType(objectType: string): ObjectInstance[];
getLatest(kind: string, key?: string): EventRecord | null;
getLatestWhere(opts: { kind: string; key?: string; codeRev?: string }): EventRecord | null;
getRecent(limit?: number): EventRecord[];
queryByKind(kind: string, opts?: { key?: string; since?: number; codeRev?: string; limit?: number }): EventRecord[];
getAfter(afterId: number, opts?: { kind?: string; key?: string; codeRev?: string }): EventRecord[];
hasEvents(): boolean;
putObject(data: unknown): string; // CAS 写入
getObject(hash: string): unknown | null; // CAS 读取
close(): void;
archiveEvents(olderThan: number): number;
downsampleEvents(kind: string, key: string, intervalMs: number, olderThan: number): number;
}
2.2 CF D1 可直接实现的方法
以下方法直接映射到 D1 SQL 操作(D1 支持标准 SQL,WAL 模式默认开启):
| 方法 | CF D1 实现方式 |
|---|---|
appendEvent |
INSERT INTO events ...,D1 支持 AUTOINCREMENT |
appendEvents |
D1 支持批量 prepare().bind().batch() |
getLatest |
SELECT ... ORDER BY occurred_at DESC LIMIT 1 |
getLatestWhere |
动态 WHERE 条件 |
getRecent |
SELECT ... ORDER BY occurred_at DESC LIMIT ? |
queryByKind |
SELECT ... WHERE kind = ? + 可选过滤 |
getAfter |
SELECT ... WHERE id > ? |
hasEvents |
SELECT 1 FROM events LIMIT 1 |
archiveEvents |
DELETE FROM events WHERE occurred_at < ? |
downsampleEvents |
D1 支持 ROW_NUMBER() OVER (PARTITION BY ...) |
createObject |
INSERT INTO objects ... with UNIQUE constraint |
getObjectInstance |
SELECT * FROM objects WHERE id = ? |
queryObjectsByType |
SELECT * FROM objects WHERE object_type = ? |
注意:D1 API 是异步的(返回 Promise),而当前 PulseStore 是同步的。需要将接口改为异步:
interface PulseStore {
appendEvent(event: Omit<EventRecord, 'id'>): Promise<EventRecord>;
getLatest(kind: string, key?: string): Promise<EventRecord | null>;
// ...所有方法改为 async
}
这是最大的接口 breaking change。
2.3 涉及文件系统的方法(需替代方案)
| 方法 | 当前实现 | CF 替代方案 |
|---|---|---|
putObject(data) |
写入 objects/<hash>.json |
选项 A:存入 Cloudflare R2 bucket;选项 B:存入 D1 cas_objects 表(对小对象 < 1MB 可行);选项 C:KV storage(适合读多写少) |
getObject(hash) |
读取 objects/<hash>.json |
对应选项的读取操作 |
推荐:使用 D1 额外表存储 CAS 对象(简单,无需额外服务):
CREATE TABLE IF NOT EXISTS cas_objects (
hash TEXT PRIMARY KEY,
data TEXT NOT NULL, -- JSON string
created_at INTEGER NOT NULL
);
对于大对象(> 1MB),改用 R2。
2.4 ScopedStore.scopeDatabase() 问题
当前签名:
interface ScopedStore {
scopeDatabase(name: string): Database; // 返回 bun:sqlite Database
}
这个方法只被 projection-engine.ts 使用,用于直接执行 SQL。需要引入抽象:
/** 抽象 DB 访问接口(供 projection engine 使用) */
interface PulseDatabase {
exec(sql: string): void;
prepare(sql: string): PulseStatement;
transaction<T>(fn: () => T): T;
}
interface ScopedStore {
scopeDatabase(name: string): PulseDatabase; // 不再暴露 bun:sqlite 类型
}
3. defs.ts 和 projection-engine.ts 解耦方案
3.1 当前对 Database 的依赖
两个文件的所有函数都以 Database(bun:sqlite)为第一参数:
// defs.ts
export function initDefsSchema(db: Database): void
export function registerObjectDef(db: Database, opts: ...): ObjectDef
export function getEventDef(db: Database, name: string, codeRev: string): EventDef | null
// projection-engine.ts
export function getProjectionState(scopeDb: Database, projectionName: string): ProjectionState | null
export async function foldProjection(scopeDb: Database, ...): Promise<ProjectionState>
3.2 改为依赖抽象接口
引入 PulseDatabase 抽象接口,屏蔽底层实现:
// 新增:packages/pulse/src/db-interface.ts
export interface PulseStatement {
run(...params: unknown[]): { lastInsertRowid: number; changes: number };
get(...params: unknown[]): unknown | null;
all(...params: unknown[]): unknown[];
}
export interface PulseDatabase {
/** 执行不返回结果的 SQL(DDL 等) */
exec(sql: string): void;
/** 创建预编译语句 */
prepare(sql: string): PulseStatement;
/** 同步事务(Bun/SQLite 的同步事务语义) */
transaction<T>(fn: () => T): () => T;
}
然后将 defs.ts 和 projection-engine.ts 的参数类型从 Database 改为 PulseDatabase:
// defs.ts (修改后)
import type { PulseDatabase } from './db-interface.js';
export function initDefsSchema(db: PulseDatabase): void { ... }
export function getProjectionDef(db: PulseDatabase, name: string, codeRev: string): ProjectionDef | null { ... }
CF D1 适配器实现:
// packages/pulseflare/src/d1-database.ts
import type { PulseDatabase, PulseStatement } from '@uncaged/pulse/db-interface';
export function createD1Database(d1: D1Database): PulseDatabase {
return {
exec(sql) {
// D1 DDL 需要 await,这里需要在初始化时提前执行
// 考虑: do.storage.sql.exec() 的同步接口
},
prepare(sql) {
const stmt = d1.prepare(sql);
return {
run(...params) { return stmt.bind(...params).run(); },
get(...params) { return stmt.bind(...params).first(); },
all(...params) { return stmt.bind(...params).all().then(r => r.results); },
};
},
transaction(fn) {
// D1 通过 batch() 实现事务
return fn;
},
};
}
挑战:D1 是全异步 API,而当前 defs.ts 和 projection-engine.ts 使用同步调用(db.prepare().get())。需要决定:
- 方案 A:将所有接口改为异步(
Promise返回),统一 Bun 和 CF 实现。 - 方案 B:在 CF 中使用 Durable Objects 的
storage.sql(有同步-like 接口)。 - 方案 C:保持同步接口,CF 侧使用 SQLite WASM(如
@cloudflare/d1-sql-driver)。
推荐方案 A:改为全异步,这符合 Web 标准和 CF 生态。
4. 包拆分建议
4.1 @uncaged/pulse(核心包,runtime 无关)
留在核心包的文件:
| 文件 | 说明 |
|---|---|
index.ts |
核心循环逻辑(依赖 PulseStore 接口) |
store.ts |
仅保留接口定义(PulseStore、ScopedStore、EventRecord 等类型),移除实现 |
defs.ts |
改为依赖 PulseDatabase 抽象后保留 |
projection-engine.ts |
改为依赖 PulseDatabase 抽象后保留 |
gc.ts |
保留 gcVitals() 和 createGcTrigger();移除 gcOrphanObjects()(依赖 node:fs) |
llm-client.ts |
完全保留 |
persona.ts |
完全保留 |
task-events.ts |
完全保留 |
watcher.ts |
完全保留 |
watchers/pending-tasks-projection.ts |
完全保留 |
rules/ |
完全保留(builtin、agent-loop 等) |
db-interface.ts |
新增:PulseDatabase / PulseStatement 抽象接口 |
从核心包移除的内容:
createStore()和createScopedStore()的实现(移至pulse-bun)gcOrphanObjects()的文件系统实现(移至各 adapter)
4.2 @uncaged/pulse-bun(Bun adapter)
移入此包的文件:
| 文件 | 说明 |
|---|---|
store.ts(实现部分) |
createStore()、createScopedStore() 的 bun:sqlite 实现 |
gc-bun.ts(新建) |
gcOrphanObjects() 的文件系统实现 |
db-bun.ts(新建) |
PulseDatabase 的 bun:sqlite 实现 |
依赖:@uncaged/pulse(核心包)、bun:sqlite、node:crypto、node:fs、node:path
4.3 @uncaged/pulseflare(CF adapter)— 需实现的内容
| 接口/功能 | 实现方式 |
|---|---|
PulseStore |
基于 CF D1 实现所有方法(全异步) |
ScopedStore |
每个 scope 对应一个 D1 database binding |
PulseDatabase |
包装 D1 API,实现 exec/prepare/transaction |
CAS putObject/getObject |
D1 cas_objects 表(小对象)或 R2 bucket(大对象) |
gcOrphanObjects |
R2 list() + delete() 或 D1 TTL 策略 |
| Watcher 运行时 | Durable Objects alarm API(替代 setInterval) |
runPulse 循环 |
Durable Objects(长驻进程) |
5. 风险和注意事项
5.1 向后兼容问题
| 风险 | 严重度 | 说明 |
|---|---|---|
PulseStore 接口从同步变异步 |
🔴 高 | 所有调用方需要加 await,涉及 index.ts、gc.ts、watcher.ts、所有 watchers |
ScopedStore.scopeDatabase() 返回类型变更 |
🟡 中 | 仅 projection-engine.ts 依赖,改为 PulseDatabase 接口 |
createStore/createScopedStore 从 @uncaged/pulse 迁移到 @uncaged/pulse-bun |
🟡 中 | 所有直接使用的代码需要更新 import 路径 |
gcOrphanObjects 从核心包移出 |
🟢 低 | 已在 runGc() 内部调用,外部调用者少 |
缓解策略:
- 在
@uncaged/pulse中保留createStore/createScopedStore的 re-export(从@uncaged/pulse-bun),标记@deprecated,给迁移窗口期。 - 先发布 v2 核心包(接口不变,只是类型抽象),再发布 v3(异步接口),配合 semver major 版本号。
5.2 同步 vs 异步接口的核心挑战
当前 Bun SQLite 的同步特性被大量依赖:
// 当前:同步
const row = db.prepare('SELECT ...').get(id);
// CF D1:必须异步
const row = await d1.prepare('SELECT ...').bind(id).first();
推荐迁移路径:
- 将
PulseStore所有方法改为Promise返回(breaking change,需要 major 版本)。 - Bun 实现可以用
Promise.resolve()包装同步结果,兼容新接口。 projection-engine.ts的折叠逻辑已经是async(因为 JSONata),只需将数据库读取改为await。
5.3 CF Workers 运行模型限制
| 限制 | 影响 | 解决方案 |
|---|---|---|
| 无文件系统 | CAS 对象存储 | D1 表或 R2 |
| 请求-响应模型(无长驻进程) | runPulse 无限循环 |
Durable Objects |
| CPU 时间限制(默认 30ms) | 复杂 JSONata 折叠 | 拆分到多个请求 / DO |
无 setInterval(常规 Worker) |
Watcher 定时采集 | Durable Objects alarm |
node:crypto 不可用(部分 API) |
SHA-256 哈希 | crypto.subtle.digest('SHA-256', ...) |
node:fs 不可用 |
CAS、GC | R2/D1 替代 |
5.4 测试策略
核心包单元测试(runtime 无关):
- 用 in-memory
PulseStoremock 测试composeRules、rebuildSnapshot、foldProjection等逻辑。 - 已有大量
*.test.ts,只需将测试依赖改为 mock store。
集成测试:
pulse-bun:继续用 Bun test runner,真实 SQLite。pulseflare:用 Miniflare(CF Workers 本地模拟器)测试 D1 + DO。
5.5 迁移顺序建议
Phase 1:接口抽象(非 breaking)
├── 新增 db-interface.ts(PulseDatabase 接口)
├── defs.ts:Database → PulseDatabase(类型兼容,Bun Database 实现了 PulseDatabase)
├── projection-engine.ts:同上
└── store.ts:ScopedStore.scopeDatabase() 返回 PulseDatabase
Phase 2:包拆分(非 breaking,re-export 兼容)
├── 新建 @uncaged/pulse-bun 包
├── createStore/createScopedStore 实现移入 pulse-bun
├── gcOrphanObjects 移入 pulse-bun
└── @uncaged/pulse 保留 re-export(标记 deprecated)
Phase 3:接口异步化(breaking,major 版本)
├── PulseStore 所有方法改为 Promise
├── Bun 实现用 Promise.resolve() 包装
└── 更新 index.ts、gc.ts、watcher.ts 所有调用点
Phase 4:Pulseflare 实现
├── D1 PulseStore 实现
├── D1 PulseDatabase 实现
├── CAS via D1 cas_objects 表
├── Durable Objects 运行 runPulse 循环
└── Durable Objects alarm 驱动 Watcher
附录:CAS 抽象接口建议
为了完全解耦对象存储,建议将 CAS 从 PulseStore 中分离为独立接口:
/** 内容寻址对象存储抽象 */
export interface CasStore {
put(data: unknown): Promise<string>; // 写入,返回 hash
get(hash: string): Promise<unknown | null>; // 读取
listOrphans(referencedHashes: Set<string>): Promise<string[]>; // GC 支持
delete(hash: string): Promise<void>;
}
这样:
- Bun 实现:基于
node:fs的文件系统 CAS - CF 实现:基于 D1 的表 CAS 或 R2 bucket CAS
- 核心包:只依赖
CasStore接口,不关心存储后端