chore: remove legacy broker code (pulse#6)
CI / test (push) Has been cancelled

This commit is contained in:
2026-04-18 02:09:59 +00:00
parent 5ccd50221c
commit 0517007d40
5 changed files with 4 additions and 1427 deletions
-10
View File
@@ -128,16 +128,6 @@ export function gcOrphanObjects(
'migrate',
'init',
'gc',
'task-created',
'task-assigned',
'task-acked',
'task-closed',
'task-given-up',
'project-created',
'project-paused',
'project-archived',
'llm-call-completed',
'tool-response',
]) {
const events = store.queryByKind(kind, {});
for (const event of events) {
-271
View File
@@ -6,14 +6,11 @@ import { mkdtempSync, rmSync } from 'node:fs';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import {
type ActiveProjectsData,
type AgentLoopTraceData,
composeRules,
createRule,
createScopedStore,
createStore,
findEffectiveEpoch,
type PendingTasksData,
type PulseStore,
type Rule,
rebuildSnapshot,
@@ -1140,271 +1137,3 @@ describe('runPulse with ScopedStore', () => {
store.close();
});
});
// ── rebuildSnapshot with task projection ───────────────────────
describe('rebuildSnapshot with task projection', () => {
let tmpDir: string;
let store: PulseStore;
beforeEach(() => {
tmpDir = mkdtempSync(join(tmpdir(), 'pulse-task-proj-'));
store = createStore({
eventsDbPath: join(tmpDir, 'events.db'),
objectsDir: join(tmpDir, 'objects'),
});
});
afterEach(async () => {
await new Promise((r) => setTimeout(r, 200));
store.close();
rmSync(tmpDir, { recursive: true, force: true });
});
it('should include pending-tasks in snapshot when systemStore provided', () => {
store.appendEvent({
occurredAt: 1000,
kind: 'task-created',
meta: JSON.stringify({
taskId: 't1',
projectId: 'proj1',
title: 'Fix bug',
prompt: 'Fix the login bug',
}),
});
const snapshot = rebuildSnapshot<{
timestamp: number;
'pending-tasks': Sensed<PendingTasksData>;
}>(store, ['pending-tasks'], null, { systemStore: store });
expect(snapshot['pending-tasks']).toBeTruthy();
expect(snapshot['pending-tasks'].data.pendingCount).toBe(1);
expect(snapshot['pending-tasks'].data.tasks[0]?.taskId).toBe('t1');
expect(snapshot['pending-tasks'].refreshedAt).toBeGreaterThan(0);
});
it('should not include pending-tasks when systemStore not provided', () => {
store.appendEvent({
occurredAt: 1000,
kind: 'task-created',
meta: JSON.stringify({
taskId: 't1',
projectId: 'proj1',
title: 'Fix bug',
prompt: 'Fix it',
}),
});
const snapshot = rebuildSnapshot<{
timestamp: number;
'pending-tasks'?: Sensed<PendingTasksData>;
}>(store, ['pending-tasks']);
expect(snapshot['pending-tasks']).toBeUndefined();
});
it('should not include pending-tasks when senseKeys does not contain pending-tasks', () => {
store.appendEvent({
occurredAt: 1000,
kind: 'task-created',
meta: JSON.stringify({
taskId: 't1',
projectId: 'proj1',
title: 'Fix bug',
prompt: 'Fix it',
}),
});
const snapshot = rebuildSnapshot<{
timestamp: number;
'pending-tasks'?: Sensed<PendingTasksData>;
}>(store, ['other-key'], null, { systemStore: store });
expect(snapshot['pending-tasks']).toBeUndefined();
});
it('should prefer workflowStore over systemStore for task projections', () => {
// Write task to a separate "routine" store
const routineDir = mkdtempSync(join(tmpdir(), 'pulse-routine-'));
const workflowStore = createStore({
eventsDbPath: join(routineDir, 'events.db'),
objectsDir: join(routineDir, 'objects'),
});
workflowStore.appendEvent({
occurredAt: 1000,
kind: 'task-created',
meta: JSON.stringify({
taskId: 'r1',
projectId: 'proj-routine',
title: 'Routine task',
prompt: 'from routine scope',
}),
});
// systemStore (store) has no task events
const snapshot = rebuildSnapshot<{
timestamp: number;
'pending-tasks': Sensed<PendingTasksData>;
}>(store, ['pending-tasks'], null, { systemStore: store, workflowStore });
expect(snapshot['pending-tasks']).toBeTruthy();
expect(snapshot['pending-tasks'].data.pendingCount).toBe(1);
expect(snapshot['pending-tasks'].data.tasks[0]?.taskId).toBe('r1');
workflowStore.close();
rmSync(routineDir, { recursive: true, force: true });
});
});
// ── rebuildSnapshot with active-projects + agent-loop-trace ─────────
describe('rebuildSnapshot with active-projects and agent-loop-trace', () => {
let tmpDir: string;
let store: PulseStore;
let workflowStore: PulseStore;
let routineDir: string;
beforeEach(() => {
tmpDir = mkdtempSync(join(tmpdir(), 'pulse-proj-trace-'));
store = createStore({
eventsDbPath: join(tmpDir, 'events.db'),
objectsDir: join(tmpDir, 'objects'),
});
routineDir = mkdtempSync(join(tmpdir(), 'pulse-routine-trace-'));
workflowStore = createStore({
eventsDbPath: join(routineDir, 'events.db'),
objectsDir: join(routineDir, 'objects'),
});
});
afterEach(async () => {
await new Promise((r) => setTimeout(r, 200));
store.close();
workflowStore.close();
rmSync(tmpDir, { recursive: true, force: true });
rmSync(routineDir, { recursive: true, force: true });
});
it('should include active-projects in snapshot when workflowStore provided', () => {
workflowStore.appendEvent({
occurredAt: 1000,
kind: 'project-created',
meta: JSON.stringify({ projectId: 'p1', name: 'Alpha' }),
});
workflowStore.appendEvent({
occurredAt: 2000,
kind: 'project-created',
meta: JSON.stringify({ projectId: 'p2', name: 'Beta' }),
});
const snapshot = rebuildSnapshot<{
timestamp: number;
'active-projects': Sensed<ActiveProjectsData>;
}>(store, ['active-projects'], null, { workflowStore });
expect(snapshot['active-projects']).toBeTruthy();
expect(snapshot['active-projects'].data.projectIds).toContain('p1');
expect(snapshot['active-projects'].data.projectIds).toContain('p2');
expect(snapshot['active-projects'].refreshedAt).toBeGreaterThan(0);
});
it('should not include active-projects without workflowStore', () => {
const snapshot = rebuildSnapshot<{
timestamp: number;
'active-projects'?: Sensed<ActiveProjectsData>;
}>(store, ['active-projects']);
expect(snapshot['active-projects']).toBeUndefined();
});
it('should include agent-loop-trace:{projectId} in snapshot', () => {
workflowStore.appendEvent({
occurredAt: 1000,
kind: 'llm-call-completed',
meta: JSON.stringify({
projectId: 'proj-x',
model: 'gpt-4',
durationMs: 300,
toolCalls: [{ name: 'read_file', arguments: { path: '/a' } }],
}),
});
workflowStore.appendEvent({
occurredAt: 2000,
kind: 'tool-response',
meta: JSON.stringify({
projectId: 'proj-x',
toolCallIndex: 0,
toolName: 'read_file',
result: 'content',
}),
});
const snapshot = rebuildSnapshot<{
timestamp: number;
'agent-loop-trace:proj-x': Sensed<AgentLoopTraceData>;
}>(store, ['agent-loop-trace:proj-x'], null, { workflowStore });
expect(snapshot['agent-loop-trace:proj-x']).toBeTruthy();
expect(snapshot['agent-loop-trace:proj-x'].data.messages).toHaveLength(2);
expect(snapshot['agent-loop-trace:proj-x'].data.messages[0].role).toBe(
'assistant',
);
expect(snapshot['agent-loop-trace:proj-x'].data.messages[1].role).toBe(
'tool',
);
});
it('should not include agent-loop-trace without workflowStore', () => {
const snapshot = rebuildSnapshot<{
timestamp: number;
'agent-loop-trace:proj-x'?: Sensed<AgentLoopTraceData>;
}>(store, ['agent-loop-trace:proj-x']);
expect(snapshot['agent-loop-trace:proj-x']).toBeUndefined();
});
it('agent-loop-trace nextCheckAt from set_next_check', () => {
workflowStore.appendEvent({
occurredAt: 5000,
kind: 'llm-call-completed',
meta: JSON.stringify({
projectId: 'proj-y',
durationMs: 100,
toolCalls: [{ name: 'set_next_check', arguments: { tickMs: 30000 } }],
}),
});
const snapshot = rebuildSnapshot<{
timestamp: number;
'agent-loop-trace:proj-y': Sensed<AgentLoopTraceData>;
}>(store, ['agent-loop-trace:proj-y'], null, { workflowStore });
expect(snapshot['agent-loop-trace:proj-y'].data.nextCheckAt).toBe(35000);
});
it('multiple agent-loop-trace keys for different projects', () => {
workflowStore.appendEvent({
occurredAt: 1000,
kind: 'llm-call-completed',
meta: JSON.stringify({ projectId: 'p1', durationMs: 100 }),
});
workflowStore.appendEvent({
occurredAt: 2000,
kind: 'llm-call-completed',
meta: JSON.stringify({ projectId: 'p2', durationMs: 200 }),
});
const snapshot = rebuildSnapshot<{
timestamp: number;
'agent-loop-trace:p1': Sensed<AgentLoopTraceData>;
'agent-loop-trace:p2': Sensed<AgentLoopTraceData>;
}>(store, ['agent-loop-trace:p1', 'agent-loop-trace:p2'], null, {
workflowStore,
});
expect(snapshot['agent-loop-trace:p1'].data.messages).toHaveLength(1);
expect(snapshot['agent-loop-trace:p2'].data.messages).toHaveLength(1);
});
});
+4 -45
View File
@@ -16,11 +16,7 @@ import { createGcTrigger, DEFAULT_GC_CONFIG, type GcConfig } from './gc.js';
import { foldAllProjections, getProjectionState } from './projection-engine.js';
import type { EventRecord, PulseStore, ScopedStore } from './store.js';
import { startWatcher, type WatcherDef } from './watcher.js';
import {
buildActiveProjectsFromEvents,
buildAgentLoopTraceFromEvents,
buildPendingTasksFromEvents,
} from './watchers/pending-tasks-projection.js';
// ── Sensed Types ───────────────────────────────────────────────
@@ -313,38 +309,6 @@ export function rebuildSnapshot<S extends { timestamp: number }>(
snapshot['_error:cas_miss'] = { keys: casMisses, count: casMisses.length };
}
// Task projections: inject pending-tasks and agent-capability-stats from event fold.
// Prefer workflowStore (task events live in the 'workflows' scope); fall back to systemStore for backward compat.
const taskStore = options?.workflowStore ?? options?.systemStore;
if (taskStore && senseKeys.includes('pending-tasks')) {
snapshot['pending-tasks'] = {
data: buildPendingTasksFromEvents(taskStore),
refreshedAt: Date.now(),
} as Sensed<unknown>;
}
// Active-projects projection from project-created/paused/archived events
const workflowStore = options?.workflowStore;
if (workflowStore && senseKeys.includes('active-projects')) {
snapshot['active-projects'] = {
data: buildActiveProjectsFromEvents(workflowStore),
refreshedAt: Date.now(),
} as Sensed<unknown>;
}
// Agent-loop-trace projections (per projectId, dynamic sense keys)
for (const key of senseKeys) {
if (key.startsWith('agent-loop-trace:')) {
const projectId = key.slice('agent-loop-trace:'.length);
if (workflowStore) {
snapshot[key] = {
data: buildAgentLoopTraceFromEvents(workflowStore, projectId),
refreshedAt: Date.now(),
} as Sensed<unknown>;
}
}
}
return snapshot as S;
}
@@ -1034,6 +998,8 @@ export {
} from './gc.js';
// ── Projection Engine ───────────────────────────────────────────
export * from './projection-engine.js';
// ── Task Event Types ────────────────────────────────────────────
export type {
ActiveProjectsData,
AgentLoopTraceData,
@@ -1059,11 +1025,4 @@ export type {
ToolResponseMeta,
TraceMessage,
} from './task-events.js';
// ── Task Events ─────────────────────────────────────────────────
export {
buildActiveProjectsFromEvents,
buildAgentLoopTraceFromEvents,
buildInflightBrokerFromEvents,
buildPendingTasksFromEvents,
buildProjectsFromEvents,
} from './watchers/pending-tasks-projection.js';
@@ -1,840 +0,0 @@
import { afterEach, beforeEach, describe, expect, it } from 'bun:test';
import { mkdtempSync, rmSync } from 'node:fs';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import {
createScopedStore,
createStore,
type PulseStore,
type ScopedStore,
} from '../store.js';
import {
buildActiveProjectsFromEvents,
buildAgentLoopTraceFromEvents,
buildInflightBrokerFromEvents,
buildPendingTasksFromEvents,
buildProjectsFromEvents,
} from './pending-tasks-projection.js';
function makeTaskMeta(overrides: Record<string, unknown>) {
return JSON.stringify(overrides);
}
describe('buildPendingTasksFromEvents', () => {
let tmpDir: string;
let store: PulseStore;
beforeEach(() => {
tmpDir = mkdtempSync(join(tmpdir(), 'pulse-test-'));
store = createStore({
eventsDbPath: join(tmpDir, 'events.db'),
objectsDir: join(tmpDir, 'objects'),
});
});
afterEach(() => {
store.close();
rmSync(tmpDir, { recursive: true, force: true });
});
it('empty store → pendingCount = 0', () => {
const result = buildPendingTasksFromEvents(store);
expect(result.pendingCount).toBe(0);
expect(result.tasks).toEqual([]);
expect(result.byProject).toEqual({});
});
it('task-created → pendingCount = 1, status = pending', () => {
store.appendEvent({
occurredAt: 1000,
kind: 'task-created',
meta: makeTaskMeta({
taskId: 't1',
projectId: 'proj-a',
title: 'Fix bug',
description: 'fix the login bug',
type: 'bug',
priority: 5,
creatorId: 'user-1',
}),
});
const result = buildPendingTasksFromEvents(store);
expect(result.pendingCount).toBe(1);
expect(result.tasks[0].taskId).toBe('t1');
expect(result.tasks[0].status).toBe('pending');
expect(result.tasks[0].projectId).toBe('proj-a');
expect(result.tasks[0].priority).toBe(5);
expect(result.tasks[0].type).toBe('bug');
expect(result.tasks[0].creatorId).toBe('user-1');
expect(result.tasks[0].description).toBe('fix the login bug');
expect(result.byProject['proj-a']).toHaveLength(1);
});
it('task-routing → status = routing', () => {
store.appendEvent({
occurredAt: 1000,
kind: 'task-created',
meta: makeTaskMeta({
taskId: 't1',
projectId: 'proj-a',
title: 'Fix bug',
description: 'fix it',
type: 'bug',
priority: 0,
creatorId: 'user-1',
}),
});
store.appendEvent({
occurredAt: 1500,
kind: 'task-routing',
meta: makeTaskMeta({
taskId: 't1',
brokerSessionId: 'broker-123',
}),
});
const result = buildPendingTasksFromEvents(store);
expect(result.pendingCount).toBe(1);
expect(result.tasks[0].status).toBe('routing');
});
it('task-assigned → status = assigned, has assigneeId', () => {
store.appendEvent({
occurredAt: 1000,
kind: 'task-created',
meta: makeTaskMeta({
taskId: 't1',
projectId: 'proj-a',
title: 'Fix bug',
description: 'fix it',
type: 'bug',
priority: 0,
creatorId: 'user-1',
}),
});
store.appendEvent({
occurredAt: 2000,
kind: 'task-assigned',
meta: makeTaskMeta({
taskId: 't1',
assigneeId: 'cursor',
assignedBy: 'broker',
}),
});
const result = buildPendingTasksFromEvents(store);
expect(result.pendingCount).toBe(1);
expect(result.tasks[0].status).toBe('assigned');
expect(result.tasks[0].assigneeId).toBe('cursor');
});
it('task-responded → status = pending, has lastRespondedResult', () => {
store.appendEvent({
occurredAt: 1000,
kind: 'task-created',
meta: makeTaskMeta({
taskId: 't1',
projectId: 'proj-a',
title: 'Fix bug',
description: 'fix it',
type: 'bug',
priority: 0,
creatorId: 'user-1',
}),
});
store.appendEvent({
occurredAt: 2000,
kind: 'task-assigned',
meta: makeTaskMeta({
taskId: 't1',
assigneeId: 'cursor',
assignedBy: 'broker',
}),
});
store.appendEvent({
occurredAt: 3000,
kind: 'task-responded',
meta: makeTaskMeta({
taskId: 't1',
assigneeId: 'cursor',
result: 'Fixed the login issue',
}),
});
const result = buildPendingTasksFromEvents(store);
expect(result.pendingCount).toBe(1);
expect(result.tasks[0].status).toBe('pending');
expect(result.tasks[0].lastRespondedResult).toBe('Fixed the login issue');
});
it('task-closed by matching creatorId → removed from map', () => {
store.appendEvent({
occurredAt: 1000,
kind: 'task-created',
meta: makeTaskMeta({
taskId: 't1',
projectId: 'proj-a',
title: 'Fix bug',
description: 'fix it',
type: 'bug',
priority: 0,
creatorId: 'user-1',
}),
});
store.appendEvent({
occurredAt: 2000,
kind: 'task-closed',
meta: makeTaskMeta({ taskId: 't1', creatorId: 'user-1' }),
});
const result = buildPendingTasksFromEvents(store);
expect(result.pendingCount).toBe(0);
expect(result.tasks).toEqual([]);
});
it('task-closed by non-matching creatorId → ignored', () => {
store.appendEvent({
occurredAt: 1000,
kind: 'task-created',
meta: makeTaskMeta({
taskId: 't1',
projectId: 'proj-a',
title: 'Fix bug',
description: 'fix it',
type: 'bug',
priority: 0,
creatorId: 'user-1',
}),
});
store.appendEvent({
occurredAt: 2000,
kind: 'task-closed',
meta: makeTaskMeta({ taskId: 't1', creatorId: 'intruder' }),
});
const result = buildPendingTasksFromEvents(store);
expect(result.pendingCount).toBe(1);
expect(result.tasks[0].status).toBe('pending');
});
it('full lifecycle: created → assigned → responded → closed', () => {
store.appendEvent({
occurredAt: 1000,
kind: 'task-created',
meta: makeTaskMeta({
taskId: 't1',
projectId: 'proj-a',
title: 'Task',
description: 'do it',
type: 'action',
priority: 0,
creatorId: 'user-1',
}),
});
store.appendEvent({
occurredAt: 2000,
kind: 'task-assigned',
meta: makeTaskMeta({
taskId: 't1',
assigneeId: 'cursor',
assignedBy: 'broker',
}),
});
store.appendEvent({
occurredAt: 3000,
kind: 'task-responded',
meta: makeTaskMeta({
taskId: 't1',
assigneeId: 'cursor',
result: 'done',
}),
});
store.appendEvent({
occurredAt: 4000,
kind: 'task-closed',
meta: makeTaskMeta({ taskId: 't1', creatorId: 'user-1' }),
});
const result = buildPendingTasksFromEvents(store);
expect(result.pendingCount).toBe(0);
expect(result.tasks).toEqual([]);
});
it('multiple tasks across multiple projects', () => {
store.appendEvent({
occurredAt: 1000,
kind: 'task-created',
meta: makeTaskMeta({
taskId: 't1',
projectId: 'proj-a',
title: 'Task A1',
description: 'do a1',
type: 'bug',
priority: 0,
creatorId: 'user-1',
}),
});
store.appendEvent({
occurredAt: 1001,
kind: 'task-created',
meta: makeTaskMeta({
taskId: 't2',
projectId: 'proj-b',
title: 'Task B1',
description: 'do b1',
type: 'rfc',
priority: 0,
creatorId: 'user-1',
}),
});
store.appendEvent({
occurredAt: 1002,
kind: 'task-created',
meta: makeTaskMeta({
taskId: 't3',
projectId: 'proj-a',
title: 'Task A2',
description: 'do a2',
type: 'action',
priority: 0,
creatorId: 'user-1',
}),
});
store.appendEvent({
occurredAt: 2000,
kind: 'task-closed',
meta: makeTaskMeta({ taskId: 't1', creatorId: 'user-1' }),
});
const result = buildPendingTasksFromEvents(store);
expect(result.pendingCount).toBe(2);
expect(result.byProject['proj-a']).toHaveLength(1);
expect(result.byProject['proj-b']).toHaveLength(1);
expect(result.byProject['proj-a'][0].taskId).toBe('t3');
expect(result.byProject['proj-b'][0].taskId).toBe('t2');
});
it('default priority is 0 when not specified', () => {
store.appendEvent({
occurredAt: 1000,
kind: 'task-created',
meta: makeTaskMeta({
taskId: 't1',
projectId: 'proj-a',
title: 'No priority',
description: 'do it',
type: 'action',
creatorId: 'user-1',
}),
});
const result = buildPendingTasksFromEvents(store);
expect(result.tasks[0].priority).toBe(0);
});
it('events without meta are skipped gracefully', () => {
store.appendEvent({
occurredAt: 1000,
kind: 'task-created',
});
const result = buildPendingTasksFromEvents(store);
expect(result.pendingCount).toBe(0);
expect(result.tasks).toEqual([]);
});
});
// ── buildProjectsFromEvents ───────────────────────────────────
describe('buildProjectsFromEvents', () => {
let tmpDir: string;
let store: PulseStore;
beforeEach(() => {
tmpDir = mkdtempSync(join(tmpdir(), 'pulse-proj-'));
store = createStore({
eventsDbPath: join(tmpDir, 'events.db'),
objectsDir: join(tmpDir, 'objects'),
});
});
afterEach(() => {
store.close();
rmSync(tmpDir, { recursive: true, force: true });
});
it('empty store → empty record', () => {
const result = buildProjectsFromEvents(store);
expect(result).toEqual({});
});
it('project-created → record with project state', () => {
store.appendEvent({
occurredAt: 1000,
kind: 'project-created',
meta: makeTaskMeta({
projectId: 'p1',
name: 'Alpha',
repoDir: '/home/user/alpha',
}),
});
const result = buildProjectsFromEvents(store);
expect(result.p1).toEqual({
projectId: 'p1',
name: 'Alpha',
repoDir: '/home/user/alpha',
});
});
it('multiple projects', () => {
store.appendEvent({
occurredAt: 1000,
kind: 'project-created',
meta: makeTaskMeta({
projectId: 'p1',
name: 'Alpha',
repoDir: '/repo/a',
}),
});
store.appendEvent({
occurredAt: 2000,
kind: 'project-created',
meta: makeTaskMeta({
projectId: 'p2',
name: 'Beta',
repoDir: '/repo/b',
}),
});
const result = buildProjectsFromEvents(store);
expect(Object.keys(result)).toHaveLength(2);
expect(result.p1.name).toBe('Alpha');
expect(result.p2.name).toBe('Beta');
});
});
// ── buildInflightBrokerFromEvents ─────────────────────────────
describe('buildInflightBrokerFromEvents', () => {
let tmpDir: string;
let store: PulseStore;
beforeEach(() => {
tmpDir = mkdtempSync(join(tmpdir(), 'pulse-broker-'));
store = createStore({
eventsDbPath: join(tmpDir, 'events.db'),
objectsDir: join(tmpDir, 'objects'),
});
});
afterEach(() => {
store.close();
rmSync(tmpDir, { recursive: true, force: true });
});
it('no executing effects → active = false', () => {
const result = buildInflightBrokerFromEvents(store);
expect(result.active).toBe(false);
});
it('executing broker effect without ack → active = true', () => {
store.appendEvent({
occurredAt: 1000,
kind: 'effect-executing',
key: 'effect-1',
meta: JSON.stringify({ type: 'broker' }),
});
const result = buildInflightBrokerFromEvents(store);
expect(result.active).toBe(true);
});
it('executing broker effect with ack → active = false', () => {
store.appendEvent({
occurredAt: 1000,
kind: 'effect-executing',
key: 'effect-1',
meta: JSON.stringify({ type: 'broker' }),
});
store.appendEvent({
occurredAt: 2000,
kind: 'effect-acked',
key: 'effect-1',
});
const result = buildInflightBrokerFromEvents(store);
expect(result.active).toBe(false);
});
it('executing broker effect with failure → active = false', () => {
store.appendEvent({
occurredAt: 1000,
kind: 'effect-executing',
key: 'effect-1',
meta: JSON.stringify({ type: 'broker' }),
});
store.appendEvent({
occurredAt: 2000,
kind: 'effect-failed',
key: 'effect-1',
});
const result = buildInflightBrokerFromEvents(store);
expect(result.active).toBe(false);
});
it('non-broker executing effect → active = false', () => {
store.appendEvent({
occurredAt: 1000,
kind: 'effect-executing',
key: 'effect-1',
meta: JSON.stringify({ type: 'cursor' }),
});
const result = buildInflightBrokerFromEvents(store);
expect(result.active).toBe(false);
});
});
// ── routine scope integration ─────────────────────────────────────
describe('buildPendingTasksFromEvents with routine scope', () => {
let tmpDir: string;
let scopedStore: ScopedStore;
beforeEach(() => {
tmpDir = mkdtempSync(join(tmpdir(), 'pulse-routine-test-'));
scopedStore = createScopedStore({
basePath: join(tmpDir, 'scopes'),
objectsDir: join(tmpDir, 'objects'),
});
});
afterEach(() => {
scopedStore.close();
rmSync(tmpDir, { recursive: true, force: true });
});
it('task-created written to routine scope is correctly folded', () => {
const workflowStore = scopedStore.scope('workflows');
workflowStore.appendEvent({
occurredAt: 1000,
kind: 'task-created',
meta: makeTaskMeta({
taskId: 'rt-1',
projectId: 'proj-x',
title: 'Routine task',
description: 'do routine work',
type: 'action',
priority: 3,
creatorId: 'user-1',
}),
});
const result = buildPendingTasksFromEvents(workflowStore);
expect(result.pendingCount).toBe(1);
expect(result.tasks[0].taskId).toBe('rt-1');
expect(result.tasks[0].projectId).toBe('proj-x');
expect(result.tasks[0].status).toBe('pending');
expect(result.tasks[0].priority).toBe(3);
});
it('full lifecycle: created → assigned → responded → closed', () => {
const workflowStore = scopedStore.scope('workflows');
workflowStore.appendEvent({
occurredAt: 1000,
kind: 'task-created',
meta: makeTaskMeta({
taskId: 'rt-2',
projectId: 'proj-y',
title: 'Lifecycle task',
description: 'full cycle',
type: 'bug',
priority: 0,
creatorId: 'user-1',
}),
});
workflowStore.appendEvent({
occurredAt: 2000,
kind: 'task-assigned',
meta: makeTaskMeta({
taskId: 'rt-2',
assigneeId: 'cursor',
assignedBy: 'broker',
}),
});
workflowStore.appendEvent({
occurredAt: 3000,
kind: 'task-responded',
meta: makeTaskMeta({
taskId: 'rt-2',
assigneeId: 'cursor',
result: 'completed',
}),
});
workflowStore.appendEvent({
occurredAt: 4000,
kind: 'task-closed',
meta: makeTaskMeta({ taskId: 'rt-2', creatorId: 'user-1' }),
});
const result = buildPendingTasksFromEvents(workflowStore);
expect(result.pendingCount).toBe(0);
expect(result.tasks).toEqual([]);
});
it('routine scope is isolated from _system scope', () => {
const workflowStore = scopedStore.scope('workflows');
const systemStore = scopedStore.scope('_system');
workflowStore.appendEvent({
occurredAt: 1000,
kind: 'task-created',
meta: makeTaskMeta({
taskId: 'rt-3',
projectId: 'proj-z',
title: 'Routine only',
description: 'routine scope',
type: 'action',
priority: 0,
creatorId: 'user-1',
}),
});
const systemResult = buildPendingTasksFromEvents(systemStore);
expect(systemResult.pendingCount).toBe(0);
const routineResult = buildPendingTasksFromEvents(workflowStore);
expect(routineResult.pendingCount).toBe(1);
expect(routineResult.tasks[0].taskId).toBe('rt-3');
});
});
// ── buildActiveProjectsFromEvents ─────────────────────────────────
describe('buildActiveProjectsFromEvents', () => {
let tmpDir: string;
let store: PulseStore;
beforeEach(() => {
tmpDir = mkdtempSync(join(tmpdir(), 'pulse-active-proj-'));
store = createStore({
eventsDbPath: join(tmpDir, 'events.db'),
objectsDir: join(tmpDir, 'objects'),
});
});
afterEach(() => {
store.close();
rmSync(tmpDir, { recursive: true, force: true });
});
it('empty store → no active projects', () => {
const result = buildActiveProjectsFromEvents(store);
expect(result.projectIds).toEqual([]);
});
it('project-created adds to active list', () => {
store.appendEvent({
occurredAt: 1000,
kind: 'project-created',
meta: makeTaskMeta({
projectId: 'p1',
name: 'Alpha',
repoDir: '/repo/a',
}),
});
store.appendEvent({
occurredAt: 2000,
kind: 'project-created',
meta: makeTaskMeta({
projectId: 'p2',
name: 'Beta',
repoDir: '/repo/b',
}),
});
const result = buildActiveProjectsFromEvents(store);
expect(result.projectIds).toContain('p1');
expect(result.projectIds).toContain('p2');
expect(result.projectIds).toHaveLength(2);
});
it('project-paused removes from active list', () => {
store.appendEvent({
occurredAt: 1000,
kind: 'project-created',
meta: makeTaskMeta({
projectId: 'p1',
name: 'Alpha',
repoDir: '/repo/a',
}),
});
store.appendEvent({
occurredAt: 2000,
kind: 'project-paused',
meta: makeTaskMeta({ projectId: 'p1', reason: 'maintenance' }),
});
const result = buildActiveProjectsFromEvents(store);
expect(result.projectIds).toEqual([]);
});
it('project-archived removes from active list', () => {
store.appendEvent({
occurredAt: 1000,
kind: 'project-created',
meta: makeTaskMeta({
projectId: 'p1',
name: 'Alpha',
repoDir: '/repo/a',
}),
});
store.appendEvent({
occurredAt: 2000,
kind: 'project-archived',
meta: makeTaskMeta({ projectId: 'p1', reason: 'done' }),
});
const result = buildActiveProjectsFromEvents(store);
expect(result.projectIds).toEqual([]);
});
it('events without meta are skipped gracefully', () => {
store.appendEvent({
occurredAt: 1000,
kind: 'project-created',
});
const result = buildActiveProjectsFromEvents(store);
expect(result.projectIds).toEqual([]);
});
});
// ── buildAgentLoopTraceFromEvents ─────────────────────────────────
describe('buildAgentLoopTraceFromEvents', () => {
let tmpDir: string;
let store: PulseStore;
beforeEach(() => {
tmpDir = mkdtempSync(join(tmpdir(), 'pulse-trace-'));
store = createStore({
eventsDbPath: join(tmpDir, 'events.db'),
objectsDir: join(tmpDir, 'objects'),
});
});
afterEach(() => {
store.close();
rmSync(tmpDir, { recursive: true, force: true });
});
it('empty store → empty messages, nextCheckAt = 0', () => {
const result = buildAgentLoopTraceFromEvents(store, 'proj-1');
expect(result.messages).toEqual([]);
expect(result.nextCheckAt).toBe(0);
});
it('llm-call-completed creates assistant message with toolCalls', () => {
store.appendEvent({
occurredAt: 1000,
kind: 'llm-call-completed',
meta: JSON.stringify({
projectId: 'proj-1',
model: 'gpt-4',
toolCalls: [{ name: 'read_file', arguments: { path: '/tmp/a' } }],
durationMs: 500,
}),
});
const result = buildAgentLoopTraceFromEvents(store, 'proj-1');
expect(result.messages).toHaveLength(1);
expect(result.messages[0].role).toBe('assistant');
expect(result.messages[0].toolCalls).toHaveLength(1);
expect(result.messages[0].toolCalls![0].name).toBe('read_file');
expect(result.messages[0].ts).toBe(1000);
});
it('tool-response creates tool message', () => {
store.appendEvent({
occurredAt: 2000,
kind: 'tool-response',
meta: JSON.stringify({
projectId: 'proj-1',
toolCallIndex: 0,
toolName: 'read_file',
result: 'file content here',
}),
});
const result = buildAgentLoopTraceFromEvents(store, 'proj-1');
expect(result.messages).toHaveLength(1);
expect(result.messages[0].role).toBe('tool');
expect(result.messages[0].toolName).toBe('read_file');
expect(result.messages[0].result).toBe('file content here');
});
it('filters by projectId', () => {
store.appendEvent({
occurredAt: 1000,
kind: 'llm-call-completed',
meta: JSON.stringify({ projectId: 'proj-1', durationMs: 100 }),
});
store.appendEvent({
occurredAt: 2000,
kind: 'llm-call-completed',
meta: JSON.stringify({ projectId: 'proj-2', durationMs: 200 }),
});
const result1 = buildAgentLoopTraceFromEvents(store, 'proj-1');
expect(result1.messages).toHaveLength(1);
const result2 = buildAgentLoopTraceFromEvents(store, 'proj-2');
expect(result2.messages).toHaveLength(1);
});
it('ring buffer truncates to 20 messages', () => {
for (let i = 0; i < 25; i++) {
store.appendEvent({
occurredAt: 1000 + i,
kind: 'llm-call-completed',
meta: JSON.stringify({ projectId: 'proj-1', durationMs: 100 }),
});
}
const result = buildAgentLoopTraceFromEvents(store, 'proj-1');
expect(result.messages).toHaveLength(20);
expect(result.messages[0].ts).toBe(1005);
expect(result.messages[19].ts).toBe(1024);
});
it('nextCheckAt from set_next_check tool call', () => {
store.appendEvent({
occurredAt: 5000,
kind: 'llm-call-completed',
meta: JSON.stringify({
projectId: 'proj-1',
durationMs: 100,
toolCalls: [{ name: 'set_next_check', arguments: { tickMs: 30000 } }],
}),
});
const result = buildAgentLoopTraceFromEvents(store, 'proj-1');
expect(result.nextCheckAt).toBe(35000);
});
it('events without meta are skipped', () => {
store.appendEvent({
occurredAt: 1000,
kind: 'llm-call-completed',
});
const result = buildAgentLoopTraceFromEvents(store, 'proj-1');
expect(result.messages).toEqual([]);
});
});
@@ -1,261 +0,0 @@
import type { PulseStore } from '../store.js';
import type {
ActiveProjectsData,
AgentLoopTraceData,
InflightBrokerData,
LlmCallCompletedMeta,
PendingTasksData,
ProjectState,
TaskRoutingMeta,
TaskState,
ToolResponseMeta,
TraceMessage,
} from '../task-events.js';
/**
* Fold task-* events into current task state.
*
* State machine:
* task-created → status: 'pending'
* task-routing → status: 'routing'
* task-assigned → status: 'assigned', store assigneeId + assignedBy
* task-responded → status: 'pending', store lastRespondedResult (cycle back for re-broker)
* task-closed → remove from map (only if creatorId matches)
*/
export function buildPendingTasksFromEvents(
store: PulseStore,
): PendingTasksData {
const kinds = [
'task-created',
'task-routing',
'task-assigned',
'task-responded',
'task-closed',
];
const allEvents = kinds.flatMap((kind) => store.queryByKind(kind));
allEvents.sort((a, b) => a.occurredAt - b.occurredAt);
const taskMap = new Map<string, TaskState>();
for (const event of allEvents) {
if (!event.meta) continue;
const meta = JSON.parse(event.meta);
switch (event.kind) {
case 'task-created': {
taskMap.set(meta.taskId, {
taskId: meta.taskId,
projectId: meta.projectId,
title: meta.title,
description: meta.description ?? meta.prompt ?? '',
type: meta.type ?? 'action',
priority: meta.priority ?? 0,
creatorId: meta.creatorId ?? '',
status: 'pending',
createdAt: event.occurredAt,
updatedAt: event.occurredAt,
});
break;
}
case 'task-routing': {
const m = meta as TaskRoutingMeta;
const task = taskMap.get(m.taskId);
if (task) {
task.status = 'routing';
task.updatedAt = event.occurredAt;
}
break;
}
case 'task-assigned': {
const task = taskMap.get(meta.taskId);
if (task) {
task.status = 'assigned';
task.assigneeId = meta.assigneeId;
task.updatedAt = event.occurredAt;
}
break;
}
case 'task-responded': {
const task = taskMap.get(meta.taskId);
if (task) {
task.status = 'pending';
task.lastRespondedResult = meta.result;
task.updatedAt = event.occurredAt;
}
break;
}
case 'task-closed': {
const task = taskMap.get(meta.taskId);
if (task && task.creatorId === meta.creatorId) {
taskMap.delete(meta.taskId);
}
break;
}
}
}
const activeTasks = [...taskMap.values()];
const byProject: Record<string, TaskState[]> = {};
for (const task of activeTasks) {
if (!byProject[task.projectId]) byProject[task.projectId] = [];
byProject[task.projectId].push(task);
}
return {
pendingCount: activeTasks.length,
tasks: activeTasks,
byProject,
checkedAt: Date.now(),
};
}
/**
* Fold project-created events → Record<projectId, ProjectState>.
*/
export function buildProjectsFromEvents(
store: PulseStore,
): Record<string, ProjectState> {
const allEvents = store.queryByKind('project-created');
allEvents.sort((a, b) => a.occurredAt - b.occurredAt);
const projects: Record<string, ProjectState> = {};
for (const event of allEvents) {
if (!event.meta) continue;
const meta = JSON.parse(event.meta);
projects[meta.projectId] = {
projectId: meta.projectId,
name: meta.name,
repoDir: meta.repoDir ?? '',
};
}
return projects;
}
/**
* Check whether there is an inflight broker effect in the system scope.
*
* Looks for effect-executing events whose meta contains `"broker"` kind
* that do NOT have a corresponding effect-acked or effect-failed event.
*/
export function buildInflightBrokerFromEvents(
systemStore: PulseStore,
): InflightBrokerData {
const executingEvents = systemStore.queryByKind('effect-executing');
for (const ev of executingEvents) {
if (!ev.meta) continue;
try {
const meta = JSON.parse(ev.meta);
if (meta.type !== 'broker') continue;
} catch {
continue;
}
const key = ev.key ?? String(ev.id);
const acked = systemStore.getLatest('effect-acked', key);
if (acked) continue;
const failed = systemStore.getLatest('effect-failed', key);
if (failed) continue;
return { active: true };
}
return { active: false };
}
/**
* Fold project-created/paused/archived events → active project list.
*/
export function buildActiveProjectsFromEvents(
store: PulseStore,
): ActiveProjectsData {
const kinds = ['project-created', 'project-paused', 'project-archived'];
const allEvents = kinds.flatMap((kind) => store.queryByKind(kind));
allEvents.sort((a, b) => a.occurredAt - b.occurredAt);
const active = new Set<string>();
for (const event of allEvents) {
if (!event.meta) continue;
const meta = JSON.parse(event.meta);
switch (event.kind) {
case 'project-created':
active.add(meta.projectId);
break;
case 'project-paused':
case 'project-archived':
active.delete(meta.projectId);
break;
}
}
return { projectIds: [...active] };
}
const TRACE_RING_BUFFER_MAX = 20;
/**
* Fold llm-call-completed + tool-response events for a given projectId
* → ring buffer of last 20 messages + nextCheckAt.
*/
export function buildAgentLoopTraceFromEvents(
store: PulseStore,
projectId: string,
): AgentLoopTraceData {
const kinds = ['llm-call-completed', 'tool-response'];
const allEvents = kinds.flatMap((kind) => store.queryByKind(kind));
allEvents.sort((a, b) => a.occurredAt - b.occurredAt);
const messages: TraceMessage[] = [];
let nextCheckAt = 0;
for (const event of allEvents) {
if (!event.meta) continue;
const meta = JSON.parse(event.meta);
if (meta.projectId !== projectId) continue;
switch (event.kind) {
case 'llm-call-completed': {
const m = meta as LlmCallCompletedMeta;
messages.push({
role: 'assistant',
toolCalls: m.toolCalls,
ts: event.occurredAt,
});
if (m.toolCalls) {
const setCheck = m.toolCalls.find(
(tc) => tc.name === 'set_next_check',
);
if (setCheck) {
const tickMs = (setCheck.arguments as { tickMs?: number }).tickMs;
if (typeof tickMs === 'number') {
nextCheckAt = event.occurredAt + tickMs;
}
}
}
break;
}
case 'tool-response': {
const m = meta as ToolResponseMeta;
messages.push({
role: 'tool',
toolName: m.toolName,
result: m.result,
ts: event.occurredAt,
});
break;
}
}
}
while (messages.length > TRACE_RING_BUFFER_MAX) {
messages.shift();
}
return { messages, nextCheckAt };
}