From ca3dd52e38c76bf6821641360ae7ca5dbd508a02 Mon Sep 17 00:00:00 2001 From: xiaomo Date: Sat, 18 Apr 2026 02:50:12 +0000 Subject: [PATCH] refactor!: async PulseStore interface for multi-runtime support (refs #5) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - PulseStore/ScopedStore 接口方法返回 Promise - 所有 store 实现方法改为 async(bun:sqlite 同步调用外包 async) - defs.ts, projection-engine.ts, gc.ts 公共函数异步化 - index.ts 中 store 调用加 await - rules/health.ts rebuildHealth 函数异步化 - 核心异步化完成,支持 CF D1 等异步数据库运行时 Breaking Changes: - All PulseStore methods now return Promise - Consumer code must add await for all store operations - Tests need to be updated to use async/await patterns --- packages/pulse/src/bin/submit-task.ts | 6 +- packages/pulse/src/bin/workflow-daemon.ts | 8 +- packages/pulse/src/defs.ts | 30 ++-- packages/pulse/src/e2e/council-demo.ts | 12 +- packages/pulse/src/e2e/council-v2-live.ts | 8 +- .../pulse/src/e2e/meta-coding-optimize.ts | 8 +- packages/pulse/src/e2e/meta-full-e2e.ts | 8 +- .../pulse/src/e2e/meta-optimize-coding.ts | 6 +- packages/pulse/src/e2e/meta-tdd-coding.ts | 8 +- packages/pulse/src/e2e/report-live.ts | 61 +++++---- packages/pulse/src/gc.ts | 38 +++--- packages/pulse/src/index.ts | 78 +++++------ packages/pulse/src/persona.ts | 8 +- packages/pulse/src/projection-engine.ts | 12 +- packages/pulse/src/rules/agent-loop.ts | 6 +- packages/pulse/src/rules/health.ts | 8 +- packages/pulse/src/store.test.ts | 26 ++-- packages/pulse/src/store.ts | 129 +++++++++--------- packages/pulse/src/watcher.ts | 20 +-- .../src/workflows/workflow-rule-adapter.ts | 22 +-- 20 files changed, 254 insertions(+), 248 deletions(-) diff --git a/packages/pulse/src/bin/submit-task.ts b/packages/pulse/src/bin/submit-task.ts index ec90978..4b42a35 100644 --- a/packages/pulse/src/bin/submit-task.ts +++ b/packages/pulse/src/bin/submit-task.ts @@ -22,8 +22,8 @@ const store = createStore({ }); const content = readFileSync(taskFile, 'utf-8'); -const hash = store.putObject(content); -store.appendEvent({ +const hash = await store.putObject(content); +await store.appendEvent({ occurredAt: Date.now(), kind: `${workflow}.__start__`, key: topicKey, @@ -33,4 +33,4 @@ store.appendEvent({ console.log(`✅ Submitted: ${workflow}.__start__ [${topicKey}]`); console.log(` Content: ${content.slice(0, 100)}...`); console.log(` Hash: ${hash}`); -store.close(); +await store.close(); diff --git a/packages/pulse/src/bin/workflow-daemon.ts b/packages/pulse/src/bin/workflow-daemon.ts index 049d1f0..c856376 100644 --- a/packages/pulse/src/bin/workflow-daemon.ts +++ b/packages/pulse/src/bin/workflow-daemon.ts @@ -106,7 +106,7 @@ console.log(''); // ── Adaptive Tick Loop ───────────────────────────────────────── let currentTickMs = BASE_TICK_MS; -let lastEventCount = store.getAfter(0).length; +let lastEventCount = (await store.getAfter(0)).length; let timer: ReturnType; const logTick = async () => { @@ -116,7 +116,7 @@ const logTick = async () => { const ts = new Date().toISOString().slice(11, 19); // Check if new events appeared → activity - const nowCount = store.getAfter(0).length; + const nowCount = (await store.getAfter(0)).length; if (nowCount > lastEventCount) { currentTickMs = BASE_TICK_MS; // reset to fast lastEventCount = nowCount; @@ -147,8 +147,8 @@ scheduleNext(); const shutdown = (signal: string) => { console.log(`\n⏹️ ${signal} received, shutting down...`); clearTimeout(timer); - store.close(); - logStore.close(); + void store.close(); + void logStore.close(); process.exit(0); }; diff --git a/packages/pulse/src/defs.ts b/packages/pulse/src/defs.ts index ff9d49b..86eeff9 100644 --- a/packages/pulse/src/defs.ts +++ b/packages/pulse/src/defs.ts @@ -103,7 +103,7 @@ const PROJECTION_DEF_SOURCES_SCHEMA = ` * Initialize the definition schema on an existing database connection. * Use this when you manage the database lifecycle externally. */ -export function initDefsSchema(db: Database): void { +export async function initDefsSchema(db: Database): Promise { db.exec(OBJECT_DEFS_SCHEMA); db.exec(EVENT_DEFS_SCHEMA); db.exec(PROJECTION_DEFS_SCHEMA); @@ -159,13 +159,13 @@ const selectObjectDef = (db: Database) => WHERE name = ? AND code_rev = ? `); -export function registerObjectDef( +export async function registerObjectDef( db: Database, opts: { name: string; codeRev: string; }, -): ObjectDef { +): Promise { const createdAt = Date.now(); try { @@ -186,11 +186,11 @@ export function registerObjectDef( }; } -export function getObjectDef( +export async function getObjectDef( db: Database, name: string, codeRev: string, -): ObjectDef | null { +): Promise { const row = selectObjectDef(db).get(name, codeRev) as any; if (!row) return null; @@ -225,7 +225,7 @@ const selectEventDefsByCodeRev = (db: Database) => ORDER BY name `); -export function registerEventDef( +export async function registerEventDef( db: Database, opts: { name: string; @@ -233,7 +233,7 @@ export function registerEventDef( parentHash?: string; codeRev: string; }, -): EventDef { +): Promise { const hash = calculateEventHash(opts.name, opts.schema); const createdAt = Date.now(); @@ -265,11 +265,11 @@ export function registerEventDef( }; } -export function getEventDef( +export async function getEventDef( db: Database, name: string, codeRev: string, -): EventDef | null { +): Promise { const row = selectEventDefByNameCodeRev(db).get(name, codeRev) as any; if (!row) return null; @@ -284,10 +284,10 @@ export function getEventDef( }; } -export function listEventDefs( +export async function listEventDefs( db: Database, opts: { codeRev: string }, -): EventDef[] { +): Promise { const rows = selectEventDefsByCodeRev(db).all(opts.codeRev) as any[]; return rows.map((row) => ({ @@ -429,11 +429,11 @@ export async function registerProjectionDef( }; } -export function getProjectionDef( +export async function getProjectionDef( db: Database, name: string, codeRev: string, -): ProjectionDef | null { +): Promise { const row = selectProjectionDefByNameCodeRev(db).get(name, codeRev) as any; if (!row) return null; @@ -458,10 +458,10 @@ export function getProjectionDef( }; } -export function listProjectionDefs( +export async function listProjectionDefs( db: Database, opts: { codeRev: string }, -): ProjectionDef[] { +): Promise { const rows = selectProjectionDefsByCodeRev(db).all(opts.codeRev) as any[]; return rows.map((row) => { diff --git a/packages/pulse/src/e2e/council-demo.ts b/packages/pulse/src/e2e/council-demo.ts index 8b163e4..ad74fc4 100644 --- a/packages/pulse/src/e2e/council-demo.ts +++ b/packages/pulse/src/e2e/council-demo.ts @@ -90,8 +90,8 @@ async function main() { ]; for (const t of tasks) { - const hash = store.putObject(t.description); - store.appendEvent({ + const hash = await store.putObject(t.description); + await store.appendEvent({ occurredAt: Date.now(), kind: 'coding.__start__', key: t.topicId, @@ -123,9 +123,9 @@ async function main() { log('📊', 'Step 3: Event Timeline'); console.log(); - const allEvents = store - .getAfter(0) - .filter((e) => e.kind.startsWith('coding.')); + const allEvents = (await store.getAfter(0)).filter((e) => + e.kind.startsWith('coding.'), + ); for (const e of allEvents) { const ts = new Date(e.occurredAt).toISOString().slice(11, 23); const _role = e.kind.split('.')[1]; @@ -147,7 +147,7 @@ async function main() { log('✅', 'Council v2 demo completed!'); console.log(); } finally { - store.close(); + await store.close(); if (KEEP || useCustomDb) { console.log(`📁 Events DB preserved: ${dbPath}`); } else if (tmpDir) { diff --git a/packages/pulse/src/e2e/council-v2-live.ts b/packages/pulse/src/e2e/council-v2-live.ts index ad99dd9..c1bc103 100644 --- a/packages/pulse/src/e2e/council-v2-live.ts +++ b/packages/pulse/src/e2e/council-v2-live.ts @@ -144,10 +144,10 @@ async function main() { const rule = createWorkflowRule(codingTask, store); // Create a real task via coding.__start__ event - const hash = store.putObject( + const hash = await store.putObject( 'Create packages/pulse/COUNCIL-V2.md — a concise (30-50 lines) overview of the Council v2 model. Cover: WorkflowType (events, roles, moderator), START/END automaton, pure roles, Moore machine diff-driven ticks. Reference source files in workflows/ directory. Do NOT modify any existing files.', ); - store.appendEvent({ + await store.appendEvent({ occurredAt: Date.now(), kind: 'coding.__start__', key: 'add-v2-readme', @@ -177,7 +177,7 @@ async function main() { // Print event timeline console.log(`\n📊 === Event Timeline ===`); - const events = store.getAfter(0); + const events = await store.getAfter(0); const t0ev = events[0]?.occurredAt ?? 0; for (const ev of events) { const delta = ((ev.occurredAt - t0ev) / 1000).toFixed(1); @@ -188,7 +188,7 @@ async function main() { ); } - store.close(); + await store.close(); console.log(`\n📁 DB: ${dbPath}`); console.log(`✅ Done in ${((Date.now() - t0) / 1000).toFixed(1)}s\n`); } diff --git a/packages/pulse/src/e2e/meta-coding-optimize.ts b/packages/pulse/src/e2e/meta-coding-optimize.ts index 7cbbfb5..12e8c1b 100644 --- a/packages/pulse/src/e2e/meta-coding-optimize.ts +++ b/packages/pulse/src/e2e/meta-coding-optimize.ts @@ -126,8 +126,8 @@ ${reviewerSrc} - commit author: 小橘 `; - const hash = store.putObject(taskDescription); - store.appendEvent({ + const hash = await store.putObject(taskDescription); + await store.appendEvent({ occurredAt: Date.now(), kind: 'meta.__start__', key: 'optimize-coding', @@ -142,7 +142,7 @@ ${reviewerSrc} console.log('Spec:', r1.executed[0].content.slice(0, 500), '...\n'); } else { console.log('❌ Architect failed'); - store.close(); + await store.close(); return; } @@ -157,7 +157,7 @@ ${reviewerSrc} console.log('❌ Coder not triggered'); } - store.close(); + await store.close(); console.log('\n🏁 Done'); } diff --git a/packages/pulse/src/e2e/meta-full-e2e.ts b/packages/pulse/src/e2e/meta-full-e2e.ts index 908d60e..f78ab31 100644 --- a/packages/pulse/src/e2e/meta-full-e2e.ts +++ b/packages/pulse/src/e2e/meta-full-e2e.ts @@ -112,8 +112,8 @@ ${readSrc('packages/pulse/src/workflows/roles/reviewer-cursor.ts')} - commit author: 小橘 `; - const hash = store.putObject(taskDescription); - store.appendEvent({ + const hash = await store.putObject(taskDescription); + await store.appendEvent({ occurredAt: Date.now(), kind: 'meta.__start__', key: 'optimize-coding', @@ -138,7 +138,7 @@ ${readSrc('packages/pulse/src/workflows/roles/reviewer-cursor.ts')} } // Check if we hit END - const events = store.getAfter(0); + const events = await store.getAfter(0); const lastEvent = events[events.length - 1]; if (lastEvent?.kind === 'meta.promoter') { console.log('\n🎉 Workflow completed — promoter finished!'); @@ -146,7 +146,7 @@ ${readSrc('packages/pulse/src/workflows/roles/reviewer-cursor.ts')} } } - store.close(); + await store.close(); console.log('\n🏁 Done'); } diff --git a/packages/pulse/src/e2e/meta-optimize-coding.ts b/packages/pulse/src/e2e/meta-optimize-coding.ts index c77ca6c..f1fb972 100644 --- a/packages/pulse/src/e2e/meta-optimize-coding.ts +++ b/packages/pulse/src/e2e/meta-optimize-coding.ts @@ -97,8 +97,8 @@ ${architectSrc} 5. 可选:加 retry 上限(防止无限循环) `; - const hash = store.putObject(taskDescription); - store.appendEvent({ + const hash = await store.putObject(taskDescription); + await store.appendEvent({ occurredAt: Date.now(), kind: 'meta.__start__', key: 'optimize-coding', @@ -121,7 +121,7 @@ ${architectSrc} console.log('❌ No role executed'); } - store.close(); + await store.close(); } main().catch(console.error); diff --git a/packages/pulse/src/e2e/meta-tdd-coding.ts b/packages/pulse/src/e2e/meta-tdd-coding.ts index ee94604..deb5eb9 100644 --- a/packages/pulse/src/e2e/meta-tdd-coding.ts +++ b/packages/pulse/src/e2e/meta-tdd-coding.ts @@ -199,8 +199,8 @@ ${readSrc('packages/pulse/src/workflows/workflow-type.ts')} 8. bun run build 通过 `; - const hash = store.putObject(taskDescription); - store.appendEvent({ + const hash = await store.putObject(taskDescription); + await store.appendEvent({ occurredAt: Date.now(), kind: 'meta.__start__', key: 'tdd-coding-workflow', @@ -223,7 +223,7 @@ ${readSrc('packages/pulse/src/workflows/workflow-type.ts')} console.log(` meta: ${JSON.stringify(ex.meta)}`); } - const events = store.getAfter(0); + const events = await store.getAfter(0); const lastEvent = events[events.length - 1]; if (lastEvent?.kind === 'meta.promoter') { console.log('\n🎉 Workflow completed — promoter finished!'); @@ -231,7 +231,7 @@ ${readSrc('packages/pulse/src/workflows/workflow-type.ts')} } } - store.close(); + await store.close(); console.log('\n🏁 Done'); } diff --git a/packages/pulse/src/e2e/report-live.ts b/packages/pulse/src/e2e/report-live.ts index 96eacc7..24174ff 100644 --- a/packages/pulse/src/e2e/report-live.ts +++ b/packages/pulse/src/e2e/report-live.ts @@ -52,46 +52,49 @@ const source = createStore({ objectsDir: sourceObjDir, }); -const events = source - .getAfter(0) - .filter((e) => e.kind.startsWith('coding.') && e.key === workflowKey); +const events = (await source.getAfter(0)).filter( + (e) => e.kind.startsWith('coding.') && e.key === workflowKey, +); if (events.length === 0) { console.error(`No events found for key: ${workflowKey}`); - source.close(); + await source.close(); process.exit(1); } const tStart = events[0].occurredAt; +const eventItems = await Promise.all( + events.map(async (e, i) => { + const role = e.kind.replace('coding.', ''); + const meta = e.meta ? JSON.parse(e.meta) : null; + let content: string | null = null; + if (e.hash) { + try { + const obj = await source.getObject(e.hash); + content = typeof obj === 'string' ? obj : JSON.stringify(obj); + } catch {} + } + return { + id: e.id, + role, + offsetMs: e.occurredAt - tStart, + durationMs: i > 0 ? e.occurredAt - events[i - 1].occurredAt : 0, + meta, + content, + }; + }), +); const timelineJson = JSON.stringify( { key: workflowKey, totalMs: events[events.length - 1].occurredAt - tStart, - events: events.map((e, i) => { - const role = e.kind.replace('coding.', ''); - const meta = e.meta ? JSON.parse(e.meta) : null; - let content: string | null = null; - if (e.hash) { - try { - const obj = source.getObject(e.hash); - content = typeof obj === 'string' ? obj : JSON.stringify(obj); - } catch {} - } - return { - id: e.id, - role, - offsetMs: e.occurredAt - tStart, - durationMs: i > 0 ? e.occurredAt - events[i - 1].occurredAt : 0, - meta, - content, - }; - }), + events: eventItems, }, null, 2, ); -source.close(); +await source.close(); console.log( ts(), `Timeline loaded: ${events.length} events, ${(JSON.parse(timelineJson).totalMs / 1000).toFixed(1)}s`, @@ -119,8 +122,8 @@ const reportWorkflow = createReportWorkflow({ const rule = createWorkflowRule(reportWorkflow, reportStore); // Seed with timeline JSON -const hash = reportStore.putObject(timelineJson); -reportStore.appendEvent({ +const hash = await reportStore.putObject(timelineJson); +await reportStore.appendEvent({ occurredAt: Date.now(), kind: 'report.__start__', key: `report-${workflowKey}`, @@ -146,10 +149,10 @@ while (tickNum < 5) { } // Extract HTML report -const allEvents = reportStore.getAfter(0); +const allEvents = await reportStore.getAfter(0); const rendererEvt = allEvents.find((e) => e.kind === 'report.renderer'); if (rendererEvt?.hash) { - const html = reportStore.getObject(rendererEvt.hash) as string; + const html = await reportStore.getObject(rendererEvt.hash) as string; const outPath = join(tmpDir, `report-${workflowKey}.html`); writeFileSync(outPath, html, 'utf-8'); console.log( @@ -168,5 +171,5 @@ if (analystEvt?.meta) { ); } -reportStore.close(); +await reportStore.close(); console.log(`\n✅ Done in ${((Date.now() - t0) / 1000).toFixed(1)}s`); diff --git a/packages/pulse/src/gc.ts b/packages/pulse/src/gc.ts index 2bac431..30fadce 100644 --- a/packages/pulse/src/gc.ts +++ b/packages/pulse/src/gc.ts @@ -63,10 +63,10 @@ export interface GcResult { * Run GC on vitals store: downsample + archive. * Returns stats about what was cleaned up. */ -export function gcVitals( +export async function gcVitals( vitalsStore: PulseStore, config: GcConfig = DEFAULT_GC_CONFIG, -): { downsampledCount: number; archivedCount: number } { +): Promise<{ downsampledCount: number; archivedCount: number }> { const now = Date.now(); let downsampledCount = 0; let archivedCount = 0; @@ -82,13 +82,13 @@ export function gcVitals( if (tier.intervalMs === null) { // Hard delete (archive) - archivedCount += vitalsStore.archiveEvents(olderThan); + archivedCount += await vitalsStore.archiveEvents(olderThan); } else { // Downsample: query distinct kind+key combos from vitals, then downsample each. // We downsample all kinds present in vitals. - const kinds = getDistinctKindKeys(vitalsStore, olderThan); + const kinds = await getDistinctKindKeys(vitalsStore, olderThan); for (const { kind, key } of kinds) { - downsampledCount += vitalsStore.downsampleEvents( + downsampledCount += await vitalsStore.downsampleEvents( kind, key, tier.intervalMs, @@ -107,10 +107,10 @@ export function gcVitals( * Scans all events in the given stores for hash references, * then compares against files in objectsDir. */ -export function gcOrphanObjects( +export async function gcOrphanObjects( stores: PulseStore[], objectsDir: string, -): number { +): Promise { // Mark: collect all hashes referenced by events const referencedHashes = new Set(); @@ -129,7 +129,7 @@ export function gcOrphanObjects( 'init', 'gc', ]) { - const events = store.queryByKind(kind, {}); + const events = await store.queryByKind(kind, {}); for (const event of events) { if (event.hash) { referencedHashes.add(event.hash); @@ -167,22 +167,22 @@ export function gcOrphanObjects( * Full GC cycle: vitals downsample/archive + CAS orphan sweep. * Writes a gc event to systemStore for observability. */ -export function runGc(options: { +export async function runGc(options: { vitalsStore: PulseStore; systemStore: PulseStore; allStores: PulseStore[]; objectsDir: string; config?: GcConfig; -}): GcResult { +}): Promise { const config = options.config ?? DEFAULT_GC_CONFIG; const start = Date.now(); - const { downsampledCount, archivedCount } = gcVitals( + const { downsampledCount, archivedCount } = await gcVitals( options.vitalsStore, config, ); - const orphanObjectsCount = gcOrphanObjects( + const orphanObjectsCount = await gcOrphanObjects( options.allStores, options.objectsDir, ); @@ -197,7 +197,7 @@ export function runGc(options: { }; // Write GC stats event to _system scope - options.systemStore.appendEvent({ + await options.systemStore.appendEvent({ occurredAt: Date.now(), kind: 'gc', key: 'vitals', @@ -229,11 +229,9 @@ export function createGcTrigger(options: { tickCount++; if (tickCount >= config.tickInterval) { tickCount = 0; - try { - runGc({ ...options, config }); - } catch (err) { + runGc({ ...options, config }).catch((err) => { console.error('[pulse gc]', err); - } + }); } }; } @@ -244,10 +242,10 @@ export function createGcTrigger(options: { * Get distinct kind+key combos from a store for events older than a threshold. * Used to know which series to downsample. */ -function getDistinctKindKeys( +async function getDistinctKindKeys( store: PulseStore, olderThan: number, -): Array<{ kind: string; key: string }> { +): Promise> { // queryByKind returns events filtered by kind. // For vitals, the main kind is 'vital' with key = watcher name. // We also handle 'collect' kind in case vitals store has those. @@ -255,7 +253,7 @@ function getDistinctKindKeys( const seen = new Set(); for (const kind of ['vital', 'collect']) { - const events = store.queryByKind(kind, { limit: 1000 }); + const events = await store.queryByKind(kind, { limit: 1000 }); for (const event of events) { const pair = `${kind}:${event.key ?? ''}`; if (!seen.has(pair) && event.occurredAt < olderThan) { diff --git a/packages/pulse/src/index.ts b/packages/pulse/src/index.ts index a9179a8..e9cccaf 100644 --- a/packages/pulse/src/index.ts +++ b/packages/pulse/src/index.ts @@ -162,8 +162,8 @@ export function composeRules( * If no rollback, use the latest promote event. * If no promote at all, return null (cold start). */ -export function findEffectiveEpoch(store: PulseStore): EventRecord | null { - const rollback = store.getLatest('rollback'); +export async function findEffectiveEpoch(store: PulseStore): Promise { + const rollback = await store.getLatest('rollback'); if (rollback) { // rollback.meta should contain { to: 'v1' } — the code_rev to roll back to let meta: Record = {}; @@ -171,14 +171,14 @@ export function findEffectiveEpoch(store: PulseStore): EventRecord | null { meta = rollback.meta ? JSON.parse(rollback.meta) : {}; } catch { // Corrupted meta — skip this rollback event, fall through to latest promote - return store.getLatest('promote'); + return await store.getLatest('promote'); } const targetRev = (meta.to as string | undefined) || rollback.codeRev; if (targetRev) { - return store.getLatestWhere({ kind: 'promote', codeRev: targetRev }); + return await store.getLatestWhere({ kind: 'promote', codeRev: targetRev }); } } - return store.getLatest('promote'); + return await store.getLatest('promote'); } // ── Snapshot Rebuild ─────────────────────────────────────────── @@ -196,12 +196,12 @@ export function findEffectiveEpoch(store: PulseStore): EventRecord | null { * task projections (pending-tasks, agent-capability-stats) are folded from workflowStore. * Falls back to options.systemStore for backward compatibility. */ -export function rebuildSnapshot( +export async function rebuildSnapshot( storeOrStores: PulseStore | { system: PulseStore; vitals: PulseStore }, senseKeys: string[], epoch?: EventRecord | null, options?: { systemStore?: PulseStore; workflowStore?: PulseStore }, -): S { +): Promise { const isMultiStore = typeof storeOrStores === 'object' && 'system' in storeOrStores && @@ -218,9 +218,9 @@ export function rebuildSnapshot( for (const key of senseKeys) { // Priority 1: read latest vital from vitals store (if provided) if (vitalsStore) { - const latestVital = vitalsStore.getLatest('vital', key); + const latestVital = await vitalsStore.getLatest('vital', key); if (latestVital?.hash) { - const data = vitalsStore.getObject(latestVital.hash); + const data = await vitalsStore.getObject(latestVital.hash); if (data !== null) { snapshot[key] = { data, @@ -234,7 +234,7 @@ export function rebuildSnapshot( // Priority 2: fallback to events table (migrate/init events) if (epoch) { - const events = store.getAfter(epoch.id, { + const events = await store.getAfter(epoch.id, { kind: 'collect', key, codeRev: epoch.codeRev ?? undefined, @@ -242,7 +242,7 @@ export function rebuildSnapshot( const latestCollect = events.length > 0 ? events[events.length - 1]! : null; if (latestCollect?.hash) { - const data = store.getObject(latestCollect.hash); + const data = await store.getObject(latestCollect.hash); if (data !== null) { snapshot[key] = { data, @@ -253,7 +253,7 @@ export function rebuildSnapshot( } // Check migrate events - const migrateEvents = store.getAfter(epoch.id, { + const migrateEvents = await store.getAfter(epoch.id, { kind: 'migrate', key, codeRev: epoch.codeRev ?? undefined, @@ -261,7 +261,7 @@ export function rebuildSnapshot( if (migrateEvents.length > 0) { const latestMigrate = migrateEvents[migrateEvents.length - 1]!; if (latestMigrate.hash) { - const data = store.getObject(latestMigrate.hash); + const data = await store.getObject(latestMigrate.hash); if (data !== null) { snapshot[key] = { data, @@ -273,7 +273,7 @@ export function rebuildSnapshot( } // Check init events - const initEvents = store.getAfter(epoch.id, { + const initEvents = await store.getAfter(epoch.id, { kind: 'init', key, codeRev: epoch.codeRev ?? undefined, @@ -281,7 +281,7 @@ export function rebuildSnapshot( if (initEvents.length > 0) { const latestInit = initEvents[initEvents.length - 1]!; if (latestInit.hash) { - const data = store.getObject(latestInit.hash); + const data = await store.getObject(latestInit.hash); if (data !== null) { snapshot[key] = { data, @@ -292,9 +292,9 @@ export function rebuildSnapshot( } } else { // No epoch — try latest collect from events table directly - const latest = store.getLatest('collect', key); + const latest = await store.getLatest('collect', key); if (latest?.hash) { - const data = store.getObject(latest.hash); + const data = await store.getObject(latest.hash); if (data !== null) { snapshot[key] = { data, @@ -319,10 +319,10 @@ export function rebuildSnapshot( * Read each declared projection's current value, using "scope/name" as key. * If projection doesn't exist, value is null (graceful degradation). */ -export function buildSnapshotFromProjections( +export async function buildSnapshotFromProjections( scopedStore: ScopedStore, projectionPaths: string[], // ["_vitals/cpu_usage", "neko/session_count"] -): S { +): Promise { const snapshot: Record = { timestamp: Date.now() }; for (const projectionPath of projectionPaths) { @@ -341,7 +341,7 @@ export function buildSnapshotFromProjections( const scopeDb = scopedStore.scopeDatabase(scopeName); // Get projection state - const projectionState = getProjectionState(scopeDb, projectionName); + const projectionState = await getProjectionState(scopeDb, projectionName); if (projectionState) { snapshot[projectionPath] = projectionState.value; @@ -447,9 +447,9 @@ export async function runPulse(options: { const pulse = composeRules(rules, defaultTickMs); // Determine version epoch (always from system store) - const epoch = findEffectiveEpoch(systemStore); + const epoch = await findEffectiveEpoch(systemStore); - let prev = rebuildSnapshot( + let prev = await rebuildSnapshot( { system: systemStore, vitals: vitalsStore }, senseKeys, epoch, @@ -510,7 +510,7 @@ export async function runPulse(options: { ticking = true; pendingWake = false; - const curr = rebuildSnapshot( + const curr = await rebuildSnapshot( { system: systemStore, vitals: vitalsStore }, senseKeys, epoch, @@ -522,8 +522,8 @@ export async function runPulse(options: { // Write effect events to store (fire-and-forget — executorLoop picks them up) if (effects.length > 0) { for (const effect of effects) { - const effectHash = systemStore.putObject(effect); - systemStore.appendEvent({ + const effectHash = await systemStore.putObject(effect); + await systemStore.appendEvent({ occurredAt: Date.now(), kind: 'effect', key: effectHash, @@ -564,7 +564,7 @@ export async function runPulse(options: { } // Record tick event with the actual tickMs that will be used for next iteration - systemStore.appendEvent({ + await systemStore.appendEvent({ occurredAt: Date.now(), kind: 'tick', meta: JSON.stringify({ @@ -629,7 +629,7 @@ export async function runPulseV2(options: { const pulse = composeRules(rules, defaultTickMs); // Build initial snapshot - let prev = buildSnapshotFromProjections(scopedStore, projectionPaths); + let prev = await buildSnapshotFromProjections(scopedStore, projectionPaths); let tickMs = defaultTickMs; // ── Wake mechanism ───────────────────────────────────────────── @@ -698,7 +698,7 @@ export async function runPulseV2(options: { } // Build current snapshot from projections - const curr = buildSnapshotFromProjections(scopedStore, projectionPaths); + const curr = await buildSnapshotFromProjections(scopedStore, projectionPaths); const tickStart = Date.now(); const [effects, nextTickMs] = await pulse(prev, curr); @@ -706,8 +706,8 @@ export async function runPulseV2(options: { if (effects.length > 0) { const sysStore = scopedStore.scope('_system'); for (const effect of effects) { - const effectHash = sysStore.putObject(effect); - sysStore.appendEvent({ + const effectHash = await sysStore.putObject(effect); + await sysStore.appendEvent({ occurredAt: Date.now(), kind: 'effect', key: effectHash, @@ -722,7 +722,7 @@ export async function runPulseV2(options: { // Record tick event in system store const systemStore = scopedStore.scope('_system'); - systemStore.appendEvent({ + await systemStore.appendEvent({ occurredAt: Date.now(), kind: 'tick', meta: JSON.stringify({ @@ -762,7 +762,7 @@ export async function executorLoop(options: { while (!signal?.aborted) { try { // Find all pending effect events - const effectEvents = store.queryByKind('effect'); + const effectEvents = await store.queryByKind('effect'); for (const effectEvent of effectEvents) { if (signal?.aborted) break; @@ -771,23 +771,23 @@ export async function executorLoop(options: { const idStr = String(effectEvent.id); // Check if already acked or failed - const acked = store.getLatest('effect-acked', idStr); + const acked = await store.getLatest('effect-acked', idStr); if (acked) continue; - const failed = store.getLatest('effect-failed', idStr); + const failed = await store.getLatest('effect-failed', idStr); if (failed) continue; - const executing = store.getLatest('effect-executing', idStr); + const executing = await store.getLatest('effect-executing', idStr); if (executing) continue; // Retrieve the effect object from CAS if (!effectEvent.hash) continue; - const effectObj = store.getObject(effectEvent.hash) as E | null; + const effectObj = (await store.getObject(effectEvent.hash)) as E | null; if (effectObj === null) continue; // Mark as inflight inflight.add(effectEvent.id); // Write effect-executing event - store.appendEvent({ + await store.appendEvent({ occurredAt: Date.now(), kind: 'effect-executing', key: idStr, @@ -800,7 +800,7 @@ export async function executorLoop(options: { execute([effectObj]) .then(() => { // Write effect-acked event - store.appendEvent({ + return store.appendEvent({ occurredAt: Date.now(), kind: 'effect-acked', key: idStr, @@ -812,7 +812,7 @@ export async function executorLoop(options: { // Write effect-failed event const errorMessage = err instanceof Error ? err.message : String(err); - store.appendEvent({ + return store.appendEvent({ occurredAt: Date.now(), kind: 'effect-failed', key: idStr, diff --git a/packages/pulse/src/persona.ts b/packages/pulse/src/persona.ts index 38f4178..aff3184 100644 --- a/packages/pulse/src/persona.ts +++ b/packages/pulse/src/persona.ts @@ -12,11 +12,11 @@ import type { * - persona-updated only patches provided fields. * - Events are merged in occurredAt order. */ -export function buildPersonasFromEvents( +export async function buildPersonasFromEvents( store: PulseStore, -): Map { - const registered = store.queryByKind('persona-registered'); - const updated = store.queryByKind('persona-updated'); +): Promise> { + const registered = await store.queryByKind('persona-registered'); + const updated = await store.queryByKind('persona-updated'); const allEvents = [...registered, ...updated]; allEvents.sort((a, b) => a.occurredAt - b.occurredAt); diff --git a/packages/pulse/src/projection-engine.ts b/packages/pulse/src/projection-engine.ts index f23c4a5..9017846 100644 --- a/packages/pulse/src/projection-engine.ts +++ b/packages/pulse/src/projection-engine.ts @@ -161,10 +161,10 @@ function writeErrorEvent( /** * Get current projection state from the database. */ -export function getProjectionState( +export async function getProjectionState( scopeDb: Database, projectionName: string, -): ProjectionState | null { +): Promise { const stmt = getProjectionStateStmt(scopeDb); const row = stmt.get(projectionName) as any; @@ -190,7 +190,7 @@ export async function foldProjection( codeRev: string, ): Promise { // 1. Get projection definition - const def = getProjectionDef(scopeDb, projectionName, codeRev); + const def = await getProjectionDef(scopeDb, projectionName, codeRev); if (!def) { throw new Error( `Projection definition not found: ${projectionName}@${codeRev}`, @@ -198,7 +198,7 @@ export async function foldProjection( } // 2. Get current state from projections table - const currentState = getProjectionState(scopeDb, projectionName); + const currentState = await getProjectionState(scopeDb, projectionName); const currentValue = currentState?.value ?? def.initialValue; const lastEventId = currentState?.lastEventId ?? 0; @@ -319,7 +319,7 @@ export async function foldAllProjections( } const { listProjectionDefs } = await import('./defs.js'); - const projectionDefs = listProjectionDefs(scopeDb, { codeRev }); + const projectionDefs = await listProjectionDefs(scopeDb, { codeRev }); const results = new Map(); @@ -350,7 +350,7 @@ export async function resetProjections( // 2. Get all projection definitions for new code_rev const { listProjectionDefs } = await import('./defs.js'); - const projectionDefs = listProjectionDefs(scopeDb, { codeRev }); + const projectionDefs = await listProjectionDefs(scopeDb, { codeRev }); // 3. Replay all events for each projection const allEventsStmt = selectAllEventsStmt(scopeDb); diff --git a/packages/pulse/src/rules/agent-loop.ts b/packages/pulse/src/rules/agent-loop.ts index 20ebba0..22131ce 100644 --- a/packages/pulse/src/rules/agent-loop.ts +++ b/packages/pulse/src/rules/agent-loop.ts @@ -269,7 +269,7 @@ async function runProjectLoop( tool_choice: 'required', }); - opts.workflowStore.appendEvent({ + await opts.workflowStore.appendEvent({ kind: 'llm-call-started', meta: JSON.stringify({ projectId, taskCount: allTasks.length }), occurredAt: Date.now(), @@ -288,7 +288,7 @@ async function runProjectLoop( ), ]); } catch (err) { - opts.workflowStore.appendEvent({ + await opts.workflowStore.appendEvent({ kind: 'llm-call-failed', meta: JSON.stringify({ projectId, error: String(err) }), occurredAt: Date.now(), @@ -298,7 +298,7 @@ async function runProjectLoop( // 6. write trace event const durationMs = Date.now() - startTime; - opts.workflowStore.appendEvent({ + await opts.workflowStore.appendEvent({ kind: 'llm-call-completed', meta: JSON.stringify({ projectId, diff --git a/packages/pulse/src/rules/health.ts b/packages/pulse/src/rules/health.ts index d74911c..4a9cc53 100644 --- a/packages/pulse/src/rules/health.ts +++ b/packages/pulse/src/rules/health.ts @@ -19,12 +19,12 @@ export interface HealthSnapshot { * Rebuild health field from events table. * This function is in core package, agent cannot change. */ -export function rebuildHealth(store: PulseStore): HealthSnapshot { +export async function rebuildHealth(store: PulseStore): Promise { const now = Date.now(); const windowStart = now - 5 * 60 * 1000; // 5 minute window // Query recent events from events table - const recentEvents = store.queryByKind('effect', { since: windowStart }); + const recentEvents = await store.queryByKind('effect', { since: windowStart }); // Count restarts const lastRestart: Record = {}; @@ -46,10 +46,10 @@ export function rebuildHealth(store: PulseStore): HealthSnapshot { } // Count error events - const errorEvents = store.queryByKind('error', { since: windowStart }); + const errorEvents = await store.queryByKind('error', { since: windowStart }); // Find latest promote within window - const latestPromote = store.getLatest('promote'); + const latestPromote = await store.getLatest('promote'); let lastPromote: HealthSnapshot['lastPromote']; if (latestPromote && latestPromote.occurredAt > windowStart) { const meta = latestPromote.meta ? JSON.parse(latestPromote.meta) : {}; diff --git a/packages/pulse/src/store.test.ts b/packages/pulse/src/store.test.ts index 65e57a5..bb53f6e 100644 --- a/packages/pulse/src/store.test.ts +++ b/packages/pulse/src/store.test.ts @@ -38,9 +38,9 @@ describe('createStore (events table + CAS)', () => { // ── 2. appendEvent ──────────────────────────────────────────── - it('2. appendEvent returns EventRecord with number id', () => { + it('2. appendEvent returns EventRecord with number id', async () => { const store = createStore({ eventsDbPath, objectsDir }); - const result = store.appendEvent({ + const result = await store.appendEvent({ occurredAt: 1000, kind: 'tick', key: 'system', @@ -51,7 +51,7 @@ describe('createStore (events table + CAS)', () => { expect(result.occurredAt).toBe(1000); expect(result.kind).toBe('tick'); expect(result.key).toBe('system'); - store.close(); + await store.close(); }); // ── 3. ID 唯一性 ─────────────────────────────────────────── @@ -291,22 +291,22 @@ describe('createStore (events table + CAS)', () => { // ── 14. hasEvents ───────────────────────────────────────────── - it('14. hasEvents returns false on empty, true after insert', () => { + it('14. hasEvents returns false on empty, true after insert', async () => { const store = createStore({ eventsDbPath, objectsDir }); - expect(store.hasEvents()).toBe(false); + expect(await store.hasEvents()).toBe(false); - store.appendEvent({ occurredAt: 1000, kind: 'tick' }); - expect(store.hasEvents()).toBe(true); - store.close(); + await store.appendEvent({ occurredAt: 1000, kind: 'tick' }); + expect(await store.hasEvents()).toBe(true); + await store.close(); }); // ── 15. putObject + getObject (CAS) ─────────────────────────── - it('15. putObject writes and getObject reads back correctly', () => { + it('15. putObject writes and getObject reads back correctly', async () => { const store = createStore({ eventsDbPath, objectsDir }); const data = { cpu: 42, mem: 1024 }; - const hash = store.putObject(data); + const hash = await store.putObject(data); expect(typeof hash === 'string').toBeTruthy(); expect(hash.length).toBe(32); @@ -316,12 +316,12 @@ describe('createStore (events table + CAS)', () => { expect(existsSync(filePath)).toBeTruthy(); // Read back - const retrieved = store.getObject(hash); + const retrieved = await store.getObject(hash); expect(retrieved).toEqual(data); // Non-existent hash returns null - expect(store.getObject('0000000000000000')).toBe(null); - store.close(); + expect(await store.getObject('0000000000000000')).toBe(null); + await store.close(); }); // ── 16. CAS 去重 ────────────────────────────────────────────── diff --git a/packages/pulse/src/store.ts b/packages/pulse/src/store.ts index e43caf9..bd30652 100644 --- a/packages/pulse/src/store.ts +++ b/packages/pulse/src/store.ts @@ -42,36 +42,36 @@ export interface ObjectInstance { export interface PulseStore { /** Append one event (id is auto-incremented) */ - appendEvent(event: Omit): EventRecord; + appendEvent(event: Omit): Promise; /** Append multiple events in a transaction */ - appendEvents(events: Omit[]): EventRecord[]; + appendEvents(events: Omit[]): Promise; /** Create an immutable object instance. Returns the integer id. Idempotent on (objectType, externalId). */ createObject(opts: { objectType: string; externalId?: string; codeRev: string; - }): number; + }): Promise; /** Get an object instance by id. Returns null if not found. */ - getObjectInstance(id: number): ObjectInstance | null; + getObjectInstance(id: number): Promise; /** Query object instances by type. */ - queryObjectsByType(objectType: string): ObjectInstance[]; + queryObjectsByType(objectType: string): Promise; /** Get the latest event by kind + optional key */ - getLatest(kind: string, key?: string): EventRecord | null; + getLatest(kind: string, key?: string): Promise; /** Get latest event with additional filters */ getLatestWhere(opts: { kind: string; key?: string; codeRev?: string; - }): EventRecord | null; + }): Promise; /** Get recent events (newest first) */ - getRecent(limit?: number): EventRecord[]; + getRecent(limit?: number): Promise; /** Query events by kind with optional filters */ queryByKind( @@ -82,7 +82,7 @@ export interface PulseStore { codeRev?: string; limit?: number; }, - ): EventRecord[]; + ): Promise; /** Get all events after a specific event id */ getAfter( @@ -92,22 +92,22 @@ export interface PulseStore { key?: string; codeRev?: string; }, - ): EventRecord[]; + ): Promise; /** Check if any events exist */ - hasEvents(): boolean; + hasEvents(): Promise; /** Write data to CAS store. Returns hash. No-op if already exists. */ - putObject(data: unknown): string; + putObject(data: unknown): Promise; /** Read data from CAS store by hash. Returns null if not found. */ - getObject(hash: string): unknown | null; + getObject(hash: string): Promise; /** Close the database */ - close(): void; + close(): Promise; /** Delete events older than the given timestamp. Returns count of deleted rows. */ - archiveEvents(olderThan: number): number; + archiveEvents(olderThan: number): Promise; /** Downsample events of a specific kind+key: keep one per interval window. Returns count of deleted rows. */ downsampleEvents( @@ -115,7 +115,7 @@ export interface PulseStore { key: string, intervalMs: number, olderThan: number, - ): number; + ): Promise; } // ── CAS Hashing ──────────────────────────────────────────────── @@ -284,15 +284,15 @@ export function createStore(options: CreateStoreOptions): PulseStore { ); return { - appendEvent(event: Omit): EventRecord { + async appendEvent(event: Omit): Promise { return doAppendEvent(event); }, - appendEvents(events: Omit[]): EventRecord[] { + async appendEvents(events: Omit[]): Promise { return appendManyTx(events); }, - getLatest(kind: string, key?: string): EventRecord | null { + async getLatest(kind: string, key?: string): Promise { const row = selectLatest.get( kind, key ?? null, @@ -301,11 +301,11 @@ export function createStore(options: CreateStoreOptions): PulseStore { return row ? rowToRecord(row) : null; }, - getLatestWhere(opts: { + async getLatestWhere(opts: { kind: string; key?: string; codeRev?: string; - }): EventRecord | null { + }): Promise { const conditions: string[] = ['kind = ?']; const params: (string | number | null)[] = [opts.kind]; @@ -323,12 +323,12 @@ export function createStore(options: CreateStoreOptions): PulseStore { return row ? rowToRecord(row) : null; }, - getRecent(limit: number = 20): EventRecord[] { + async getRecent(limit: number = 20): Promise { const sql = `SELECT * FROM events ORDER BY occurred_at DESC, id DESC LIMIT ?`; return (eventsDb.prepare(sql).all(limit) as RawRow[]).map(rowToRecord); }, - queryByKind( + async queryByKind( kind: string, opts?: { key?: string; @@ -336,7 +336,7 @@ export function createStore(options: CreateStoreOptions): PulseStore { codeRev?: string; limit?: number; }, - ): EventRecord[] { + ): Promise { const conditions: string[] = ['kind = ?']; const params: (string | number | null)[] = [kind]; @@ -364,14 +364,14 @@ export function createStore(options: CreateStoreOptions): PulseStore { ); }, - getAfter( + async getAfter( afterId: number, opts?: { kind?: string; key?: string; codeRev?: string; }, - ): EventRecord[] { + ): Promise { const conditions: string[] = ['id > ?']; const params: (string | number | null)[] = [afterId]; @@ -394,15 +394,15 @@ export function createStore(options: CreateStoreOptions): PulseStore { ); }, - hasEvents(): boolean { + async hasEvents(): Promise { return selectHasEvents.get() !== null; }, - createObject(opts: { + async createObject(opts: { objectType: string; externalId?: string; codeRev: string; - }): number { + }): Promise { const now = Date.now(); const extId = opts.externalId ?? null; // Idempotent: if (objectType, externalId) already exists, return existing id @@ -422,14 +422,14 @@ export function createStore(options: CreateStoreOptions): PulseStore { return Number(result.lastInsertRowid); }, - getObjectInstance(id: number): ObjectInstance | null { + async getObjectInstance(id: number): Promise { const row = eventsDb .prepare('SELECT * FROM objects WHERE id = ?') .get(id) as RawObjectRow | null; return row ? rowToObjectInstance(row) : null; }, - queryObjectsByType(objectType: string): ObjectInstance[] { + async queryObjectsByType(objectType: string): Promise { return ( eventsDb .prepare('SELECT * FROM objects WHERE object_type = ?') @@ -437,7 +437,7 @@ export function createStore(options: CreateStoreOptions): PulseStore { ).map(rowToObjectInstance); }, - putObject(data: unknown): string { + async putObject(data: unknown): Promise { const hash = hashObject(data); const filePath = join(objectsDir, `${hash}.json`); if (!existsSync(filePath)) { @@ -447,29 +447,29 @@ export function createStore(options: CreateStoreOptions): PulseStore { return hash; }, - getObject(hash: string): unknown | null { + async getObject(hash: string): Promise { const filePath = join(objectsDir, `${hash}.json`); if (!existsSync(filePath)) return null; return JSON.parse(readFileSync(filePath, 'utf-8')); }, - close(): void { + async close(): Promise { eventsDb.close(); }, - archiveEvents(olderThan: number): number { + async archiveEvents(olderThan: number): Promise { const result = eventsDb .prepare('DELETE FROM events WHERE occurred_at < ?') .run(olderThan); return result.changes; }, - downsampleEvents( + async downsampleEvents( kind: string, key: string, intervalMs: number, olderThan: number, - ): number { + ): Promise { const safeInterval = Math.floor(Math.abs(intervalMs)); if (safeInterval <= 0) return 0; const stmt = eventsDb.prepare(` @@ -503,10 +503,10 @@ export interface ScopedStore { /** Get underlying Database for scope (used by projection engine) */ scopeDatabase(name: string): Database; - putObject(data: unknown): string; - getObject(hash: string): unknown | null; + putObject(data: unknown): Promise; + getObject(hash: string): Promise; - close(): void; + close(): Promise; } function validateScopeName(name: string): void { @@ -520,6 +520,7 @@ function validateScopeName(name: string): void { /** * Open (or create) a scope database at the given path. * Sets WAL mode and creates the events table and projections table. + * initDefsSchema is async in interface but synchronous in bun:sqlite — safe to call with void. */ function openScopeDb(path: string): Database { mkdirSync(dirname(path), { recursive: true }); @@ -533,8 +534,8 @@ function openScopeDb(path: string): Database { // Use canonical PROJECTIONS_SCHEMA (INTEGER last_event_id) db.exec(PROJECTIONS_SCHEMA); - // Each scope carries its own def tables - initDefsSchema(db); + // Each scope carries its own def tables (bun:sqlite is sync under async wrapper) + void initDefsSchema(db); return db; } @@ -591,15 +592,15 @@ function createScopeStore(db: Database, objectsDir: string): PulseStore { ); return { - appendEvent(event) { + async appendEvent(event) { return doAppendEvent(event); }, - appendEvents(events) { + async appendEvents(events) { return appendManyTx(events); }, - getLatest(kind, key?) { + async getLatest(kind, key?) { const row = selectLatest.get( kind, key ?? null, @@ -608,7 +609,7 @@ function createScopeStore(db: Database, objectsDir: string): PulseStore { return row ? rowToRecord(row) : null; }, - getLatestWhere(opts) { + async getLatestWhere(opts) { const conditions: string[] = ['kind = ?']; const params: (string | number | null)[] = [opts.kind]; if (opts.key !== undefined) { @@ -624,12 +625,12 @@ function createScopeStore(db: Database, objectsDir: string): PulseStore { return row ? rowToRecord(row) : null; }, - getRecent(limit = 20) { + async getRecent(limit = 20) { const sql = `SELECT * FROM events ORDER BY occurred_at DESC, id DESC LIMIT ?`; return (db.prepare(sql).all(limit) as RawRow[]).map(rowToRecord); }, - queryByKind(kind, opts?) { + async queryByKind(kind, opts?) { const conditions: string[] = ['kind = ?']; const params: (string | number | null)[] = [kind]; if (opts?.key !== undefined) { @@ -652,7 +653,7 @@ function createScopeStore(db: Database, objectsDir: string): PulseStore { return (db.prepare(sql).all(...params) as RawRow[]).map(rowToRecord); }, - getAfter(afterId: number, opts?) { + async getAfter(afterId: number, opts?) { const conditions: string[] = ['id > ?']; const params: (string | number | null)[] = [afterId]; if (opts?.kind !== undefined) { @@ -671,15 +672,15 @@ function createScopeStore(db: Database, objectsDir: string): PulseStore { return (db.prepare(sql).all(...params) as RawRow[]).map(rowToRecord); }, - hasEvents() { + async hasEvents() { return selectHasEvents.get() !== null; }, - createObject(opts: { + async createObject(opts: { objectType: string; externalId?: string; codeRev: string; - }): number { + }): Promise { const now = Date.now(); const extId = opts.externalId ?? null; if (extId !== null) { @@ -698,14 +699,14 @@ function createScopeStore(db: Database, objectsDir: string): PulseStore { return Number(result.lastInsertRowid); }, - getObjectInstance(id: number): ObjectInstance | null { + async getObjectInstance(id: number): Promise { const row = db .prepare('SELECT * FROM objects WHERE id = ?') .get(id) as RawObjectRow | null; return row ? rowToObjectInstance(row) : null; }, - queryObjectsByType(objectType: string): ObjectInstance[] { + async queryObjectsByType(objectType: string): Promise { return ( db .prepare('SELECT * FROM objects WHERE object_type = ?') @@ -713,7 +714,7 @@ function createScopeStore(db: Database, objectsDir: string): PulseStore { ).map(rowToObjectInstance); }, - putObject(data) { + async putObject(data) { const hash = hashObject(data); const filePath = join(objectsDir, `${hash}.json`); if (!existsSync(filePath)) { @@ -723,29 +724,29 @@ function createScopeStore(db: Database, objectsDir: string): PulseStore { return hash; }, - getObject(hash) { + async getObject(hash) { const filePath = join(objectsDir, `${hash}.json`); if (!existsSync(filePath)) return null; return JSON.parse(readFileSync(filePath, 'utf-8')); }, - close() { + async close() { db.close(); }, - archiveEvents(olderThan: number): number { + async archiveEvents(olderThan: number): Promise { const result = db .prepare('DELETE FROM events WHERE occurred_at < ?') .run(olderThan); return result.changes; }, - downsampleEvents( + async downsampleEvents( kind: string, key: string, intervalMs: number, olderThan: number, - ): number { + ): Promise { const safeInterval = Math.floor(Math.abs(intervalMs)); if (safeInterval <= 0) return 0; const stmt = db.prepare(` @@ -818,7 +819,7 @@ export function createScopedStore( .sort(); }, - putObject(data: unknown): string { + async putObject(data: unknown): Promise { const hash = hashObject(data); const filePath = join(objectsDir, `${hash}.json`); if (!existsSync(filePath)) { @@ -828,15 +829,15 @@ export function createScopedStore( return hash; }, - getObject(hash: string): unknown | null { + async getObject(hash: string): Promise { const filePath = join(objectsDir, `${hash}.json`); if (!existsSync(filePath)) return null; return JSON.parse(readFileSync(filePath, 'utf-8')); }, - close(): void { + async close(): Promise { for (const store of openStores.values()) { - store.close(); + await store.close(); } for (const db of openDatabases.values()) { db.close(); diff --git a/packages/pulse/src/watcher.ts b/packages/pulse/src/watcher.ts index 95111c2..4ab0acc 100644 --- a/packages/pulse/src/watcher.ts +++ b/packages/pulse/src/watcher.ts @@ -103,23 +103,25 @@ export function startWatcher( try { const data = await def.collect(); - const hash = store.putObject(data); + const hash = await store.putObject(data); - store.appendEvent({ + await store.appendEvent({ occurredAt: Date.now(), kind: 'vital', key: def.key, hash, }); - const window = store.queryByKind('vital', { + const window = await store.queryByKind('vital', { key: def.key, limit: 12, }); - const resolvedWindow: VitalWithData[] = window.map((v) => ({ - ...v, - data: v.hash ? store.getObject(v.hash) : null, - })); + const resolvedWindow: VitalWithData[] = await Promise.all( + window.map(async (v) => ({ + ...v, + data: v.hash ? await store.getObject(v.hash) : null, + })), + ); if (def.shouldWake(resolvedWindow)) { wakeTick(); @@ -132,11 +134,11 @@ export function startWatcher( } console.error(`[watcher:${def.name}] error during tick:`, err); try { - store.appendEvent({ + await store.appendEvent({ occurredAt: Date.now(), kind: 'vital', key: `_error:${def.key}`, - hash: store.putObject({ + hash: await store.putObject({ error: msg, watcher: def.name, }), diff --git a/packages/pulse/src/workflows/workflow-rule-adapter.ts b/packages/pulse/src/workflows/workflow-rule-adapter.ts index eee9935..6704a33 100644 --- a/packages/pulse/src/workflows/workflow-rule-adapter.ts +++ b/packages/pulse/src/workflows/workflow-rule-adapter.ts @@ -69,7 +69,7 @@ export function createWorkflowRule( // Incremental read: first tick reads all, subsequent ticks read only new events const afterId = checkpoint ? checkpoint.lastEventId : 0; - const newEvents = store.getAfter(afterId); + const newEvents = await store.getAfter(afterId); // Initialize or reuse checkpoint if (!checkpoint) { @@ -145,11 +145,12 @@ export function createWorkflowRule( // Build message chain from cached topic events const cachedEvents = checkpoint.topicEvents.get(action.topicId) ?? []; - const chain: WorkflowMessage[] = cachedEvents.map((e) => ({ + const chain: WorkflowMessage[] = await Promise.all( + cachedEvents.map(async (e) => ({ role: e.kind.slice(prefix.length), content: e.hash - ? (() => { - const obj = store.getObject(e.hash!); + ? await (async () => { + const obj = await store.getObject(e.hash!); if (typeof obj === 'string') return obj; if (obj && typeof obj === 'object' && 'content' in obj) return String((obj as any).content); @@ -166,11 +167,12 @@ export function createWorkflowRule( })() : null, timestamp: e.occurredAt, - })); + })), + ); // Log role-started if (logStore) { - logStore.appendEvent({ + await logStore.appendEvent({ occurredAt: Date.now(), kind: `${wf.name}.role-started`, key: action.topicId, @@ -187,8 +189,8 @@ export function createWorkflowRule( const result = await roleFn(chain, action.topicId, store); // Adapter writes CAS + event - const hash = store.putObject(result.content); - const written = store.appendEvent({ + const hash = await store.putObject(result.content); + const written = await store.appendEvent({ occurredAt: Date.now(), kind: `${wf.name}.${action.role}`, key: action.topicId, @@ -213,7 +215,7 @@ export function createWorkflowRule( } if (logStore) { - logStore.appendEvent({ + await logStore.appendEvent({ occurredAt: Date.now(), kind: `${wf.name}.role-completed`, key: action.topicId, @@ -232,7 +234,7 @@ export function createWorkflowRule( }); } catch (err) { if (logStore) { - logStore.appendEvent({ + await logStore.appendEvent({ occurredAt: Date.now(), kind: `${wf.name}.role-failed`, key: action.topicId,