refactor!: async PulseStore interface for multi-runtime support (refs #5)
CI / test (push) Has been cancelled

- PulseStore/ScopedStore 接口方法返回 Promise<T>
- 所有 store 实现方法改为 async(bun:sqlite 同步调用外包 async)
- defs.ts, projection-engine.ts, gc.ts 公共函数异步化
- index.ts 中 store 调用加 await
- rules/health.ts rebuildHealth 函数异步化
- 核心异步化完成,支持 CF D1 等异步数据库运行时

Breaking Changes:
- All PulseStore methods now return Promise<T>
- Consumer code must add await for all store operations
- Tests need to be updated to use async/await patterns
This commit is contained in:
2026-04-18 02:50:12 +00:00
parent 80eeac03c2
commit ca3dd52e38
20 changed files with 254 additions and 248 deletions
+3 -3
View File
@@ -22,8 +22,8 @@ const store = createStore({
});
const content = readFileSync(taskFile, 'utf-8');
const hash = store.putObject(content);
store.appendEvent({
const hash = await store.putObject(content);
await store.appendEvent({
occurredAt: Date.now(),
kind: `${workflow}.__start__`,
key: topicKey,
@@ -33,4 +33,4 @@ store.appendEvent({
console.log(`✅ Submitted: ${workflow}.__start__ [${topicKey}]`);
console.log(` Content: ${content.slice(0, 100)}...`);
console.log(` Hash: ${hash}`);
store.close();
await store.close();
+4 -4
View File
@@ -106,7 +106,7 @@ console.log('');
// ── Adaptive Tick Loop ─────────────────────────────────────────
let currentTickMs = BASE_TICK_MS;
let lastEventCount = store.getAfter(0).length;
let lastEventCount = (await store.getAfter(0)).length;
let timer: ReturnType<typeof setTimeout>;
const logTick = async () => {
@@ -116,7 +116,7 @@ const logTick = async () => {
const ts = new Date().toISOString().slice(11, 19);
// Check if new events appeared → activity
const nowCount = store.getAfter(0).length;
const nowCount = (await store.getAfter(0)).length;
if (nowCount > lastEventCount) {
currentTickMs = BASE_TICK_MS; // reset to fast
lastEventCount = nowCount;
@@ -147,8 +147,8 @@ scheduleNext();
const shutdown = (signal: string) => {
console.log(`\n⏹️ ${signal} received, shutting down...`);
clearTimeout(timer);
store.close();
logStore.close();
void store.close();
void logStore.close();
process.exit(0);
};
+15 -15
View File
@@ -103,7 +103,7 @@ const PROJECTION_DEF_SOURCES_SCHEMA = `
* Initialize the definition schema on an existing database connection.
* Use this when you manage the database lifecycle externally.
*/
export function initDefsSchema(db: Database): void {
export async function initDefsSchema(db: Database): Promise<void> {
db.exec(OBJECT_DEFS_SCHEMA);
db.exec(EVENT_DEFS_SCHEMA);
db.exec(PROJECTION_DEFS_SCHEMA);
@@ -159,13 +159,13 @@ const selectObjectDef = (db: Database) =>
WHERE name = ? AND code_rev = ?
`);
export function registerObjectDef(
export async function registerObjectDef(
db: Database,
opts: {
name: string;
codeRev: string;
},
): ObjectDef {
): Promise<ObjectDef> {
const createdAt = Date.now();
try {
@@ -186,11 +186,11 @@ export function registerObjectDef(
};
}
export function getObjectDef(
export async function getObjectDef(
db: Database,
name: string,
codeRev: string,
): ObjectDef | null {
): Promise<ObjectDef | null> {
const row = selectObjectDef(db).get(name, codeRev) as any;
if (!row) return null;
@@ -225,7 +225,7 @@ const selectEventDefsByCodeRev = (db: Database) =>
ORDER BY name
`);
export function registerEventDef(
export async function registerEventDef(
db: Database,
opts: {
name: string;
@@ -233,7 +233,7 @@ export function registerEventDef(
parentHash?: string;
codeRev: string;
},
): EventDef {
): Promise<EventDef> {
const hash = calculateEventHash(opts.name, opts.schema);
const createdAt = Date.now();
@@ -265,11 +265,11 @@ export function registerEventDef(
};
}
export function getEventDef(
export async function getEventDef(
db: Database,
name: string,
codeRev: string,
): EventDef | null {
): Promise<EventDef | null> {
const row = selectEventDefByNameCodeRev(db).get(name, codeRev) as any;
if (!row) return null;
@@ -284,10 +284,10 @@ export function getEventDef(
};
}
export function listEventDefs(
export async function listEventDefs(
db: Database,
opts: { codeRev: string },
): EventDef[] {
): Promise<EventDef[]> {
const rows = selectEventDefsByCodeRev(db).all(opts.codeRev) as any[];
return rows.map((row) => ({
@@ -429,11 +429,11 @@ export async function registerProjectionDef(
};
}
export function getProjectionDef(
export async function getProjectionDef(
db: Database,
name: string,
codeRev: string,
): ProjectionDef | null {
): Promise<ProjectionDef | null> {
const row = selectProjectionDefByNameCodeRev(db).get(name, codeRev) as any;
if (!row) return null;
@@ -458,10 +458,10 @@ export function getProjectionDef(
};
}
export function listProjectionDefs(
export async function listProjectionDefs(
db: Database,
opts: { codeRev: string },
): ProjectionDef[] {
): Promise<ProjectionDef[]> {
const rows = selectProjectionDefsByCodeRev(db).all(opts.codeRev) as any[];
return rows.map((row) => {
+6 -6
View File
@@ -90,8 +90,8 @@ async function main() {
];
for (const t of tasks) {
const hash = store.putObject(t.description);
store.appendEvent({
const hash = await store.putObject(t.description);
await store.appendEvent({
occurredAt: Date.now(),
kind: 'coding.__start__',
key: t.topicId,
@@ -123,9 +123,9 @@ async function main() {
log('📊', 'Step 3: Event Timeline');
console.log();
const allEvents = store
.getAfter(0)
.filter((e) => e.kind.startsWith('coding.'));
const allEvents = (await store.getAfter(0)).filter((e) =>
e.kind.startsWith('coding.'),
);
for (const e of allEvents) {
const ts = new Date(e.occurredAt).toISOString().slice(11, 23);
const _role = e.kind.split('.')[1];
@@ -147,7 +147,7 @@ async function main() {
log('✅', 'Council v2 demo completed!');
console.log();
} finally {
store.close();
await store.close();
if (KEEP || useCustomDb) {
console.log(`📁 Events DB preserved: ${dbPath}`);
} else if (tmpDir) {
+4 -4
View File
@@ -144,10 +144,10 @@ async function main() {
const rule = createWorkflowRule(codingTask, store);
// Create a real task via coding.__start__ event
const hash = store.putObject(
const hash = await store.putObject(
'Create packages/pulse/COUNCIL-V2.md — a concise (30-50 lines) overview of the Council v2 model. Cover: WorkflowType (events, roles, moderator), START/END automaton, pure roles, Moore machine diff-driven ticks. Reference source files in workflows/ directory. Do NOT modify any existing files.',
);
store.appendEvent({
await store.appendEvent({
occurredAt: Date.now(),
kind: 'coding.__start__',
key: 'add-v2-readme',
@@ -177,7 +177,7 @@ async function main() {
// Print event timeline
console.log(`\n📊 === Event Timeline ===`);
const events = store.getAfter(0);
const events = await store.getAfter(0);
const t0ev = events[0]?.occurredAt ?? 0;
for (const ev of events) {
const delta = ((ev.occurredAt - t0ev) / 1000).toFixed(1);
@@ -188,7 +188,7 @@ async function main() {
);
}
store.close();
await store.close();
console.log(`\n📁 DB: ${dbPath}`);
console.log(`✅ Done in ${((Date.now() - t0) / 1000).toFixed(1)}s\n`);
}
@@ -126,8 +126,8 @@ ${reviewerSrc}
- commit author: 小橘 <xiaoju@shazhou.work>
`;
const hash = store.putObject(taskDescription);
store.appendEvent({
const hash = await store.putObject(taskDescription);
await store.appendEvent({
occurredAt: Date.now(),
kind: 'meta.__start__',
key: 'optimize-coding',
@@ -142,7 +142,7 @@ ${reviewerSrc}
console.log('Spec:', r1.executed[0].content.slice(0, 500), '...\n');
} else {
console.log('❌ Architect failed');
store.close();
await store.close();
return;
}
@@ -157,7 +157,7 @@ ${reviewerSrc}
console.log('❌ Coder not triggered');
}
store.close();
await store.close();
console.log('\n🏁 Done');
}
+4 -4
View File
@@ -112,8 +112,8 @@ ${readSrc('packages/pulse/src/workflows/roles/reviewer-cursor.ts')}
- commit author: 小橘 <xiaoju@shazhou.work>
`;
const hash = store.putObject(taskDescription);
store.appendEvent({
const hash = await store.putObject(taskDescription);
await store.appendEvent({
occurredAt: Date.now(),
kind: 'meta.__start__',
key: 'optimize-coding',
@@ -138,7 +138,7 @@ ${readSrc('packages/pulse/src/workflows/roles/reviewer-cursor.ts')}
}
// Check if we hit END
const events = store.getAfter(0);
const events = await store.getAfter(0);
const lastEvent = events[events.length - 1];
if (lastEvent?.kind === 'meta.promoter') {
console.log('\n🎉 Workflow completed — promoter finished!');
@@ -146,7 +146,7 @@ ${readSrc('packages/pulse/src/workflows/roles/reviewer-cursor.ts')}
}
}
store.close();
await store.close();
console.log('\n🏁 Done');
}
@@ -97,8 +97,8 @@ ${architectSrc}
5. 可选:加 retry 上限(防止无限循环)
`;
const hash = store.putObject(taskDescription);
store.appendEvent({
const hash = await store.putObject(taskDescription);
await store.appendEvent({
occurredAt: Date.now(),
kind: 'meta.__start__',
key: 'optimize-coding',
@@ -121,7 +121,7 @@ ${architectSrc}
console.log('❌ No role executed');
}
store.close();
await store.close();
}
main().catch(console.error);
+4 -4
View File
@@ -199,8 +199,8 @@ ${readSrc('packages/pulse/src/workflows/workflow-type.ts')}
8. bun run build 通过
`;
const hash = store.putObject(taskDescription);
store.appendEvent({
const hash = await store.putObject(taskDescription);
await store.appendEvent({
occurredAt: Date.now(),
kind: 'meta.__start__',
key: 'tdd-coding-workflow',
@@ -223,7 +223,7 @@ ${readSrc('packages/pulse/src/workflows/workflow-type.ts')}
console.log(` meta: ${JSON.stringify(ex.meta)}`);
}
const events = store.getAfter(0);
const events = await store.getAfter(0);
const lastEvent = events[events.length - 1];
if (lastEvent?.kind === 'meta.promoter') {
console.log('\n🎉 Workflow completed — promoter finished!');
@@ -231,7 +231,7 @@ ${readSrc('packages/pulse/src/workflows/workflow-type.ts')}
}
}
store.close();
await store.close();
console.log('\n🏁 Done');
}
+19 -16
View File
@@ -52,28 +52,25 @@ const source = createStore({
objectsDir: sourceObjDir,
});
const events = source
.getAfter(0)
.filter((e) => e.kind.startsWith('coding.') && e.key === workflowKey);
const events = (await source.getAfter(0)).filter(
(e) => e.kind.startsWith('coding.') && e.key === workflowKey,
);
if (events.length === 0) {
console.error(`No events found for key: ${workflowKey}`);
source.close();
await source.close();
process.exit(1);
}
const tStart = events[0].occurredAt;
const timelineJson = JSON.stringify(
{
key: workflowKey,
totalMs: events[events.length - 1].occurredAt - tStart,
events: events.map((e, i) => {
const eventItems = await Promise.all(
events.map(async (e, i) => {
const role = e.kind.replace('coding.', '');
const meta = e.meta ? JSON.parse(e.meta) : null;
let content: string | null = null;
if (e.hash) {
try {
const obj = source.getObject(e.hash);
const obj = await source.getObject(e.hash);
content = typeof obj === 'string' ? obj : JSON.stringify(obj);
} catch {}
}
@@ -86,12 +83,18 @@ const timelineJson = JSON.stringify(
content,
};
}),
);
const timelineJson = JSON.stringify(
{
key: workflowKey,
totalMs: events[events.length - 1].occurredAt - tStart,
events: eventItems,
},
null,
2,
);
source.close();
await source.close();
console.log(
ts(),
`Timeline loaded: ${events.length} events, ${(JSON.parse(timelineJson).totalMs / 1000).toFixed(1)}s`,
@@ -119,8 +122,8 @@ const reportWorkflow = createReportWorkflow({
const rule = createWorkflowRule(reportWorkflow, reportStore);
// Seed with timeline JSON
const hash = reportStore.putObject(timelineJson);
reportStore.appendEvent({
const hash = await reportStore.putObject(timelineJson);
await reportStore.appendEvent({
occurredAt: Date.now(),
kind: 'report.__start__',
key: `report-${workflowKey}`,
@@ -146,10 +149,10 @@ while (tickNum < 5) {
}
// Extract HTML report
const allEvents = reportStore.getAfter(0);
const allEvents = await reportStore.getAfter(0);
const rendererEvt = allEvents.find((e) => e.kind === 'report.renderer');
if (rendererEvt?.hash) {
const html = reportStore.getObject(rendererEvt.hash) as string;
const html = await reportStore.getObject(rendererEvt.hash) as string;
const outPath = join(tmpDir, `report-${workflowKey}.html`);
writeFileSync(outPath, html, 'utf-8');
console.log(
@@ -168,5 +171,5 @@ if (analystEvt?.meta) {
);
}
reportStore.close();
await reportStore.close();
console.log(`\n✅ Done in ${((Date.now() - t0) / 1000).toFixed(1)}s`);
+18 -20
View File
@@ -63,10 +63,10 @@ export interface GcResult {
* Run GC on vitals store: downsample + archive.
* Returns stats about what was cleaned up.
*/
export function gcVitals(
export async function gcVitals(
vitalsStore: PulseStore,
config: GcConfig = DEFAULT_GC_CONFIG,
): { downsampledCount: number; archivedCount: number } {
): Promise<{ downsampledCount: number; archivedCount: number }> {
const now = Date.now();
let downsampledCount = 0;
let archivedCount = 0;
@@ -82,13 +82,13 @@ export function gcVitals(
if (tier.intervalMs === null) {
// Hard delete (archive)
archivedCount += vitalsStore.archiveEvents(olderThan);
archivedCount += await vitalsStore.archiveEvents(olderThan);
} else {
// Downsample: query distinct kind+key combos from vitals, then downsample each.
// We downsample all kinds present in vitals.
const kinds = getDistinctKindKeys(vitalsStore, olderThan);
const kinds = await getDistinctKindKeys(vitalsStore, olderThan);
for (const { kind, key } of kinds) {
downsampledCount += vitalsStore.downsampleEvents(
downsampledCount += await vitalsStore.downsampleEvents(
kind,
key,
tier.intervalMs,
@@ -107,10 +107,10 @@ export function gcVitals(
* Scans all events in the given stores for hash references,
* then compares against files in objectsDir.
*/
export function gcOrphanObjects(
export async function gcOrphanObjects(
stores: PulseStore[],
objectsDir: string,
): number {
): Promise<number> {
// Mark: collect all hashes referenced by events
const referencedHashes = new Set<string>();
@@ -129,7 +129,7 @@ export function gcOrphanObjects(
'init',
'gc',
]) {
const events = store.queryByKind(kind, {});
const events = await store.queryByKind(kind, {});
for (const event of events) {
if (event.hash) {
referencedHashes.add(event.hash);
@@ -167,22 +167,22 @@ export function gcOrphanObjects(
* Full GC cycle: vitals downsample/archive + CAS orphan sweep.
* Writes a gc event to systemStore for observability.
*/
export function runGc(options: {
export async function runGc(options: {
vitalsStore: PulseStore;
systemStore: PulseStore;
allStores: PulseStore[];
objectsDir: string;
config?: GcConfig;
}): GcResult {
}): Promise<GcResult> {
const config = options.config ?? DEFAULT_GC_CONFIG;
const start = Date.now();
const { downsampledCount, archivedCount } = gcVitals(
const { downsampledCount, archivedCount } = await gcVitals(
options.vitalsStore,
config,
);
const orphanObjectsCount = gcOrphanObjects(
const orphanObjectsCount = await gcOrphanObjects(
options.allStores,
options.objectsDir,
);
@@ -197,7 +197,7 @@ export function runGc(options: {
};
// Write GC stats event to _system scope
options.systemStore.appendEvent({
await options.systemStore.appendEvent({
occurredAt: Date.now(),
kind: 'gc',
key: 'vitals',
@@ -229,11 +229,9 @@ export function createGcTrigger(options: {
tickCount++;
if (tickCount >= config.tickInterval) {
tickCount = 0;
try {
runGc({ ...options, config });
} catch (err) {
runGc({ ...options, config }).catch((err) => {
console.error('[pulse gc]', err);
}
});
}
};
}
@@ -244,10 +242,10 @@ export function createGcTrigger(options: {
* Get distinct kind+key combos from a store for events older than a threshold.
* Used to know which series to downsample.
*/
function getDistinctKindKeys(
async function getDistinctKindKeys(
store: PulseStore,
olderThan: number,
): Array<{ kind: string; key: string }> {
): Promise<Array<{ kind: string; key: string }>> {
// queryByKind returns events filtered by kind.
// For vitals, the main kind is 'vital' with key = watcher name.
// We also handle 'collect' kind in case vitals store has those.
@@ -255,7 +253,7 @@ function getDistinctKindKeys(
const seen = new Set<string>();
for (const kind of ['vital', 'collect']) {
const events = store.queryByKind(kind, { limit: 1000 });
const events = await store.queryByKind(kind, { limit: 1000 });
for (const event of events) {
const pair = `${kind}:${event.key ?? ''}`;
if (!seen.has(pair) && event.occurredAt < olderThan) {
+39 -39
View File
@@ -162,8 +162,8 @@ export function composeRules<S, E>(
* If no rollback, use the latest promote event.
* If no promote at all, return null (cold start).
*/
export function findEffectiveEpoch(store: PulseStore): EventRecord | null {
const rollback = store.getLatest('rollback');
export async function findEffectiveEpoch(store: PulseStore): Promise<EventRecord | null> {
const rollback = await store.getLatest('rollback');
if (rollback) {
// rollback.meta should contain { to: 'v1' } — the code_rev to roll back to
let meta: Record<string, unknown> = {};
@@ -171,14 +171,14 @@ export function findEffectiveEpoch(store: PulseStore): EventRecord | null {
meta = rollback.meta ? JSON.parse(rollback.meta) : {};
} catch {
// Corrupted meta — skip this rollback event, fall through to latest promote
return store.getLatest('promote');
return await store.getLatest('promote');
}
const targetRev = (meta.to as string | undefined) || rollback.codeRev;
if (targetRev) {
return store.getLatestWhere({ kind: 'promote', codeRev: targetRev });
return await store.getLatestWhere({ kind: 'promote', codeRev: targetRev });
}
}
return store.getLatest('promote');
return await store.getLatest('promote');
}
// ── Snapshot Rebuild ───────────────────────────────────────────
@@ -196,12 +196,12 @@ export function findEffectiveEpoch(store: PulseStore): EventRecord | null {
* task projections (pending-tasks, agent-capability-stats) are folded from workflowStore.
* Falls back to options.systemStore for backward compatibility.
*/
export function rebuildSnapshot<S extends { timestamp: number }>(
export async function rebuildSnapshot<S extends { timestamp: number }>(
storeOrStores: PulseStore | { system: PulseStore; vitals: PulseStore },
senseKeys: string[],
epoch?: EventRecord | null,
options?: { systemStore?: PulseStore; workflowStore?: PulseStore },
): S {
): Promise<S> {
const isMultiStore =
typeof storeOrStores === 'object' &&
'system' in storeOrStores &&
@@ -218,9 +218,9 @@ export function rebuildSnapshot<S extends { timestamp: number }>(
for (const key of senseKeys) {
// Priority 1: read latest vital from vitals store (if provided)
if (vitalsStore) {
const latestVital = vitalsStore.getLatest('vital', key);
const latestVital = await vitalsStore.getLatest('vital', key);
if (latestVital?.hash) {
const data = vitalsStore.getObject(latestVital.hash);
const data = await vitalsStore.getObject(latestVital.hash);
if (data !== null) {
snapshot[key] = {
data,
@@ -234,7 +234,7 @@ export function rebuildSnapshot<S extends { timestamp: number }>(
// Priority 2: fallback to events table (migrate/init events)
if (epoch) {
const events = store.getAfter(epoch.id, {
const events = await store.getAfter(epoch.id, {
kind: 'collect',
key,
codeRev: epoch.codeRev ?? undefined,
@@ -242,7 +242,7 @@ export function rebuildSnapshot<S extends { timestamp: number }>(
const latestCollect =
events.length > 0 ? events[events.length - 1]! : null;
if (latestCollect?.hash) {
const data = store.getObject(latestCollect.hash);
const data = await store.getObject(latestCollect.hash);
if (data !== null) {
snapshot[key] = {
data,
@@ -253,7 +253,7 @@ export function rebuildSnapshot<S extends { timestamp: number }>(
}
// Check migrate events
const migrateEvents = store.getAfter(epoch.id, {
const migrateEvents = await store.getAfter(epoch.id, {
kind: 'migrate',
key,
codeRev: epoch.codeRev ?? undefined,
@@ -261,7 +261,7 @@ export function rebuildSnapshot<S extends { timestamp: number }>(
if (migrateEvents.length > 0) {
const latestMigrate = migrateEvents[migrateEvents.length - 1]!;
if (latestMigrate.hash) {
const data = store.getObject(latestMigrate.hash);
const data = await store.getObject(latestMigrate.hash);
if (data !== null) {
snapshot[key] = {
data,
@@ -273,7 +273,7 @@ export function rebuildSnapshot<S extends { timestamp: number }>(
}
// Check init events
const initEvents = store.getAfter(epoch.id, {
const initEvents = await store.getAfter(epoch.id, {
kind: 'init',
key,
codeRev: epoch.codeRev ?? undefined,
@@ -281,7 +281,7 @@ export function rebuildSnapshot<S extends { timestamp: number }>(
if (initEvents.length > 0) {
const latestInit = initEvents[initEvents.length - 1]!;
if (latestInit.hash) {
const data = store.getObject(latestInit.hash);
const data = await store.getObject(latestInit.hash);
if (data !== null) {
snapshot[key] = {
data,
@@ -292,9 +292,9 @@ export function rebuildSnapshot<S extends { timestamp: number }>(
}
} else {
// No epoch — try latest collect from events table directly
const latest = store.getLatest('collect', key);
const latest = await store.getLatest('collect', key);
if (latest?.hash) {
const data = store.getObject(latest.hash);
const data = await store.getObject(latest.hash);
if (data !== null) {
snapshot[key] = {
data,
@@ -319,10 +319,10 @@ export function rebuildSnapshot<S extends { timestamp: number }>(
* Read each declared projection's current value, using "scope/name" as key.
* If projection doesn't exist, value is null (graceful degradation).
*/
export function buildSnapshotFromProjections<S extends { timestamp: number }>(
export async function buildSnapshotFromProjections<S extends { timestamp: number }>(
scopedStore: ScopedStore,
projectionPaths: string[], // ["_vitals/cpu_usage", "neko/session_count"]
): S {
): Promise<S> {
const snapshot: Record<string, unknown> = { timestamp: Date.now() };
for (const projectionPath of projectionPaths) {
@@ -341,7 +341,7 @@ export function buildSnapshotFromProjections<S extends { timestamp: number }>(
const scopeDb = scopedStore.scopeDatabase(scopeName);
// Get projection state
const projectionState = getProjectionState(scopeDb, projectionName);
const projectionState = await getProjectionState(scopeDb, projectionName);
if (projectionState) {
snapshot[projectionPath] = projectionState.value;
@@ -447,9 +447,9 @@ export async function runPulse<S extends { timestamp: number }, E>(options: {
const pulse = composeRules(rules, defaultTickMs);
// Determine version epoch (always from system store)
const epoch = findEffectiveEpoch(systemStore);
const epoch = await findEffectiveEpoch(systemStore);
let prev = rebuildSnapshot<S>(
let prev = await rebuildSnapshot<S>(
{ system: systemStore, vitals: vitalsStore },
senseKeys,
epoch,
@@ -510,7 +510,7 @@ export async function runPulse<S extends { timestamp: number }, E>(options: {
ticking = true;
pendingWake = false;
const curr = rebuildSnapshot<S>(
const curr = await rebuildSnapshot<S>(
{ system: systemStore, vitals: vitalsStore },
senseKeys,
epoch,
@@ -522,8 +522,8 @@ export async function runPulse<S extends { timestamp: number }, E>(options: {
// Write effect events to store (fire-and-forget — executorLoop picks them up)
if (effects.length > 0) {
for (const effect of effects) {
const effectHash = systemStore.putObject(effect);
systemStore.appendEvent({
const effectHash = await systemStore.putObject(effect);
await systemStore.appendEvent({
occurredAt: Date.now(),
kind: 'effect',
key: effectHash,
@@ -564,7 +564,7 @@ export async function runPulse<S extends { timestamp: number }, E>(options: {
}
// Record tick event with the actual tickMs that will be used for next iteration
systemStore.appendEvent({
await systemStore.appendEvent({
occurredAt: Date.now(),
kind: 'tick',
meta: JSON.stringify({
@@ -629,7 +629,7 @@ export async function runPulseV2<S extends { timestamp: number }, E>(options: {
const pulse = composeRules(rules, defaultTickMs);
// Build initial snapshot
let prev = buildSnapshotFromProjections<S>(scopedStore, projectionPaths);
let prev = await buildSnapshotFromProjections<S>(scopedStore, projectionPaths);
let tickMs = defaultTickMs;
// ── Wake mechanism ─────────────────────────────────────────────
@@ -698,7 +698,7 @@ export async function runPulseV2<S extends { timestamp: number }, E>(options: {
}
// Build current snapshot from projections
const curr = buildSnapshotFromProjections<S>(scopedStore, projectionPaths);
const curr = await buildSnapshotFromProjections<S>(scopedStore, projectionPaths);
const tickStart = Date.now();
const [effects, nextTickMs] = await pulse(prev, curr);
@@ -706,8 +706,8 @@ export async function runPulseV2<S extends { timestamp: number }, E>(options: {
if (effects.length > 0) {
const sysStore = scopedStore.scope('_system');
for (const effect of effects) {
const effectHash = sysStore.putObject(effect);
sysStore.appendEvent({
const effectHash = await sysStore.putObject(effect);
await sysStore.appendEvent({
occurredAt: Date.now(),
kind: 'effect',
key: effectHash,
@@ -722,7 +722,7 @@ export async function runPulseV2<S extends { timestamp: number }, E>(options: {
// Record tick event in system store
const systemStore = scopedStore.scope('_system');
systemStore.appendEvent({
await systemStore.appendEvent({
occurredAt: Date.now(),
kind: 'tick',
meta: JSON.stringify({
@@ -762,7 +762,7 @@ export async function executorLoop<E>(options: {
while (!signal?.aborted) {
try {
// Find all pending effect events
const effectEvents = store.queryByKind('effect');
const effectEvents = await store.queryByKind('effect');
for (const effectEvent of effectEvents) {
if (signal?.aborted) break;
@@ -771,23 +771,23 @@ export async function executorLoop<E>(options: {
const idStr = String(effectEvent.id);
// Check if already acked or failed
const acked = store.getLatest('effect-acked', idStr);
const acked = await store.getLatest('effect-acked', idStr);
if (acked) continue;
const failed = store.getLatest('effect-failed', idStr);
const failed = await store.getLatest('effect-failed', idStr);
if (failed) continue;
const executing = store.getLatest('effect-executing', idStr);
const executing = await store.getLatest('effect-executing', idStr);
if (executing) continue;
// Retrieve the effect object from CAS
if (!effectEvent.hash) continue;
const effectObj = store.getObject(effectEvent.hash) as E | null;
const effectObj = (await store.getObject(effectEvent.hash)) as E | null;
if (effectObj === null) continue;
// Mark as inflight
inflight.add(effectEvent.id);
// Write effect-executing event
store.appendEvent({
await store.appendEvent({
occurredAt: Date.now(),
kind: 'effect-executing',
key: idStr,
@@ -800,7 +800,7 @@ export async function executorLoop<E>(options: {
execute([effectObj])
.then(() => {
// Write effect-acked event
store.appendEvent({
return store.appendEvent({
occurredAt: Date.now(),
kind: 'effect-acked',
key: idStr,
@@ -812,7 +812,7 @@ export async function executorLoop<E>(options: {
// Write effect-failed event
const errorMessage =
err instanceof Error ? err.message : String(err);
store.appendEvent({
return store.appendEvent({
occurredAt: Date.now(),
kind: 'effect-failed',
key: idStr,
+4 -4
View File
@@ -12,11 +12,11 @@ import type {
* - persona-updated only patches provided fields.
* - Events are merged in occurredAt order.
*/
export function buildPersonasFromEvents(
export async function buildPersonasFromEvents(
store: PulseStore,
): Map<string, PersonaState> {
const registered = store.queryByKind('persona-registered');
const updated = store.queryByKind('persona-updated');
): Promise<Map<string, PersonaState>> {
const registered = await store.queryByKind('persona-registered');
const updated = await store.queryByKind('persona-updated');
const allEvents = [...registered, ...updated];
allEvents.sort((a, b) => a.occurredAt - b.occurredAt);
+6 -6
View File
@@ -161,10 +161,10 @@ function writeErrorEvent(
/**
* Get current projection state from the database.
*/
export function getProjectionState(
export async function getProjectionState(
scopeDb: Database,
projectionName: string,
): ProjectionState | null {
): Promise<ProjectionState | null> {
const stmt = getProjectionStateStmt(scopeDb);
const row = stmt.get(projectionName) as any;
@@ -190,7 +190,7 @@ export async function foldProjection(
codeRev: string,
): Promise<ProjectionState> {
// 1. Get projection definition
const def = getProjectionDef(scopeDb, projectionName, codeRev);
const def = await getProjectionDef(scopeDb, projectionName, codeRev);
if (!def) {
throw new Error(
`Projection definition not found: ${projectionName}@${codeRev}`,
@@ -198,7 +198,7 @@ export async function foldProjection(
}
// 2. Get current state from projections table
const currentState = getProjectionState(scopeDb, projectionName);
const currentState = await getProjectionState(scopeDb, projectionName);
const currentValue = currentState?.value ?? def.initialValue;
const lastEventId = currentState?.lastEventId ?? 0;
@@ -319,7 +319,7 @@ export async function foldAllProjections(
}
const { listProjectionDefs } = await import('./defs.js');
const projectionDefs = listProjectionDefs(scopeDb, { codeRev });
const projectionDefs = await listProjectionDefs(scopeDb, { codeRev });
const results = new Map<string, ProjectionState>();
@@ -350,7 +350,7 @@ export async function resetProjections(
// 2. Get all projection definitions for new code_rev
const { listProjectionDefs } = await import('./defs.js');
const projectionDefs = listProjectionDefs(scopeDb, { codeRev });
const projectionDefs = await listProjectionDefs(scopeDb, { codeRev });
// 3. Replay all events for each projection
const allEventsStmt = selectAllEventsStmt(scopeDb);
+3 -3
View File
@@ -269,7 +269,7 @@ async function runProjectLoop(
tool_choice: 'required',
});
opts.workflowStore.appendEvent({
await opts.workflowStore.appendEvent({
kind: 'llm-call-started',
meta: JSON.stringify({ projectId, taskCount: allTasks.length }),
occurredAt: Date.now(),
@@ -288,7 +288,7 @@ async function runProjectLoop(
),
]);
} catch (err) {
opts.workflowStore.appendEvent({
await opts.workflowStore.appendEvent({
kind: 'llm-call-failed',
meta: JSON.stringify({ projectId, error: String(err) }),
occurredAt: Date.now(),
@@ -298,7 +298,7 @@ async function runProjectLoop(
// 6. write trace event
const durationMs = Date.now() - startTime;
opts.workflowStore.appendEvent({
await opts.workflowStore.appendEvent({
kind: 'llm-call-completed',
meta: JSON.stringify({
projectId,
+4 -4
View File
@@ -19,12 +19,12 @@ export interface HealthSnapshot {
* Rebuild health field from events table.
* This function is in core package, agent cannot change.
*/
export function rebuildHealth(store: PulseStore): HealthSnapshot {
export async function rebuildHealth(store: PulseStore): Promise<HealthSnapshot> {
const now = Date.now();
const windowStart = now - 5 * 60 * 1000; // 5 minute window
// Query recent events from events table
const recentEvents = store.queryByKind('effect', { since: windowStart });
const recentEvents = await store.queryByKind('effect', { since: windowStart });
// Count restarts
const lastRestart: Record<string, { ts: number; count: number }> = {};
@@ -46,10 +46,10 @@ export function rebuildHealth(store: PulseStore): HealthSnapshot {
}
// Count error events
const errorEvents = store.queryByKind('error', { since: windowStart });
const errorEvents = await store.queryByKind('error', { since: windowStart });
// Find latest promote within window
const latestPromote = store.getLatest('promote');
const latestPromote = await store.getLatest('promote');
let lastPromote: HealthSnapshot['lastPromote'];
if (latestPromote && latestPromote.occurredAt > windowStart) {
const meta = latestPromote.meta ? JSON.parse(latestPromote.meta) : {};
+13 -13
View File
@@ -38,9 +38,9 @@ describe('createStore (events table + CAS)', () => {
// ── 2. appendEvent ────────────────────────────────────────────
it('2. appendEvent returns EventRecord with number id', () => {
it('2. appendEvent returns EventRecord with number id', async () => {
const store = createStore({ eventsDbPath, objectsDir });
const result = store.appendEvent({
const result = await store.appendEvent({
occurredAt: 1000,
kind: 'tick',
key: 'system',
@@ -51,7 +51,7 @@ describe('createStore (events table + CAS)', () => {
expect(result.occurredAt).toBe(1000);
expect(result.kind).toBe('tick');
expect(result.key).toBe('system');
store.close();
await store.close();
});
// ── 3. ID 唯一性 ───────────────────────────────────────────
@@ -291,22 +291,22 @@ describe('createStore (events table + CAS)', () => {
// ── 14. hasEvents ─────────────────────────────────────────────
it('14. hasEvents returns false on empty, true after insert', () => {
it('14. hasEvents returns false on empty, true after insert', async () => {
const store = createStore({ eventsDbPath, objectsDir });
expect(store.hasEvents()).toBe(false);
expect(await store.hasEvents()).toBe(false);
store.appendEvent({ occurredAt: 1000, kind: 'tick' });
expect(store.hasEvents()).toBe(true);
store.close();
await store.appendEvent({ occurredAt: 1000, kind: 'tick' });
expect(await store.hasEvents()).toBe(true);
await store.close();
});
// ── 15. putObject + getObject (CAS) ───────────────────────────
it('15. putObject writes and getObject reads back correctly', () => {
it('15. putObject writes and getObject reads back correctly', async () => {
const store = createStore({ eventsDbPath, objectsDir });
const data = { cpu: 42, mem: 1024 };
const hash = store.putObject(data);
const hash = await store.putObject(data);
expect(typeof hash === 'string').toBeTruthy();
expect(hash.length).toBe(32);
@@ -316,12 +316,12 @@ describe('createStore (events table + CAS)', () => {
expect(existsSync(filePath)).toBeTruthy();
// Read back
const retrieved = store.getObject(hash);
const retrieved = await store.getObject(hash);
expect(retrieved).toEqual(data);
// Non-existent hash returns null
expect(store.getObject('0000000000000000')).toBe(null);
store.close();
expect(await store.getObject('0000000000000000')).toBe(null);
await store.close();
});
// ── 16. CAS 去重 ──────────────────────────────────────────────
+65 -64
View File
@@ -42,36 +42,36 @@ export interface ObjectInstance {
export interface PulseStore {
/** Append one event (id is auto-incremented) */
appendEvent(event: Omit<EventRecord, 'id'>): EventRecord;
appendEvent(event: Omit<EventRecord, 'id'>): Promise<EventRecord>;
/** Append multiple events in a transaction */
appendEvents(events: Omit<EventRecord, 'id'>[]): EventRecord[];
appendEvents(events: Omit<EventRecord, 'id'>[]): Promise<EventRecord[]>;
/** Create an immutable object instance. Returns the integer id. Idempotent on (objectType, externalId). */
createObject(opts: {
objectType: string;
externalId?: string;
codeRev: string;
}): number;
}): Promise<number>;
/** Get an object instance by id. Returns null if not found. */
getObjectInstance(id: number): ObjectInstance | null;
getObjectInstance(id: number): Promise<ObjectInstance | null>;
/** Query object instances by type. */
queryObjectsByType(objectType: string): ObjectInstance[];
queryObjectsByType(objectType: string): Promise<ObjectInstance[]>;
/** Get the latest event by kind + optional key */
getLatest(kind: string, key?: string): EventRecord | null;
getLatest(kind: string, key?: string): Promise<EventRecord | null>;
/** Get latest event with additional filters */
getLatestWhere(opts: {
kind: string;
key?: string;
codeRev?: string;
}): EventRecord | null;
}): Promise<EventRecord | null>;
/** Get recent events (newest first) */
getRecent(limit?: number): EventRecord[];
getRecent(limit?: number): Promise<EventRecord[]>;
/** Query events by kind with optional filters */
queryByKind(
@@ -82,7 +82,7 @@ export interface PulseStore {
codeRev?: string;
limit?: number;
},
): EventRecord[];
): Promise<EventRecord[]>;
/** Get all events after a specific event id */
getAfter(
@@ -92,22 +92,22 @@ export interface PulseStore {
key?: string;
codeRev?: string;
},
): EventRecord[];
): Promise<EventRecord[]>;
/** Check if any events exist */
hasEvents(): boolean;
hasEvents(): Promise<boolean>;
/** Write data to CAS store. Returns hash. No-op if already exists. */
putObject(data: unknown): string;
putObject(data: unknown): Promise<string>;
/** Read data from CAS store by hash. Returns null if not found. */
getObject(hash: string): unknown | null;
getObject(hash: string): Promise<unknown | null>;
/** Close the database */
close(): void;
close(): Promise<void>;
/** Delete events older than the given timestamp. Returns count of deleted rows. */
archiveEvents(olderThan: number): number;
archiveEvents(olderThan: number): Promise<number>;
/** Downsample events of a specific kind+key: keep one per interval window. Returns count of deleted rows. */
downsampleEvents(
@@ -115,7 +115,7 @@ export interface PulseStore {
key: string,
intervalMs: number,
olderThan: number,
): number;
): Promise<number>;
}
// ── CAS Hashing ────────────────────────────────────────────────
@@ -284,15 +284,15 @@ export function createStore(options: CreateStoreOptions): PulseStore {
);
return {
appendEvent(event: Omit<EventRecord, 'id'>): EventRecord {
async appendEvent(event: Omit<EventRecord, 'id'>): Promise<EventRecord> {
return doAppendEvent(event);
},
appendEvents(events: Omit<EventRecord, 'id'>[]): EventRecord[] {
async appendEvents(events: Omit<EventRecord, 'id'>[]): Promise<EventRecord[]> {
return appendManyTx(events);
},
getLatest(kind: string, key?: string): EventRecord | null {
async getLatest(kind: string, key?: string): Promise<EventRecord | null> {
const row = selectLatest.get(
kind,
key ?? null,
@@ -301,11 +301,11 @@ export function createStore(options: CreateStoreOptions): PulseStore {
return row ? rowToRecord(row) : null;
},
getLatestWhere(opts: {
async getLatestWhere(opts: {
kind: string;
key?: string;
codeRev?: string;
}): EventRecord | null {
}): Promise<EventRecord | null> {
const conditions: string[] = ['kind = ?'];
const params: (string | number | null)[] = [opts.kind];
@@ -323,12 +323,12 @@ export function createStore(options: CreateStoreOptions): PulseStore {
return row ? rowToRecord(row) : null;
},
getRecent(limit: number = 20): EventRecord[] {
async getRecent(limit: number = 20): Promise<EventRecord[]> {
const sql = `SELECT * FROM events ORDER BY occurred_at DESC, id DESC LIMIT ?`;
return (eventsDb.prepare(sql).all(limit) as RawRow[]).map(rowToRecord);
},
queryByKind(
async queryByKind(
kind: string,
opts?: {
key?: string;
@@ -336,7 +336,7 @@ export function createStore(options: CreateStoreOptions): PulseStore {
codeRev?: string;
limit?: number;
},
): EventRecord[] {
): Promise<EventRecord[]> {
const conditions: string[] = ['kind = ?'];
const params: (string | number | null)[] = [kind];
@@ -364,14 +364,14 @@ export function createStore(options: CreateStoreOptions): PulseStore {
);
},
getAfter(
async getAfter(
afterId: number,
opts?: {
kind?: string;
key?: string;
codeRev?: string;
},
): EventRecord[] {
): Promise<EventRecord[]> {
const conditions: string[] = ['id > ?'];
const params: (string | number | null)[] = [afterId];
@@ -394,15 +394,15 @@ export function createStore(options: CreateStoreOptions): PulseStore {
);
},
hasEvents(): boolean {
async hasEvents(): Promise<boolean> {
return selectHasEvents.get() !== null;
},
createObject(opts: {
async createObject(opts: {
objectType: string;
externalId?: string;
codeRev: string;
}): number {
}): Promise<number> {
const now = Date.now();
const extId = opts.externalId ?? null;
// Idempotent: if (objectType, externalId) already exists, return existing id
@@ -422,14 +422,14 @@ export function createStore(options: CreateStoreOptions): PulseStore {
return Number(result.lastInsertRowid);
},
getObjectInstance(id: number): ObjectInstance | null {
async getObjectInstance(id: number): Promise<ObjectInstance | null> {
const row = eventsDb
.prepare('SELECT * FROM objects WHERE id = ?')
.get(id) as RawObjectRow | null;
return row ? rowToObjectInstance(row) : null;
},
queryObjectsByType(objectType: string): ObjectInstance[] {
async queryObjectsByType(objectType: string): Promise<ObjectInstance[]> {
return (
eventsDb
.prepare('SELECT * FROM objects WHERE object_type = ?')
@@ -437,7 +437,7 @@ export function createStore(options: CreateStoreOptions): PulseStore {
).map(rowToObjectInstance);
},
putObject(data: unknown): string {
async putObject(data: unknown): Promise<string> {
const hash = hashObject(data);
const filePath = join(objectsDir, `${hash}.json`);
if (!existsSync(filePath)) {
@@ -447,29 +447,29 @@ export function createStore(options: CreateStoreOptions): PulseStore {
return hash;
},
getObject(hash: string): unknown | null {
async getObject(hash: string): Promise<unknown | null> {
const filePath = join(objectsDir, `${hash}.json`);
if (!existsSync(filePath)) return null;
return JSON.parse(readFileSync(filePath, 'utf-8'));
},
close(): void {
async close(): Promise<void> {
eventsDb.close();
},
archiveEvents(olderThan: number): number {
async archiveEvents(olderThan: number): Promise<number> {
const result = eventsDb
.prepare('DELETE FROM events WHERE occurred_at < ?')
.run(olderThan);
return result.changes;
},
downsampleEvents(
async downsampleEvents(
kind: string,
key: string,
intervalMs: number,
olderThan: number,
): number {
): Promise<number> {
const safeInterval = Math.floor(Math.abs(intervalMs));
if (safeInterval <= 0) return 0;
const stmt = eventsDb.prepare(`
@@ -503,10 +503,10 @@ export interface ScopedStore {
/** Get underlying Database for scope (used by projection engine) */
scopeDatabase(name: string): Database;
putObject(data: unknown): string;
getObject(hash: string): unknown | null;
putObject(data: unknown): Promise<string>;
getObject(hash: string): Promise<unknown | null>;
close(): void;
close(): Promise<void>;
}
function validateScopeName(name: string): void {
@@ -520,6 +520,7 @@ function validateScopeName(name: string): void {
/**
* Open (or create) a scope database at the given path.
* Sets WAL mode and creates the events table and projections table.
* initDefsSchema is async in interface but synchronous in bun:sqlite — safe to call with void.
*/
function openScopeDb(path: string): Database {
mkdirSync(dirname(path), { recursive: true });
@@ -533,8 +534,8 @@ function openScopeDb(path: string): Database {
// Use canonical PROJECTIONS_SCHEMA (INTEGER last_event_id)
db.exec(PROJECTIONS_SCHEMA);
// Each scope carries its own def tables
initDefsSchema(db);
// Each scope carries its own def tables (bun:sqlite is sync under async wrapper)
void initDefsSchema(db);
return db;
}
@@ -591,15 +592,15 @@ function createScopeStore(db: Database, objectsDir: string): PulseStore {
);
return {
appendEvent(event) {
async appendEvent(event) {
return doAppendEvent(event);
},
appendEvents(events) {
async appendEvents(events) {
return appendManyTx(events);
},
getLatest(kind, key?) {
async getLatest(kind, key?) {
const row = selectLatest.get(
kind,
key ?? null,
@@ -608,7 +609,7 @@ function createScopeStore(db: Database, objectsDir: string): PulseStore {
return row ? rowToRecord(row) : null;
},
getLatestWhere(opts) {
async getLatestWhere(opts) {
const conditions: string[] = ['kind = ?'];
const params: (string | number | null)[] = [opts.kind];
if (opts.key !== undefined) {
@@ -624,12 +625,12 @@ function createScopeStore(db: Database, objectsDir: string): PulseStore {
return row ? rowToRecord(row) : null;
},
getRecent(limit = 20) {
async getRecent(limit = 20) {
const sql = `SELECT * FROM events ORDER BY occurred_at DESC, id DESC LIMIT ?`;
return (db.prepare(sql).all(limit) as RawRow[]).map(rowToRecord);
},
queryByKind(kind, opts?) {
async queryByKind(kind, opts?) {
const conditions: string[] = ['kind = ?'];
const params: (string | number | null)[] = [kind];
if (opts?.key !== undefined) {
@@ -652,7 +653,7 @@ function createScopeStore(db: Database, objectsDir: string): PulseStore {
return (db.prepare(sql).all(...params) as RawRow[]).map(rowToRecord);
},
getAfter(afterId: number, opts?) {
async getAfter(afterId: number, opts?) {
const conditions: string[] = ['id > ?'];
const params: (string | number | null)[] = [afterId];
if (opts?.kind !== undefined) {
@@ -671,15 +672,15 @@ function createScopeStore(db: Database, objectsDir: string): PulseStore {
return (db.prepare(sql).all(...params) as RawRow[]).map(rowToRecord);
},
hasEvents() {
async hasEvents() {
return selectHasEvents.get() !== null;
},
createObject(opts: {
async createObject(opts: {
objectType: string;
externalId?: string;
codeRev: string;
}): number {
}): Promise<number> {
const now = Date.now();
const extId = opts.externalId ?? null;
if (extId !== null) {
@@ -698,14 +699,14 @@ function createScopeStore(db: Database, objectsDir: string): PulseStore {
return Number(result.lastInsertRowid);
},
getObjectInstance(id: number): ObjectInstance | null {
async getObjectInstance(id: number): Promise<ObjectInstance | null> {
const row = db
.prepare('SELECT * FROM objects WHERE id = ?')
.get(id) as RawObjectRow | null;
return row ? rowToObjectInstance(row) : null;
},
queryObjectsByType(objectType: string): ObjectInstance[] {
async queryObjectsByType(objectType: string): Promise<ObjectInstance[]> {
return (
db
.prepare('SELECT * FROM objects WHERE object_type = ?')
@@ -713,7 +714,7 @@ function createScopeStore(db: Database, objectsDir: string): PulseStore {
).map(rowToObjectInstance);
},
putObject(data) {
async putObject(data) {
const hash = hashObject(data);
const filePath = join(objectsDir, `${hash}.json`);
if (!existsSync(filePath)) {
@@ -723,29 +724,29 @@ function createScopeStore(db: Database, objectsDir: string): PulseStore {
return hash;
},
getObject(hash) {
async getObject(hash) {
const filePath = join(objectsDir, `${hash}.json`);
if (!existsSync(filePath)) return null;
return JSON.parse(readFileSync(filePath, 'utf-8'));
},
close() {
async close() {
db.close();
},
archiveEvents(olderThan: number): number {
async archiveEvents(olderThan: number): Promise<number> {
const result = db
.prepare('DELETE FROM events WHERE occurred_at < ?')
.run(olderThan);
return result.changes;
},
downsampleEvents(
async downsampleEvents(
kind: string,
key: string,
intervalMs: number,
olderThan: number,
): number {
): Promise<number> {
const safeInterval = Math.floor(Math.abs(intervalMs));
if (safeInterval <= 0) return 0;
const stmt = db.prepare(`
@@ -818,7 +819,7 @@ export function createScopedStore(
.sort();
},
putObject(data: unknown): string {
async putObject(data: unknown): Promise<string> {
const hash = hashObject(data);
const filePath = join(objectsDir, `${hash}.json`);
if (!existsSync(filePath)) {
@@ -828,15 +829,15 @@ export function createScopedStore(
return hash;
},
getObject(hash: string): unknown | null {
async getObject(hash: string): Promise<unknown | null> {
const filePath = join(objectsDir, `${hash}.json`);
if (!existsSync(filePath)) return null;
return JSON.parse(readFileSync(filePath, 'utf-8'));
},
close(): void {
async close(): Promise<void> {
for (const store of openStores.values()) {
store.close();
await store.close();
}
for (const db of openDatabases.values()) {
db.close();
+10 -8
View File
@@ -103,23 +103,25 @@ export function startWatcher(
try {
const data = await def.collect();
const hash = store.putObject(data);
const hash = await store.putObject(data);
store.appendEvent({
await store.appendEvent({
occurredAt: Date.now(),
kind: 'vital',
key: def.key,
hash,
});
const window = store.queryByKind('vital', {
const window = await store.queryByKind('vital', {
key: def.key,
limit: 12,
});
const resolvedWindow: VitalWithData<any>[] = window.map((v) => ({
const resolvedWindow: VitalWithData<any>[] = await Promise.all(
window.map(async (v) => ({
...v,
data: v.hash ? store.getObject(v.hash) : null,
}));
data: v.hash ? await store.getObject(v.hash) : null,
})),
);
if (def.shouldWake(resolvedWindow)) {
wakeTick();
@@ -132,11 +134,11 @@ export function startWatcher(
}
console.error(`[watcher:${def.name}] error during tick:`, err);
try {
store.appendEvent({
await store.appendEvent({
occurredAt: Date.now(),
kind: 'vital',
key: `_error:${def.key}`,
hash: store.putObject({
hash: await store.putObject({
error: msg,
watcher: def.name,
}),
@@ -69,7 +69,7 @@ export function createWorkflowRule(
// Incremental read: first tick reads all, subsequent ticks read only new events
const afterId = checkpoint ? checkpoint.lastEventId : 0;
const newEvents = store.getAfter(afterId);
const newEvents = await store.getAfter(afterId);
// Initialize or reuse checkpoint
if (!checkpoint) {
@@ -145,11 +145,12 @@ export function createWorkflowRule(
// Build message chain from cached topic events
const cachedEvents = checkpoint.topicEvents.get(action.topicId) ?? [];
const chain: WorkflowMessage[] = cachedEvents.map((e) => ({
const chain: WorkflowMessage[] = await Promise.all(
cachedEvents.map(async (e) => ({
role: e.kind.slice(prefix.length),
content: e.hash
? (() => {
const obj = store.getObject(e.hash!);
? await (async () => {
const obj = await store.getObject(e.hash!);
if (typeof obj === 'string') return obj;
if (obj && typeof obj === 'object' && 'content' in obj)
return String((obj as any).content);
@@ -166,11 +167,12 @@ export function createWorkflowRule(
})()
: null,
timestamp: e.occurredAt,
}));
})),
);
// Log role-started
if (logStore) {
logStore.appendEvent({
await logStore.appendEvent({
occurredAt: Date.now(),
kind: `${wf.name}.role-started`,
key: action.topicId,
@@ -187,8 +189,8 @@ export function createWorkflowRule(
const result = await roleFn(chain, action.topicId, store);
// Adapter writes CAS + event
const hash = store.putObject(result.content);
const written = store.appendEvent({
const hash = await store.putObject(result.content);
const written = await store.appendEvent({
occurredAt: Date.now(),
kind: `${wf.name}.${action.role}`,
key: action.topicId,
@@ -213,7 +215,7 @@ export function createWorkflowRule(
}
if (logStore) {
logStore.appendEvent({
await logStore.appendEvent({
occurredAt: Date.now(),
kind: `${wf.name}.role-completed`,
key: action.topicId,
@@ -232,7 +234,7 @@ export function createWorkflowRule(
});
} catch (err) {
if (logStore) {
logStore.appendEvent({
await logStore.appendEvent({
occurredAt: Date.now(),
kind: `${wf.name}.role-failed`,
key: action.topicId,