From 80eeac03c2cf059295f9d324e73f0476a391e243 Mon Sep 17 00:00:00 2001 From: xiaomo Date: Sat, 18 Apr 2026 02:24:48 +0000 Subject: [PATCH] docs: Pulseflare runtime analysis report (refs #5) --- PULSEFLARE-ANALYSIS.md | 528 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 528 insertions(+) create mode 100644 PULSEFLARE-ANALYSIS.md diff --git a/PULSEFLARE-ANALYSIS.md b/PULSEFLARE-ANALYSIS.md new file mode 100644 index 0000000..c329ceb --- /dev/null +++ b/PULSEFLARE-ANALYSIS.md @@ -0,0 +1,528 @@ +# Pulse Runtime 无关化分析报告 + +> 目标:把 `@uncaged/pulse` 从 Bun-only 改造为支持 Bun + Cloudflare Workers 两个 runtime。 +> 分析日期:2026-04-18 + +--- + +## 1. 依赖地图 + +### 1.1 `store.ts` + +| 维度 | 详情 | +|------|------| +| **Bun-specific** | `import { Database } from 'bun:sqlite'` — SQLite 数据库(`createStore` / `createScopedStore` 的核心依赖) | +| **Node.js API** | `node:crypto`(`createHash`)、`node:fs`(`existsSync` / `mkdirSync` / `readdirSync` / `readFileSync` / `writeFileSync`)、`node:path`(`dirname` / `join`) | +| **内部依赖** | `./defs.js`(`initDefsSchema`)、`./projection-engine.js`(`PROJECTIONS_SCHEMA`) | +| **对外暴露** | `EventRecord`、`ObjectInstance`、`PulseStore`(接口)、`ScopedStore`(接口)、`CreateStoreOptions`、`CreateScopedStoreOptions`、`createStore()`、`createScopedStore()` | + +**关键观察**: + +- `PulseStore` 和 `ScopedStore` 已经是纯接口(interface),**与具体实现解耦**,这是最大的优势。 +- `ScopedStore.scopeDatabase(name)` 返回 `Database`(`bun:sqlite` 类型),是唯一泄漏进接口的 Bun 类型,需要处理。 +- CAS(内容寻址存储)完全基于本地文件系统(`objects/` 目录),CF Workers 无文件系统。 +- `createHash('sha256')` 来自 `node:crypto`,CF Workers 有 `crypto.subtle`(Web Crypto API)可替代。 + +--- + +### 1.2 `defs.ts` + +| 维度 | 详情 | +|------|------| +| **Bun-specific** | `import type { Database } from 'bun:sqlite'` — 所有函数签名都以 `Database` 为参数 | +| **Node.js API** | `node:crypto`(`createHash`,用于计算 def 的 hash) | +| **外部库** | `jsonata`(JSONata 表达式引擎,纯 JS,runtime 无关) | +| **内部依赖** | 无(叶子节点) | +| **对外暴露** | `ObjectDef`、`EventDef`、`ProjectionDef`、`ValidationResult`、`initDefsSchema(db)`、`registerObjectDef()`、`getObjectDef()`、`registerEventDef()`、`getEventDef()`、`listEventDefs()`、`registerProjectionDef()`、`getProjectionDef()`、`listProjectionDefs()`、`validateExpression()` | + +**关键观察**: + +- 所有公共函数都接受 `Database` 参数(`bun:sqlite`),无法直接在 CF Workers 中使用。 +- `Database` 实例承担了 SQL 执行角色,需要抽象为一个通用的"数据库访问接口"。 +- `jsonata` 是纯 JS 库,兼容性良好。 + +--- + +### 1.3 `projection-engine.ts` + +| 维度 | 详情 | +|------|------| +| **Bun-specific** | `import type { Database } from 'bun:sqlite'`(`Expression` 来自 `jsonata`,非 Bun) | +| **Node.js API** | 无 | +| **外部库** | `jsonata`(runtime 无关) | +| **内部依赖** | `./defs.js`(`getProjectionDef`、`listProjectionDefs`) | +| **对外暴露** | `ProjectionState`、`PROJECTIONS_SCHEMA`(DDL 字符串)、`clearExpressionCache()`、`getProjectionState()`、`foldProjection()`、`foldAllProjections()`、`resetProjections()` | + +**关键观察**: + +- 和 `defs.ts` 相同模式:所有函数接受 `Database` 参数。 +- `PROJECTIONS_SCHEMA` 是 SQL DDL 字符串,CF D1 可直接使用。 +- 折叠逻辑(JSONata 表达式求值)是纯 JS,runtime 无关。 + +--- + +### 1.4 `gc.ts` + +| 维度 | 详情 | +|------|------| +| **Bun-specific** | 无 | +| **Node.js API** | `node:fs`(`readdirSync`、`unlinkSync`)、`node:path`(`join`) | +| **内部依赖** | `./store.js`(`PulseStore` 接口) | +| **对外暴露** | `GcTier`、`GcConfig`、`GcResult`、`DEFAULT_GC_CONFIG`、`gcVitals()`、`gcOrphanObjects()`、`runGc()`、`createGcTrigger()` | + +**关键观察**: + +- `gcOrphanObjects()` 直接操作文件系统扫描 `objects/` 目录,CF Workers 无此能力。 +- `gcVitals()` 只依赖 `PulseStore` 接口(runtime 无关)。 +- CF 版本的 CAS 清理需要另一种机制(R2 bucket list + delete,或 TTL)。 + +--- + +### 1.5 `llm-client.ts` + +| 维度 | 详情 | +|------|------| +| **Bun-specific** | 无 | +| **Node.js API** | 无 | +| **内部依赖** | 无(叶子节点) | +| **对外暴露** | `LlmMessage`、`LlmTool`、`LlmResponse`、`LlmClient`、`createOpenAiLlmClient()` | + +**关键观察**: + +- **完全 runtime 无关**,使用标准 `fetch` + `AbortController` + `setTimeout`。 +- 可以直接放入 `pulse/` 核心包,无需任何改动。 + +--- + +### 1.6 `persona.ts` + +| 维度 | 详情 | +|------|------| +| **Bun-specific** | 无 | +| **Node.js API** | 无 | +| **内部依赖** | `./store.js`(`PulseStore` 接口)、`./task-events.js`(类型) | +| **对外暴露** | `buildPersonasFromEvents()` | + +**关键观察**: + +- **完全 runtime 无关**,只依赖 `PulseStore` 接口。 + +--- + +### 1.7 `task-events.ts` + +| 维度 | 详情 | +|------|------| +| **Bun-specific** | 无 | +| **Node.js API** | 无 | +| **内部依赖** | 无(纯类型定义文件) | +| **对外暴露** | 所有 `*Meta`、`*Data`、`*State` 类型 | + +**关键观察**: + +- **完全 runtime 无关**,纯类型定义,直接留在核心包。 + +--- + +### 1.8 `watcher.ts` + +| 维度 | 详情 | +|------|------| +| **Bun-specific** | 无 | +| **Node.js API** | 无 | +| **内部依赖** | `./store.js`(`EventRecord`、`PulseStore` 接口) | +| **对外暴露** | `VitalWithData`、`WakeCondition`、`WatcherDef`、`WatcherHandle`、`startWatcher()` | + +**关键观察**: + +- 使用 `setTimeout`(Web 标准),**完全 runtime 无关**。 +- `startWatcher()` 不依赖文件系统,只通过 `PulseStore` 接口写数据。 +- CF Workers 中需要注意:Watcher 使用无限循环 + `setTimeout`,不符合 Workers 的请求-响应模型,但在 Durable Objects 中可以运行。 + +--- + +### 1.9 `index.ts` + +| 维度 | 详情 | +|------|------| +| **Bun-specific** | 间接(通过 `./store.js`、`./gc.js`) | +| **Node.js API** | 间接(通过 `./store.js`、`./gc.js`) | +| **内部依赖** | `./gc.js`、`./projection-engine.js`、`./store.js`(类型)、`./watcher.js`、`./watchers/pending-tasks-projection.js` | +| **对外暴露** | 所有公共 API(见源码 export 列表) | + +**关键观察**: + +- `index.ts` 本身逻辑(`runPulse`、`runPulseV2`、`composeRules` 等)完全基于 `PulseStore` 接口,**runtime 无关**。 +- `objectsDir` 参数暴露了文件系统假设,CF 版本需要替代方案。 + +--- + +### 内部依赖图(核心文件) + +``` +index.ts + ├── gc.ts → store.ts (PulseStore 接口) + │ → node:fs, node:path (gcOrphanObjects) + ├── projection-engine.ts → defs.ts → bun:sqlite (Database 类型) + │ → node:crypto + ├── store.ts → bun:sqlite (Database 实现) + │ → node:crypto, node:fs, node:path + │ → defs.ts, projection-engine.ts + ├── watcher.ts → store.ts (PulseStore 接口) ✓ + ├── llm-client.ts (独立,无内部依赖) ✓ + ├── persona.ts → store.ts (PulseStore 接口), task-events.ts ✓ + ├── task-events.ts (纯类型) ✓ + └── watchers/ + └── pending-tasks-projection.ts → store.ts (PulseStore 接口), task-events.ts ✓ +``` + +--- + +## 2. Store 接口提取方案 + +### 2.1 当前 `PulseStore` 公共方法签名 + +```typescript +interface PulseStore { + appendEvent(event: Omit): EventRecord; + appendEvents(events: Omit[]): EventRecord[]; + createObject(opts: { objectType: string; externalId?: string; codeRev: string }): number; + getObjectInstance(id: number): ObjectInstance | null; + queryObjectsByType(objectType: string): ObjectInstance[]; + getLatest(kind: string, key?: string): EventRecord | null; + getLatestWhere(opts: { kind: string; key?: string; codeRev?: string }): EventRecord | null; + getRecent(limit?: number): EventRecord[]; + queryByKind(kind: string, opts?: { key?: string; since?: number; codeRev?: string; limit?: number }): EventRecord[]; + getAfter(afterId: number, opts?: { kind?: string; key?: string; codeRev?: string }): EventRecord[]; + hasEvents(): boolean; + putObject(data: unknown): string; // CAS 写入 + getObject(hash: string): unknown | null; // CAS 读取 + close(): void; + archiveEvents(olderThan: number): number; + downsampleEvents(kind: string, key: string, intervalMs: number, olderThan: number): number; +} +``` + +### 2.2 CF D1 可直接实现的方法 + +以下方法直接映射到 D1 SQL 操作(D1 支持标准 SQL,WAL 模式默认开启): + +| 方法 | CF D1 实现方式 | +|------|----------------| +| `appendEvent` | `INSERT INTO events ...`,D1 支持 `AUTOINCREMENT` | +| `appendEvents` | D1 支持批量 `prepare().bind().batch()` | +| `getLatest` | `SELECT ... ORDER BY occurred_at DESC LIMIT 1` | +| `getLatestWhere` | 动态 WHERE 条件 | +| `getRecent` | `SELECT ... ORDER BY occurred_at DESC LIMIT ?` | +| `queryByKind` | `SELECT ... WHERE kind = ?` + 可选过滤 | +| `getAfter` | `SELECT ... WHERE id > ?` | +| `hasEvents` | `SELECT 1 FROM events LIMIT 1` | +| `archiveEvents` | `DELETE FROM events WHERE occurred_at < ?` | +| `downsampleEvents` | D1 支持 `ROW_NUMBER() OVER (PARTITION BY ...)` | +| `createObject` | `INSERT INTO objects ...` with UNIQUE constraint | +| `getObjectInstance` | `SELECT * FROM objects WHERE id = ?` | +| `queryObjectsByType` | `SELECT * FROM objects WHERE object_type = ?` | + +**注意**:D1 API 是异步的(返回 `Promise`),而当前 `PulseStore` 是同步的。需要将接口改为异步: + +```typescript +interface PulseStore { + appendEvent(event: Omit): Promise; + getLatest(kind: string, key?: string): Promise; + // ...所有方法改为 async +} +``` + +这是最大的接口 breaking change。 + +### 2.3 涉及文件系统的方法(需替代方案) + +| 方法 | 当前实现 | CF 替代方案 | +|------|----------|-------------| +| `putObject(data)` | 写入 `objects/.json` | **选项 A**:存入 Cloudflare R2 bucket;**选项 B**:存入 D1 `cas_objects` 表(对小对象 < 1MB 可行);**选项 C**:KV storage(适合读多写少) | +| `getObject(hash)` | 读取 `objects/.json` | 对应选项的读取操作 | + +**推荐**:使用 D1 额外表存储 CAS 对象(简单,无需额外服务): + +```sql +CREATE TABLE IF NOT EXISTS cas_objects ( + hash TEXT PRIMARY KEY, + data TEXT NOT NULL, -- JSON string + created_at INTEGER NOT NULL +); +``` + +对于大对象(> 1MB),改用 R2。 + +### 2.4 `ScopedStore.scopeDatabase()` 问题 + +当前签名: + +```typescript +interface ScopedStore { + scopeDatabase(name: string): Database; // 返回 bun:sqlite Database +} +``` + +这个方法只被 `projection-engine.ts` 使用,用于直接执行 SQL。需要引入抽象: + +```typescript +/** 抽象 DB 访问接口(供 projection engine 使用) */ +interface PulseDatabase { + exec(sql: string): void; + prepare(sql: string): PulseStatement; + transaction(fn: () => T): T; +} + +interface ScopedStore { + scopeDatabase(name: string): PulseDatabase; // 不再暴露 bun:sqlite 类型 +} +``` + +--- + +## 3. `defs.ts` 和 `projection-engine.ts` 解耦方案 + +### 3.1 当前对 `Database` 的依赖 + +两个文件的所有函数都以 `Database`(`bun:sqlite`)为第一参数: + +```typescript +// defs.ts +export function initDefsSchema(db: Database): void +export function registerObjectDef(db: Database, opts: ...): ObjectDef +export function getEventDef(db: Database, name: string, codeRev: string): EventDef | null + +// projection-engine.ts +export function getProjectionState(scopeDb: Database, projectionName: string): ProjectionState | null +export async function foldProjection(scopeDb: Database, ...): Promise +``` + +### 3.2 改为依赖抽象接口 + +引入 `PulseDatabase` 抽象接口,屏蔽底层实现: + +```typescript +// 新增:packages/pulse/src/db-interface.ts + +export interface PulseStatement { + run(...params: unknown[]): { lastInsertRowid: number; changes: number }; + get(...params: unknown[]): unknown | null; + all(...params: unknown[]): unknown[]; +} + +export interface PulseDatabase { + /** 执行不返回结果的 SQL(DDL 等) */ + exec(sql: string): void; + + /** 创建预编译语句 */ + prepare(sql: string): PulseStatement; + + /** 同步事务(Bun/SQLite 的同步事务语义) */ + transaction(fn: () => T): () => T; +} +``` + +然后将 `defs.ts` 和 `projection-engine.ts` 的参数类型从 `Database` 改为 `PulseDatabase`: + +```typescript +// defs.ts (修改后) +import type { PulseDatabase } from './db-interface.js'; + +export function initDefsSchema(db: PulseDatabase): void { ... } +export function getProjectionDef(db: PulseDatabase, name: string, codeRev: string): ProjectionDef | null { ... } +``` + +**CF D1 适配器实现**: + +```typescript +// packages/pulseflare/src/d1-database.ts +import type { PulseDatabase, PulseStatement } from '@uncaged/pulse/db-interface'; + +export function createD1Database(d1: D1Database): PulseDatabase { + return { + exec(sql) { + // D1 DDL 需要 await,这里需要在初始化时提前执行 + // 考虑: do.storage.sql.exec() 的同步接口 + }, + prepare(sql) { + const stmt = d1.prepare(sql); + return { + run(...params) { return stmt.bind(...params).run(); }, + get(...params) { return stmt.bind(...params).first(); }, + all(...params) { return stmt.bind(...params).all().then(r => r.results); }, + }; + }, + transaction(fn) { + // D1 通过 batch() 实现事务 + return fn; + }, + }; +} +``` + +**挑战**:D1 是全异步 API,而当前 `defs.ts` 和 `projection-engine.ts` 使用同步调用(`db.prepare().get()`)。需要决定: + +- **方案 A**:将所有接口改为异步(`Promise` 返回),统一 Bun 和 CF 实现。 +- **方案 B**:在 CF 中使用 Durable Objects 的 `storage.sql`(有同步-like 接口)。 +- **方案 C**:保持同步接口,CF 侧使用 SQLite WASM(如 `@cloudflare/d1-sql-driver`)。 + +**推荐方案 A**:改为全异步,这符合 Web 标准和 CF 生态。 + +--- + +## 4. 包拆分建议 + +### 4.1 `@uncaged/pulse`(核心包,runtime 无关) + +**留在核心包的文件**: + +| 文件 | 说明 | +|------|------| +| `index.ts` | 核心循环逻辑(依赖 `PulseStore` 接口) | +| `store.ts` | **仅保留接口定义**(`PulseStore`、`ScopedStore`、`EventRecord` 等类型),移除实现 | +| `defs.ts` | 改为依赖 `PulseDatabase` 抽象后保留 | +| `projection-engine.ts` | 改为依赖 `PulseDatabase` 抽象后保留 | +| `gc.ts` | 保留 `gcVitals()` 和 `createGcTrigger()`;移除 `gcOrphanObjects()`(依赖 `node:fs`) | +| `llm-client.ts` | 完全保留 | +| `persona.ts` | 完全保留 | +| `task-events.ts` | 完全保留 | +| `watcher.ts` | 完全保留 | +| `watchers/pending-tasks-projection.ts` | 完全保留 | +| `rules/` | 完全保留(builtin、agent-loop 等) | +| `db-interface.ts` | **新增**:`PulseDatabase` / `PulseStatement` 抽象接口 | + +**从核心包移除的内容**: + +- `createStore()` 和 `createScopedStore()` 的**实现**(移至 `pulse-bun`) +- `gcOrphanObjects()` 的文件系统实现(移至各 adapter) + +### 4.2 `@uncaged/pulse-bun`(Bun adapter) + +**移入此包的文件**: + +| 文件 | 说明 | +|------|------| +| `store.ts`(实现部分) | `createStore()`、`createScopedStore()` 的 `bun:sqlite` 实现 | +| `gc-bun.ts`(新建) | `gcOrphanObjects()` 的文件系统实现 | +| `db-bun.ts`(新建) | `PulseDatabase` 的 `bun:sqlite` 实现 | + +**依赖**:`@uncaged/pulse`(核心包)、`bun:sqlite`、`node:crypto`、`node:fs`、`node:path` + +### 4.3 `@uncaged/pulseflare`(CF adapter)— 需实现的内容 + +| 接口/功能 | 实现方式 | +|-----------|----------| +| `PulseStore` | 基于 CF D1 实现所有方法(全异步) | +| `ScopedStore` | 每个 scope 对应一个 D1 database binding | +| `PulseDatabase` | 包装 D1 API,实现 `exec`/`prepare`/`transaction` | +| CAS `putObject`/`getObject` | D1 `cas_objects` 表(小对象)或 R2 bucket(大对象) | +| `gcOrphanObjects` | R2 `list()` + `delete()` 或 D1 TTL 策略 | +| Watcher 运行时 | Durable Objects alarm API(替代 `setInterval`) | +| `runPulse` 循环 | Durable Objects(长驻进程) | + +--- + +## 5. 风险和注意事项 + +### 5.1 向后兼容问题 + +| 风险 | 严重度 | 说明 | +|------|--------|------| +| `PulseStore` 接口从同步变异步 | 🔴 高 | 所有调用方需要加 `await`,涉及 `index.ts`、`gc.ts`、`watcher.ts`、所有 watchers | +| `ScopedStore.scopeDatabase()` 返回类型变更 | 🟡 中 | 仅 `projection-engine.ts` 依赖,改为 `PulseDatabase` 接口 | +| `createStore`/`createScopedStore` 从 `@uncaged/pulse` 迁移到 `@uncaged/pulse-bun` | 🟡 中 | 所有直接使用的代码需要更新 import 路径 | +| `gcOrphanObjects` 从核心包移出 | 🟢 低 | 已在 `runGc()` 内部调用,外部调用者少 | + +**缓解策略**: + +1. 在 `@uncaged/pulse` 中保留 `createStore`/`createScopedStore` 的 re-export(从 `@uncaged/pulse-bun`),标记 `@deprecated`,给迁移窗口期。 +2. 先发布 v2 核心包(接口不变,只是类型抽象),再发布 v3(异步接口),配合 semver major 版本号。 + +### 5.2 同步 vs 异步接口的核心挑战 + +当前 Bun SQLite 的同步特性被大量依赖: + +```typescript +// 当前:同步 +const row = db.prepare('SELECT ...').get(id); + +// CF D1:必须异步 +const row = await d1.prepare('SELECT ...').bind(id).first(); +``` + +**推荐迁移路径**: + +1. 将 `PulseStore` 所有方法改为 `Promise` 返回(breaking change,需要 major 版本)。 +2. Bun 实现可以用 `Promise.resolve()` 包装同步结果,兼容新接口。 +3. `projection-engine.ts` 的折叠逻辑已经是 `async`(因为 JSONata),只需将数据库读取改为 `await`。 + +### 5.3 CF Workers 运行模型限制 + +| 限制 | 影响 | 解决方案 | +|------|------|----------| +| 无文件系统 | CAS 对象存储 | D1 表或 R2 | +| 请求-响应模型(无长驻进程) | `runPulse` 无限循环 | Durable Objects | +| CPU 时间限制(默认 30ms) | 复杂 JSONata 折叠 | 拆分到多个请求 / DO | +| 无 `setInterval`(常规 Worker) | Watcher 定时采集 | Durable Objects alarm | +| `node:crypto` 不可用(部分 API) | SHA-256 哈希 | `crypto.subtle.digest('SHA-256', ...)` | +| `node:fs` 不可用 | CAS、GC | R2/D1 替代 | + +### 5.4 测试策略 + +**核心包单元测试**(runtime 无关): +- 用 in-memory `PulseStore` mock 测试 `composeRules`、`rebuildSnapshot`、`foldProjection` 等逻辑。 +- 已有大量 `*.test.ts`,只需将测试依赖改为 mock store。 + +**集成测试**: +- `pulse-bun`:继续用 Bun test runner,真实 SQLite。 +- `pulseflare`:用 Miniflare(CF Workers 本地模拟器)测试 D1 + DO。 + +### 5.5 迁移顺序建议 + +``` +Phase 1:接口抽象(非 breaking) + ├── 新增 db-interface.ts(PulseDatabase 接口) + ├── defs.ts:Database → PulseDatabase(类型兼容,Bun Database 实现了 PulseDatabase) + ├── projection-engine.ts:同上 + └── store.ts:ScopedStore.scopeDatabase() 返回 PulseDatabase + +Phase 2:包拆分(非 breaking,re-export 兼容) + ├── 新建 @uncaged/pulse-bun 包 + ├── createStore/createScopedStore 实现移入 pulse-bun + ├── gcOrphanObjects 移入 pulse-bun + └── @uncaged/pulse 保留 re-export(标记 deprecated) + +Phase 3:接口异步化(breaking,major 版本) + ├── PulseStore 所有方法改为 Promise + ├── Bun 实现用 Promise.resolve() 包装 + └── 更新 index.ts、gc.ts、watcher.ts 所有调用点 + +Phase 4:Pulseflare 实现 + ├── D1 PulseStore 实现 + ├── D1 PulseDatabase 实现 + ├── CAS via D1 cas_objects 表 + ├── Durable Objects 运行 runPulse 循环 + └── Durable Objects alarm 驱动 Watcher +``` + +--- + +## 附录:CAS 抽象接口建议 + +为了完全解耦对象存储,建议将 CAS 从 `PulseStore` 中分离为独立接口: + +```typescript +/** 内容寻址对象存储抽象 */ +export interface CasStore { + put(data: unknown): Promise; // 写入,返回 hash + get(hash: string): Promise; // 读取 + listOrphans(referencedHashes: Set): Promise; // GC 支持 + delete(hash: string): Promise; +} +``` + +这样: +- Bun 实现:基于 `node:fs` 的文件系统 CAS +- CF 实现:基于 D1 的表 CAS 或 R2 bucket CAS +- 核心包:只依赖 `CasStore` 接口,不关心存储后端