diff --git a/packages/pulse/COUNCIL.md b/packages/pulse/COUNCIL.md deleted file mode 100644 index 4d83afa..0000000 --- a/packages/pulse/COUNCIL.md +++ /dev/null @@ -1,63 +0,0 @@ -# Council Model - -The Council is a multi-agent deliberation framework built on Pulse's -event-sourced store. Personas discuss Topics under the guidance of a Moderator, -each executing inside an isolated Container. - -## Core Concepts - -### Persona Registry (`src/persona.ts`) -A **Persona** is an agent identity carrying a name, container type, and -capability list. `buildPersonasFromEvents()` projects `persona-registered` and -`persona-updated` events (merged by `occurredAt`) into a `Map`. -Re-registering the same ID is an idempotent overwrite; updates patch only the -fields provided. - -### Container Registry (`src/container.ts`) -A **Container** is a runtime environment (`cursor`, `claude-code`, `hermes`, …) -described by `containerId`, `type`, `host`, `tools[]`, and `status` -(`online | offline`). `buildContainersFromEvents()` replays -`container-registered` and `container-status-changed` events into a live map. - -### Persona × Container Binding (`src/binding.ts`) -A **Binding** ties one Persona to exactly one Container at a time. -`persona-bound` creates or overwrites the link; `persona-unbound` deletes it. -`resolveRole(personaId, personas, bindings, containers)` returns the full -`{ persona, container }` pair needed for dispatch, or `null` if any lookup fails. - -### Topic / Sub-Topic (`src/topic.ts`) -A **Topic** is a named discussion thread (`topicId`, `title`, `createdBy`) -that transitions through `open → closed` and may carry a `summary`. -`parentTopicId` enables nesting — the Moderator can spawn sub-topics, creating -recursive councils whose summaries fold back into the parent. - -### Moderator (`src/moderator.ts`) -The Moderator is a pure async function: -`(participants: PersonaState[], history: CouncilMessage[]) → ModeratorDecision`. -Four decision types: - -| Decision | Effect | -|----------|--------| -| `speak` | Select the next speaker by `personaId` | -| `add` | Invite a new Persona into the council | -| `spawn` | Create a sub-topic with its own participant set | -| `close` | End deliberation, optionally attaching a summary | - -Built-in strategies: - -| Strategy | File | Behavior | -|----------|------|----------| -| Round-robin | `src/moderators/round-robin.ts` | Each participant speaks once in registration order, then closes | -| LLM-driven | `src/moderators/llm-moderator.ts` | An LLM picks the next action via tool calls (`next_speaker`, `add_member`, `spawn_sub_topic`, `close_council`) | - -### Council Execution Loop (`src/council.ts`) -`runCouncil(opts: CouncilOptions): Promise` drives deliberation: - -1. Call `moderator(participants, history)` to obtain a `ModeratorDecision`. -2. Execute the decision: - - **speak** → invoke `onSpeak(personaId)`, append result to history. - - **add** → push persona into participants, invoke `onAddMember` callback. - - **spawn** → invoke `onSpawn(topicId, title, participants)` recursively; - the sub-council's summary is appended to parent history. - - **close** → return `{ history, summary, rounds }`. -3. Repeat until `close` is returned or `maxRounds` (default 10) is exceeded. diff --git a/packages/pulse/V2-SERIAL.md b/packages/pulse/V2-SERIAL.md deleted file mode 100644 index 81e8dbf..0000000 --- a/packages/pulse/V2-SERIAL.md +++ /dev/null @@ -1 +0,0 @@ -Council v2 serialization test passed. diff --git a/packages/pulse/src/binding.ts b/packages/pulse/src/binding.ts deleted file mode 100644 index 123e409..0000000 --- a/packages/pulse/src/binding.ts +++ /dev/null @@ -1,88 +0,0 @@ -import type { Container } from './container.js'; -import type { PulseStore } from './store.js'; -import type { PersonaState } from './task-events.js'; - -export interface PersonaBinding { - personaId: string; - containerId: string; - boundAt: number; -} - -export interface PersonaBoundMeta { - personaId: string; - containerId: string; -} - -export interface PersonaUnboundMeta { - personaId: string; - containerId: string; - reason?: string; -} - -/** - * Build a Map of PersonaBinding from persona-bound and persona-unbound events. - * - * A Persona can only be bound to one Container at a time. - * persona-bound overwrites any previous binding; persona-unbound removes it. - */ -export function buildBindingsFromEvents( - store: PulseStore, -): Map { - const bound = store.queryByKind('persona-bound'); - const unbound = store.queryByKind('persona-unbound'); - - const allEvents = [...bound, ...unbound]; - allEvents.sort((a, b) => a.occurredAt - b.occurredAt); - - const bindings = new Map(); - - for (const ev of allEvents) { - if (!ev.meta) continue; - let meta: Record; - try { - meta = JSON.parse(ev.meta); - } catch { - continue; - } - - const personaId = meta.personaId as string; - if (!personaId) continue; - - if (ev.kind === 'persona-bound') { - const m = meta as unknown as PersonaBoundMeta; - bindings.set(personaId, { - personaId: m.personaId, - containerId: m.containerId, - boundAt: ev.occurredAt, - }); - } else if (ev.kind === 'persona-unbound') { - bindings.delete(personaId); - } - } - - return bindings; -} - -/** - * Resolve a Role = Persona × Container. - * - * Looks up the persona, finds its current binding, then resolves the container. - * Returns null if persona not found, not bound, or container doesn't exist. - */ -export function resolveRole( - personaId: string, - personas: Map, - bindings: Map, - containers: Map, -): { persona: PersonaState; container: Container } | null { - const persona = personas.get(personaId); - if (!persona) return null; - - const binding = bindings.get(personaId); - if (!binding) return null; - - const container = containers.get(binding.containerId); - if (!container) return null; - - return { persona, container }; -} diff --git a/packages/pulse/src/container.ts b/packages/pulse/src/container.ts deleted file mode 100644 index 5c2a271..0000000 --- a/packages/pulse/src/container.ts +++ /dev/null @@ -1,73 +0,0 @@ -import type { PulseStore } from './store.js'; -import type { ContainerStatus, ContainerType } from './task-events.js'; - -export interface Container { - containerId: string; - type: ContainerType; - host: string; - status: ContainerStatus; - tools: string[]; - registeredAt: number; - updatedAt: number; -} - -export interface ContainerRegisteredMeta { - containerId: string; - type: ContainerType; - host: string; - tools: string[]; -} - -export interface ContainerStatusChangedMeta { - containerId: string; - status: ContainerStatus; -} - -/** - * Build a Map of Container from container-registered and container-status-changed events. - */ -export function buildContainersFromEvents( - store: PulseStore, -): Map { - const registered = store.queryByKind('container-registered'); - const statusChanged = store.queryByKind('container-status-changed'); - - const allEvents = [...registered, ...statusChanged]; - allEvents.sort((a, b) => a.occurredAt - b.occurredAt); - - const containers = new Map(); - - for (const ev of allEvents) { - if (!ev.meta) continue; - let meta: Record; - try { - meta = JSON.parse(ev.meta); - } catch { - continue; - } - - const containerId = meta.containerId as string; - if (!containerId) continue; - - if (ev.kind === 'container-registered') { - const m = meta as unknown as ContainerRegisteredMeta; - containers.set(containerId, { - containerId: m.containerId, - type: m.type, - host: m.host, - status: 'online', - tools: m.tools, - registeredAt: ev.occurredAt, - updatedAt: ev.occurredAt, - }); - } else if (ev.kind === 'container-status-changed') { - const existing = containers.get(containerId); - if (!existing) continue; - const m = meta as unknown as ContainerStatusChangedMeta; - existing.status = m.status; - existing.updatedAt = ev.occurredAt; - } - } - - return containers; -} diff --git a/packages/pulse/src/council.ts b/packages/pulse/src/council.ts deleted file mode 100644 index 79dcf69..0000000 --- a/packages/pulse/src/council.ts +++ /dev/null @@ -1,92 +0,0 @@ -import type { CouncilMessage, ModeratorFn } from './moderator.js'; -import type { PersonaState } from './task-events.js'; - -export interface CouncilOptions { - moderator: ModeratorFn; - participants: PersonaState[]; - /** Pre-existing council history (e.g. from task-responded events) */ - history?: CouncilMessage[]; - /** Callback when a participant speaks — returns their speech content */ - onSpeak: (personaId: string) => Promise; - /** Callback when a new member is added */ - onAddMember?: (persona: PersonaState) => Promise; - /** Callback when a sub-topic is spawned — returns the sub-council result */ - onSpawn?: ( - topicId: string, - title: string, - participants: PersonaState[], - ) => Promise; - /** Maximum rounds to prevent infinite loops (default 10) */ - maxRounds?: number; -} - -export interface CouncilResult { - history: CouncilMessage[]; - summary?: string; - rounds: number; -} - -/** - * Run a Council deliberation loop. - * - * 1. Call moderator(participants, history) - * 2. Based on decision: - * - speak → onSpeak → append to history → back to 1 - * - add → onAddMember → append to participants → back to 1 - * - close → return final history + summary - * 3. Force close after maxRounds - */ -export async function runCouncil(opts: CouncilOptions): Promise { - const { moderator, onSpeak, onAddMember, onSpawn, maxRounds = 10 } = opts; - - const participants = [...opts.participants]; - const history: CouncilMessage[] = [...(opts.history ?? [])]; - let rounds = 0; - - while (rounds < maxRounds) { - rounds++; - - const decision = await moderator(participants, history); - - switch (decision.action) { - case 'speak': { - const content = await onSpeak(decision.personaId); - history.push({ - personaId: decision.personaId, - content, - timestamp: Date.now(), - }); - break; - } - case 'add': { - participants.push(decision.persona); - if (onAddMember) { - await onAddMember(decision.persona); - } - break; - } - case 'close': { - return { history, summary: decision.summary, rounds }; - } - case 'spawn': { - if (onSpawn) { - const subResult = await onSpawn( - decision.topicId, - decision.title, - decision.participants, - ); - history.push({ - personaId: `sub-council:${decision.topicId}`, - content: - subResult.summary ?? 'Sub-council completed without summary', - timestamp: Date.now(), - }); - } - break; - } - } - } - - // maxRounds exceeded — force close - return { history, summary: 'Max rounds exceeded', rounds }; -} diff --git a/packages/pulse/src/e2e/council-demo.ts b/packages/pulse/src/e2e/council-demo.ts index 63abf79..7d06bba 100644 --- a/packages/pulse/src/e2e/council-demo.ts +++ b/packages/pulse/src/e2e/council-demo.ts @@ -1,9 +1,9 @@ #!/usr/bin/env bun /** - * Pulse Council Demo — Full Pipeline + * Pulse Council Demo — v2 TopicType Pipeline * * Executable script (not a test) that simulates the complete - * Council dispatch chain driven by the Rules engine. + * Council v2 dispatch chain driven by TopicType rules. * * Run: cd ~/repos/pulse && bun packages/pulse/src/e2e/council-demo.ts * @@ -13,22 +13,11 @@ import { mkdtempSync, rmSync } from 'node:fs'; import { tmpdir } from 'node:os'; import { join } from 'node:path'; -import { createBrokerExecutor } from '../executors/broker.js'; -import type { Sensed } from '../index.js'; -import { - buildBindingsFromEvents, - buildContainersFromEvents, - buildPersonasFromEvents, - buildTopicsFromEvents, - createRoundRobinModerator, - createStore, - runCouncil, -} from '../index.js'; -import type { LlmClient } from '../llm-client.js'; import { createOpenAiLlmClient } from '../llm-client.js'; -import { createTaskRule } from '../rules/task-rule.js'; -import type { PendingTasksData } from '../task-events.js'; -import { buildPendingTasksFromEvents } from '../watchers/pending-tasks-projection.js'; +import { createStore } from '../store.js'; +import { createCodingTaskType } from '../topics/coding-task.js'; +import { createArchitectRole } from '../topics/roles/architect-llm.js'; +import { createTopicRule } from '../topics/topic-rule-adapter.js'; // ── Helpers ──────────────────────────────────────────────────── @@ -42,17 +31,6 @@ function logItem(msg: string) { console.log(` ✅ ${msg}`); } -function logSpeech(persona: string, content: string) { - console.log(` ${persona}: "${content}"`); -} - -function assert(condition: boolean, msg: string) { - if (!condition) { - console.error(` ❌ ASSERTION FAILED: ${msg}`); - process.exit(1); - } -} - // ── CLI Args ─────────────────────────────────────────────────── const LIVE = process.argv.includes('--live'); @@ -62,437 +40,6 @@ const KEEP = process.argv.includes('--keep'); // ── Main ─────────────────────────────────────────────────────── async function main() { - const useCustomDb = !!DB_PATH; - const tmpDir = useCustomDb - ? undefined - : mkdtempSync(join(tmpdir(), 'pulse-council-demo-')); - const dbPath = DB_PATH ?? join(tmpDir!, 'events.db'); - const objDir = DB_PATH - ? join(require('node:path').dirname(DB_PATH), 'objects') - : join(tmpDir!, 'objects'); - - const store = createStore({ - eventsDbPath: dbPath, - objectsDir: objDir, - }); - - try { - console.log(); - console.log(`🍊 === Pulse Council Demo — Full Pipeline ===`); - console.log(SEP); - - // ── Step 1: Register Personas ────────────────────────────── - - console.log(); - log('📝', 'Step 1: Register Personas'); - - const now = Date.now(); - store.appendEvent({ - occurredAt: now, - kind: 'persona-registered', - key: 'xiaoju', - meta: JSON.stringify({ - personaId: 'xiaoju', - name: '小橘', - container: 'openclaw', - capabilities: ['coordination', 'review'], - }), - }); - logItem('xiaoju (coordination, review)'); - - store.appendEvent({ - occurredAt: now + 1, - kind: 'persona-registered', - key: 'cursor-agent', - meta: JSON.stringify({ - personaId: 'cursor-agent', - name: 'Cursor Agent', - container: 'cursor', - capabilities: ['coding', 'refactor'], - }), - }); - logItem('cursor-agent (coding, refactor)'); - - const personas = buildPersonasFromEvents(store); - assert(personas.size === 2, 'Expected 2 personas'); - - // ── Step 2: Register Containers ──────────────────────────── - - console.log(); - log('📦', 'Step 2: Register Containers'); - - store.appendEvent({ - occurredAt: now + 2, - kind: 'container-registered', - key: 'openclaw-neko', - meta: JSON.stringify({ - containerId: 'openclaw-neko', - type: 'openclaw', - host: 'neko-vm', - tools: ['a2a', 'feishu'], - }), - }); - logItem('openclaw-neko (openclaw @ neko-vm)'); - - store.appendEvent({ - occurredAt: now + 3, - kind: 'container-registered', - key: 'cursor-neko', - meta: JSON.stringify({ - containerId: 'cursor-neko', - type: 'cursor', - host: 'neko-vm', - tools: ['file-edit', 'terminal'], - }), - }); - logItem('cursor-neko (cursor @ neko-vm)'); - - const containers = buildContainersFromEvents(store); - assert(containers.size === 2, 'Expected 2 containers'); - - // ── Step 3: Bind Persona × Container ─────────────────────── - - console.log(); - log('🔗', 'Step 3: Bind Persona × Container'); - - store.appendEvent({ - occurredAt: now + 4, - kind: 'persona-bound', - key: 'xiaoju', - meta: JSON.stringify({ - personaId: 'xiaoju', - containerId: 'openclaw-neko', - }), - }); - logItem('xiaoju → openclaw-neko'); - - store.appendEvent({ - occurredAt: now + 5, - kind: 'persona-bound', - key: 'cursor-agent', - meta: JSON.stringify({ - personaId: 'cursor-agent', - containerId: 'cursor-neko', - }), - }); - logItem('cursor-agent → cursor-neko'); - - const bindings = buildBindingsFromEvents(store); - assert(bindings.size === 2, 'Expected 2 bindings'); - - // ── Step 4: Create Task ──────────────────────────────────── - - console.log(); - log('📋', 'Step 4: Create Task'); - - store.appendEvent({ - occurredAt: now + 10, - kind: 'task-created', - key: 'task-fix-login-bug', - meta: JSON.stringify({ - taskId: 'task-fix-login-bug', - projectId: 'proj-neko', - title: 'Fix login redirect bug', - description: 'Login page redirects to /undefined after auth', - type: 'bug', - priority: 5, - creatorId: 'xiaoju', - }), - }); - logItem('task-fix-login-bug: "Fix login redirect bug"'); - - // ── Step 5: Rules Engine — detect pending task ───────────── - - console.log(); - log('⚙️', 'Step 5: Rules Engine — detect pending task'); - - // Build pending tasks from events to construct snapshot - const pendingData = buildPendingTasksFromEvents(store); - assert(pendingData.pendingCount > 0, 'Expected pending tasks'); - - // Construct snapshot with pending-tasks sensed data - type Snapshot = Record & { timestamp: number }; - type Effect = Record; - - const currSnapshot: Snapshot = { - timestamp: Date.now(), - 'pending-tasks': { - data: pendingData, - refreshedAt: Date.now(), - } as Sensed, - }; - - const prevSnapshot: Snapshot = { timestamp: now - 1000 }; - - const taskRule = createTaskRule(); - const baseInner = async ( - _p: Snapshot, - _c: Snapshot, - ): Promise<[Effect[], number]> => [[], 15000]; - - const [effects] = await taskRule(prevSnapshot, currSnapshot, baseInner); - - const brokerEffect = effects.find((e) => e.kind === 'broker'); - assert(!!brokerEffect, 'Expected broker effect from task rule'); - logItem( - `Task Rule produced broker effect: taskIds=${JSON.stringify((brokerEffect as { taskIds: string[] }).taskIds)}`, - ); - - // ── Step 6: Broker Executor — assign task ────────────────── - - console.log(); - const llmMode = LIVE ? 'LIVE LLM' : 'mock LLM'; - log('🤖', `Step 6: Broker Executor — assign task (${llmMode})`); - - let llmClient: LlmClient; - - if (LIVE) { - const baseUrl = - process.env.PULSE_LLM_BASE_URL ?? process.env.OPENAI_BASE_URL; - const apiKey = - process.env.PULSE_LLM_API_KEY ?? process.env.OPENAI_API_KEY; - const model = process.env.PULSE_LLM_MODEL ?? 'gpt-4o-mini'; - - if (!baseUrl || !apiKey) { - console.error( - ' ❌ --live requires PULSE_LLM_BASE_URL + PULSE_LLM_API_KEY (or OPENAI_BASE_URL + OPENAI_API_KEY)', - ); - process.exit(1); - } - - logItem(`LLM: ${model} @ ${baseUrl}`); - llmClient = createOpenAiLlmClient({ baseUrl, apiKey, model }); - } else { - llmClient = { - chat: async (_req) => ({ - tool_calls: [ - { - id: 'call_1', - function: { - name: 'assign_task', - arguments: JSON.stringify({ - taskId: 'task-fix-login-bug', - assigneeId: 'cursor-agent', - }), - }, - }, - ], - }), - }; - } - - const executeBroker = createBrokerExecutor({ - llmClient, - routineStore: store, - getPersonas: () => buildPersonasFromEvents(store), - }); - - await executeBroker(brokerEffect as { kind: 'broker'; taskIds: string[] }); - - // Verify events written - const routingEvents = store.queryByKind('task-routing'); - assert(routingEvents.length > 0, 'Expected task-routing event'); - logItem('task-routing event written'); - - logItem('LLM decided: assign task-fix-login-bug → cursor-agent'); - - const assignedEvents = store.queryByKind('task-assigned'); - assert(assignedEvents.length > 0, 'Expected task-assigned event'); - const assignedMeta = JSON.parse(assignedEvents[0].meta!); - assert( - assignedMeta.assigneeId === 'cursor-agent', - 'Expected cursor-agent assignment', - ); - logItem('task-assigned event written'); - - // ── Step 7: Cursor completes coding ──────────────────────── - - console.log(); - log('💻', 'Step 7: Cursor completes coding'); - - store.appendEvent({ - occurredAt: Date.now(), - kind: 'task-responded', - key: 'task-fix-login-bug', - meta: JSON.stringify({ - taskId: 'task-fix-login-bug', - assigneeId: 'cursor-agent', - result: - 'Fixed auth.ts line 42, added 3 test cases. PR #123 ready for review.', - }), - }); - logItem('task-responded: "Fixed auth.ts line 42..."'); - - // ── Step 8: Council Deliberation ─────────────────────────── - - console.log(); - log('🗣️', 'Step 8: Council Deliberation'); - - // Create topic for the task - store.appendEvent({ - occurredAt: Date.now(), - kind: 'topic-created', - key: 'task-fix-login-bug', - meta: JSON.stringify({ - topicId: 'task-fix-login-bug', - title: 'Fix login redirect bug', - createdBy: 'xiaoju', - }), - }); - - const councilSpeeches: Record = { - xiaoju: ['Login bug fixed, please review PR #123'], - 'cursor-agent': ['All tests pass, ready for merge'], - }; - const councilIdx: Record = { xiaoju: 0, 'cursor-agent': 0 }; - - const councilResult = await runCouncil({ - moderator: createRoundRobinModerator(), - participants: [personas.get('xiaoju')!, personas.get('cursor-agent')!], - onSpeak: async (personaId: string) => { - const idx = councilIdx[personaId] ?? 0; - councilIdx[personaId] = idx + 1; - return councilSpeeches[personaId]?.[idx] ?? '...'; - }, - }); - - for (const msg of councilResult.history) { - logSpeech(msg.personaId, msg.content); - } - logItem(`Council closed: ${councilResult.rounds} rounds`); - - // ── Step 9: Review Sub-Council ───────────────────────────── - - console.log(); - log('🔍', 'Step 9: Review Sub-Council'); - - store.appendEvent({ - occurredAt: Date.now(), - kind: 'topic-created', - key: 'review-pr-123', - meta: JSON.stringify({ - topicId: 'review-pr-123', - parentTopicId: 'task-fix-login-bug', - title: 'Review PR #123', - createdBy: 'xiaoju', - }), - }); - logItem('Sub-topic: review-pr-123 (parent: task-fix-login-bug)'); - - const reviewSpeeches: Record = { - xiaoju: ['Edge case: expired tokens?'], - 'cursor-agent': ['Added token expiry check'], - }; - const reviewIdx: Record = { xiaoju: 0, 'cursor-agent': 0 }; - - const reviewResult = await runCouncil({ - moderator: createRoundRobinModerator(), - participants: [personas.get('xiaoju')!, personas.get('cursor-agent')!], - onSpeak: async (personaId: string) => { - const idx = reviewIdx[personaId] ?? 0; - reviewIdx[personaId] = idx + 1; - return reviewSpeeches[personaId]?.[idx] ?? '...'; - }, - }); - - for (const msg of reviewResult.history) { - logSpeech(msg.personaId, msg.content); - } - logItem(`Review closed: ${reviewResult.rounds} rounds`); - - // ── Step 10: Close Topics ────────────────────────────────── - - console.log(); - log('🏁', 'Step 10: Close Topics'); - - store.appendEvent({ - occurredAt: Date.now(), - kind: 'topic-closed', - key: 'review-pr-123', - meta: JSON.stringify({ - topicId: 'review-pr-123', - summary: 'PR approved with token expiry fix', - }), - }); - logItem('review-pr-123: closed (PR approved with token expiry fix)'); - - store.appendEvent({ - occurredAt: Date.now() + 1, - kind: 'topic-closed', - key: 'task-fix-login-bug', - meta: JSON.stringify({ - topicId: 'task-fix-login-bug', - summary: 'Login bug fixed, PR merged', - }), - }); - logItem('task-fix-login-bug: closed (Login bug fixed, PR merged)'); - - // ── Event Log ────────────────────────────────────────────── - - console.log(); - console.log(`📊 === Event Log (chronological) ===`); - - const allEvents = store.getAfter(0); - allEvents.sort((a, b) => a.occurredAt - b.occurredAt); - - for (const ev of allEvents) { - const key = - ev.key ?? - (ev.meta - ? (JSON.parse(ev.meta).taskId ?? - JSON.parse(ev.meta).personaId ?? - JSON.parse(ev.meta).containerId ?? - '') - : ''); - console.log( - ` #${String(ev.id).padStart(2)} ${ev.kind.padEnd(22)} ${key}`, - ); - } - - // ── Final State ──────────────────────────────────────────── - - console.log(); - console.log(`📊 === Final State ===`); - - const finalPersonas = buildPersonasFromEvents(store); - const finalContainers = buildContainersFromEvents(store); - const finalBindings = buildBindingsFromEvents(store); - const finalTopics = buildTopicsFromEvents(store); - - const personaNames = Array.from(finalPersonas.keys()).join(', '); - const containerNames = Array.from(finalContainers.keys()).join(', '); - const allClosed = Array.from(finalTopics.values()).every( - (t) => t.status === 'closed', - ); - - console.log(` Personas: ${finalPersonas.size} (${personaNames})`); - console.log(` Containers: ${finalContainers.size} (${containerNames})`); - console.log(` Bindings: ${finalBindings.size}`); - console.log( - ` Topics: ${finalTopics.size} (${allClosed ? 'all closed' : 'some open'})`, - ); - - console.log(); - console.log(SEP); - console.log(`✅ All steps completed successfully!`); - console.log(); - } finally { - store.close(); - if (KEEP || useCustomDb) { - console.log(`📁 Events DB preserved: ${dbPath}`); - } else if (tmpDir) { - rmSync(tmpDir, { recursive: true, force: true }); - } - } -} - -// ── V2 Demo ──────────────────────────────────────────────────── - -async function mainV2() { - const { createCodingTaskType } = await import('../topics/coding-task.js'); - const { createTopicRule } = await import('../topics/topic-rule-adapter.js'); - - const _V2 = true; const useCustomDb = !!DB_PATH; const tmpDir = useCustomDb ? null @@ -524,45 +71,13 @@ async function mainV2() { const model = process.env.PULSE_LLM_MODEL ?? 'qwen-plus'; if (!baseUrl || !apiKey) { console.error( - '❌ --live --v2 requires PULSE_LLM_BASE_URL + PULSE_LLM_API_KEY', + '❌ --live requires PULSE_LLM_BASE_URL + PULSE_LLM_API_KEY', ); process.exit(1); } const llm = createOpenAiLlmClient({ baseUrl, apiKey, model }); codingTask = createCodingTaskType({ - architectFn: async (ctx, s) => { - const resp = await llm.chat({ - messages: [ - { - role: 'system', - content: - 'You are a software architect. Analyze the task and suggest target files and approach. Reply in JSON: {"analysis": "...", "targetFiles": ["..."]}', - }, - { - role: 'user', - content: `Task: ${ctx.title}\nDescription: ${ctx.description}\nRepo: ${ctx.repoDir}`, - }, - ], - }); - let parsed: { analysis?: string; targetFiles?: string[] }; - try { - parsed = JSON.parse(resp.content ?? '{}'); - } catch { - parsed = { - analysis: resp.content ?? 'No analysis', - targetFiles: [], - }; - } - s.appendEvent({ - occurredAt: Date.now(), - kind: 'coding.analyzed', - meta: JSON.stringify({ - topicId: ctx.topicId, - analysis: parsed.analysis ?? 'No analysis', - targetFiles: parsed.targetFiles ?? [], - }), - }); - }, + architectFn: createArchitectRole(llm), }); } else { codingTask = createCodingTaskType(); @@ -650,8 +165,7 @@ async function mainV2() { } } -const isV2 = process.argv.includes('--v2'); -(isV2 ? mainV2 : main)().catch((err) => { +main().catch((err) => { console.error('❌ Demo failed:', err); process.exit(1); }); diff --git a/packages/pulse/src/e2e/t10-council-e2e.test.ts b/packages/pulse/src/e2e/t10-council-e2e.test.ts deleted file mode 100644 index 03c577b..0000000 --- a/packages/pulse/src/e2e/t10-council-e2e.test.ts +++ /dev/null @@ -1,366 +0,0 @@ -/** - * E2E T10 — Council full pipeline: task → code → review → close - * - * Simulates the real coding task lifecycle: - * Persona registration → Container registration → Binding → - * Topic creation → Council deliberation → Coding response → - * Review sub-topic → Close - * - * Uses real SQLite store (no mocks). - * - * Run: bun test packages/pulse/src/e2e/t10-council-e2e.test.ts - */ - -import { afterAll, beforeAll, describe, expect, it } from 'bun:test'; -import { mkdtempSync, rmSync } from 'node:fs'; -import { tmpdir } from 'node:os'; -import { join } from 'node:path'; -import type { CouncilResult } from '../council.js'; -import { - buildBindingsFromEvents, - buildContainersFromEvents, - buildPersonasFromEvents, - buildTopicsFromEvents, - createRoundRobinModerator, - createStore, - type PulseStore, - resolveRole, - runCouncil, -} from '../index.js'; - -// ── Shared Context ───────────────────────────────────────────── - -interface Ctx { - tmpDir: string; - store: PulseStore; - councilResult?: CouncilResult; - reviewResult?: CouncilResult; -} - -const ctx: Ctx = {} as Ctx; - -// ── Test Suite ───────────────────────────────────────────────── - -describe('E2E: Council full pipeline — task → code → review → close', () => { - beforeAll(() => { - ctx.tmpDir = mkdtempSync(join(tmpdir(), 'pulse-council-e2e-')); - ctx.store = createStore({ - eventsDbPath: join(ctx.tmpDir, 'events.db'), - objectsDir: join(ctx.tmpDir, 'objects'), - }); - }); - - afterAll(() => { - ctx.store.close(); - rmSync(ctx.tmpDir, { recursive: true, force: true }); - }); - - // ── Step 1: Register Personas ────────────────────────────────── - - it('Step 1: register personas — xiaoju + cursor-agent', () => { - const now = Date.now(); - - ctx.store.appendEvent({ - occurredAt: now, - kind: 'persona-registered', - key: 'xiaoju', - meta: JSON.stringify({ - personaId: 'xiaoju', - name: '小橘', - container: 'openclaw', - capabilities: ['coordination', 'review'], - }), - }); - - ctx.store.appendEvent({ - occurredAt: now + 1, - kind: 'persona-registered', - key: 'cursor-agent', - meta: JSON.stringify({ - personaId: 'cursor-agent', - name: 'Cursor Agent', - container: 'cursor', - capabilities: ['coding', 'refactor'], - }), - }); - - const personas = buildPersonasFromEvents(ctx.store); - expect(personas.size).toBe(2); - expect(personas.get('xiaoju')?.capabilities).toEqual([ - 'coordination', - 'review', - ]); - expect(personas.get('cursor-agent')?.capabilities).toEqual([ - 'coding', - 'refactor', - ]); - }); - - // ── Step 2: Register Containers ──────────────────────────────── - - it('Step 2: register containers — openclaw-neko + cursor-neko', () => { - const now = Date.now(); - - ctx.store.appendEvent({ - occurredAt: now, - kind: 'container-registered', - key: 'openclaw-neko', - meta: JSON.stringify({ - containerId: 'openclaw-neko', - type: 'openclaw', - host: 'neko-vm', - tools: ['a2a', 'feishu'], - }), - }); - - ctx.store.appendEvent({ - occurredAt: now + 1, - kind: 'container-registered', - key: 'cursor-neko', - meta: JSON.stringify({ - containerId: 'cursor-neko', - type: 'cursor', - host: 'neko-vm', - tools: ['file-edit', 'terminal'], - }), - }); - - const containers = buildContainersFromEvents(ctx.store); - expect(containers.size).toBe(2); - expect(containers.get('openclaw-neko')?.type).toBe('openclaw'); - expect(containers.get('cursor-neko')?.tools).toEqual([ - 'file-edit', - 'terminal', - ]); - }); - - // ── Step 3: Bind Persona × Container ─────────────────────────── - - it('Step 3: bind personas to containers + resolveRole', () => { - const now = Date.now(); - - ctx.store.appendEvent({ - occurredAt: now, - kind: 'persona-bound', - key: 'xiaoju', - meta: JSON.stringify({ - personaId: 'xiaoju', - containerId: 'openclaw-neko', - }), - }); - - ctx.store.appendEvent({ - occurredAt: now + 1, - kind: 'persona-bound', - key: 'cursor-agent', - meta: JSON.stringify({ - personaId: 'cursor-agent', - containerId: 'cursor-neko', - }), - }); - - const bindings = buildBindingsFromEvents(ctx.store); - expect(bindings.size).toBe(2); - expect(bindings.get('xiaoju')?.containerId).toBe('openclaw-neko'); - expect(bindings.get('cursor-agent')?.containerId).toBe('cursor-neko'); - - // Verify Role resolution - const personas = buildPersonasFromEvents(ctx.store); - const containers = buildContainersFromEvents(ctx.store); - - const xiaojuRole = resolveRole('xiaoju', personas, bindings, containers); - expect(xiaojuRole).not.toBeNull(); - expect(xiaojuRole!.persona.personaId).toBe('xiaoju'); - expect(xiaojuRole!.container.containerId).toBe('openclaw-neko'); - - const cursorRole = resolveRole( - 'cursor-agent', - personas, - bindings, - containers, - ); - expect(cursorRole).not.toBeNull(); - expect(cursorRole!.container.type).toBe('cursor'); - }); - - // ── Step 4: Create Topic (coding task) ───────────────────────── - - it('Step 4: create topic — task-fix-login-bug', () => { - ctx.store.appendEvent({ - occurredAt: Date.now(), - kind: 'topic-created', - key: 'task-fix-login-bug', - meta: JSON.stringify({ - topicId: 'task-fix-login-bug', - title: 'Fix login redirect bug', - createdBy: 'xiaoju', - }), - }); - - const topics = buildTopicsFromEvents(ctx.store); - expect(topics.size).toBe(1); - - const topic = topics.get('task-fix-login-bug'); - expect(topic).not.toBeNull(); - expect(topic!.title).toBe('Fix login redirect bug'); - expect(topic!.status).toBe('open'); - expect(topic!.createdBy).toBe('xiaoju'); - }); - - // ── Step 5: Council deliberation — assign task ───────────────── - - it('Step 5: council deliberation — round-robin assignment', async () => { - const personas = buildPersonasFromEvents(ctx.store); - const participants = [ - personas.get('xiaoju')!, - personas.get('cursor-agent')!, - ]; - - const speeches: Record = { - xiaoju: 'Login redirect bug in auth.ts line 42. cursor-agent please fix.', - 'cursor-agent': "Understood. I'll fix auth.ts and add tests.", - }; - - ctx.councilResult = await runCouncil({ - moderator: createRoundRobinModerator(), - participants, - onSpeak: async (personaId: string) => speeches[personaId]!, - }); - - expect(ctx.councilResult.history).toHaveLength(2); - expect(ctx.councilResult.history[0]!.personaId).toBe('xiaoju'); - expect(ctx.councilResult.history[1]!.personaId).toBe('cursor-agent'); - // 2 speak rounds + 1 close decision = 3 rounds - expect(ctx.councilResult.rounds).toBe(3); - expect(ctx.councilResult.summary).toBe('All participants have spoken'); - }); - - // ── Step 6: Simulate coding completion ───────────────────────── - - it('Step 6: task-responded — cursor-agent completes coding', () => { - const ev = ctx.store.appendEvent({ - occurredAt: Date.now(), - kind: 'task-responded', - key: 'task-fix-login-bug', - meta: JSON.stringify({ - taskId: 'task-fix-login-bug', - assigneeId: 'cursor-agent', - result: - 'Fixed auth.ts line 42, added 3 test cases. PR #123 ready for review.', - }), - }); - - expect(ev.id).toBeGreaterThan(0); - expect(ev.kind).toBe('task-responded'); - }); - - // ── Step 7: Review Council — sub-topic ───────────────────────── - - it('Step 7: review council — sub-topic review-pr-123', async () => { - // Create review sub-topic - ctx.store.appendEvent({ - occurredAt: Date.now(), - kind: 'topic-created', - key: 'review-pr-123', - meta: JSON.stringify({ - topicId: 'review-pr-123', - parentTopicId: 'task-fix-login-bug', - title: 'Review PR #123', - createdBy: 'xiaoju', - }), - }); - - const topics = buildTopicsFromEvents(ctx.store); - const reviewTopic = topics.get('review-pr-123'); - expect(reviewTopic).not.toBeNull(); - expect(reviewTopic!.parentTopicId).toBe('task-fix-login-bug'); - - // Run review council - const personas = buildPersonasFromEvents(ctx.store); - const participants = [ - personas.get('xiaoju')!, - personas.get('cursor-agent')!, - ]; - - const reviewSpeeches: Record = { - xiaoju: 'Edge case: what about expired tokens?', - 'cursor-agent': 'Good catch, added token expiry check and test.', - }; - - ctx.reviewResult = await runCouncil({ - moderator: createRoundRobinModerator(), - participants, - onSpeak: async (personaId: string) => reviewSpeeches[personaId]!, - }); - - expect(ctx.reviewResult.history).toHaveLength(2); - expect(ctx.reviewResult.rounds).toBe(3); - expect(ctx.reviewResult.summary).toBe('All participants have spoken'); - }); - - // ── Step 8: Close topics ─────────────────────────────────────── - - it('Step 8: close both topics with summaries', () => { - const now = Date.now(); - - ctx.store.appendEvent({ - occurredAt: now, - kind: 'topic-closed', - key: 'review-pr-123', - meta: JSON.stringify({ - topicId: 'review-pr-123', - summary: 'PR approved with token expiry fix', - }), - }); - - ctx.store.appendEvent({ - occurredAt: now + 1, - kind: 'topic-closed', - key: 'task-fix-login-bug', - meta: JSON.stringify({ - topicId: 'task-fix-login-bug', - summary: 'Login bug fixed, PR merged', - }), - }); - - const topics = buildTopicsFromEvents(ctx.store); - - const mainTopic = topics.get('task-fix-login-bug'); - expect(mainTopic!.status).toBe('closed'); - expect(mainTopic!.summary).toBe('Login bug fixed, PR merged'); - - const reviewTopic = topics.get('review-pr-123'); - expect(reviewTopic!.status).toBe('closed'); - expect(reviewTopic!.summary).toBe('PR approved with token expiry fix'); - }); - - // ── Step 9: Final consistency check ──────────────────────────── - - it('Step 9: full state consistency — all entities intact', () => { - const personas = buildPersonasFromEvents(ctx.store); - expect(personas.size).toBe(2); - expect(personas.has('xiaoju')).toBe(true); - expect(personas.has('cursor-agent')).toBe(true); - - const containers = buildContainersFromEvents(ctx.store); - expect(containers.size).toBe(2); - expect(containers.has('openclaw-neko')).toBe(true); - expect(containers.has('cursor-neko')).toBe(true); - - const bindings = buildBindingsFromEvents(ctx.store); - expect(bindings.size).toBe(2); - expect(bindings.get('xiaoju')?.containerId).toBe('openclaw-neko'); - expect(bindings.get('cursor-agent')?.containerId).toBe('cursor-neko'); - - const topics = buildTopicsFromEvents(ctx.store); - expect(topics.size).toBe(2); - for (const [, topic] of topics) { - expect(topic.status).toBe('closed'); - expect(topic.summary).toBeTruthy(); - } - - // Council histories are complete - expect(ctx.councilResult!.history).toHaveLength(2); - expect(ctx.reviewResult!.history).toHaveLength(2); - }); -}); diff --git a/packages/pulse/src/executors/broker.ts b/packages/pulse/src/executors/broker.ts deleted file mode 100644 index 0d90404..0000000 --- a/packages/pulse/src/executors/broker.ts +++ /dev/null @@ -1,181 +0,0 @@ -import type { LlmClient, LlmResponse } from '../llm-client.js'; -import type { BrokerEffect } from '../rules/task-rule.js'; -import type { PulseStore } from '../store.js'; -import type { PersonaState, TaskState } from '../task-events.js'; - -export interface BrokerExecutorOptions { - llmClient: LlmClient; - routineStore: PulseStore; - getPersonas?: () => Map; -} - -const DEFAULT_AGENT_IDS = ['cursor']; - -function buildBrokerTools(agentIds: string[]) { - return [ - { - type: 'function' as const, - function: { - name: 'assign_task', - description: 'Assign a task to an agent', - parameters: { - type: 'object', - properties: { - taskId: { type: 'string' }, - assigneeId: { - type: 'string', - enum: agentIds, - }, - }, - required: ['taskId', 'assigneeId'], - }, - }, - }, - ]; -} - -function buildBrokerSystemPrompt( - personas: Map | null, -): string { - if (personas && personas.size > 0) { - const agentLines = Array.from(personas.values()) - .map( - (p) => - `- ${p.personaId} (${p.name}): container=${p.container}, capabilities=[${p.capabilities.join(', ')}]`, - ) - .join('\n'); - return `You are a task broker. Given a list of pending tasks, decide which agent to assign each task to. - -Available agents: -${agentLines} - -For each task, call assign_task with the taskId and assigneeId. -Assign all tasks — do not skip any. Match tasks to agents based on their capabilities.`; - } - return `You are a task broker. Given a list of pending tasks, decide which agent to assign each task to. - -Currently available agents: ["cursor"] - -For each task, call assign_task with the taskId and assigneeId. -Assign all tasks — do not skip any.`; -} - -/** - * Create a broker executor that uses an LLM to assign pending tasks. - * - * Flow: - * 1. Read pending task details from routineStore - * 2. Read project details from routineStore - * 3. Call LLM to decide assigneeId for each task - * 4. Write task-assigned events to routineStore - */ -export function createBrokerExecutor(opts: BrokerExecutorOptions) { - const { llmClient, routineStore, getPersonas } = opts; - - return async function executeBroker(effect: BrokerEffect): Promise { - if (effect.kind !== 'broker') return; - - const brokerSessionId = `broker-${Date.now()}`; - - const taskEvents = routineStore.queryByKind('task-created'); - taskEvents.sort((a, b) => a.occurredAt - b.occurredAt); - - const taskMap = new Map(); - for (const ev of taskEvents) { - if (!ev.meta) continue; - const meta = JSON.parse(ev.meta); - taskMap.set(meta.taskId, { - taskId: meta.taskId, - projectId: meta.projectId, - title: meta.title, - description: meta.description ?? '', - type: meta.type ?? 'action', - priority: meta.priority ?? 0, - creatorId: meta.creatorId ?? '', - status: 'pending', - createdAt: ev.occurredAt, - updatedAt: ev.occurredAt, - }); - } - - const tasks = effect.taskIds - .map((id) => taskMap.get(id)) - .filter((t): t is TaskState => t !== undefined); - - if (tasks.length === 0) return; - - for (const task of tasks) { - routineStore.appendEvent({ - occurredAt: Date.now(), - kind: 'task-routing', - meta: JSON.stringify({ - taskId: task.taskId, - brokerSessionId, - }), - }); - } - - const taskSummary = tasks - .map( - (t) => - `- taskId=${t.taskId} type=${t.type} priority=${t.priority} title="${t.title}"`, - ) - .join('\n'); - - // Build dynamic tools and prompt from personas (or fallback to defaults) - const personas = getPersonas ? getPersonas() : null; - const agentIds = - personas && personas.size > 0 - ? Array.from(personas.keys()) - : DEFAULT_AGENT_IDS; - const brokerTools = buildBrokerTools(agentIds); - const brokerSystemPrompt = buildBrokerSystemPrompt(personas); - - let response: LlmResponse; - try { - response = await llmClient.chat({ - messages: [ - { role: 'system', content: brokerSystemPrompt }, - { role: 'user', content: `Pending tasks:\n${taskSummary}` }, - ], - tools: brokerTools, - }); - } catch (err) { - for (const task of tasks) { - routineStore.appendEvent({ - occurredAt: Date.now(), - kind: 'task-responded', - meta: JSON.stringify({ - taskId: task.taskId, - assigneeId: 'broker', - result: `Routing failed: ${err instanceof Error ? err.message : String(err)}`, - }), - }); - } - return; - } - - if (!response.tool_calls) return; - - for (const tc of response.tool_calls) { - if (tc.function.name !== 'assign_task') continue; - let args: { taskId?: string; assigneeId?: string }; - try { - args = JSON.parse(tc.function.arguments); - } catch { - continue; - } - if (!args.taskId || !args.assigneeId) continue; - - routineStore.appendEvent({ - occurredAt: Date.now(), - kind: 'task-assigned', - meta: JSON.stringify({ - taskId: args.taskId, - assigneeId: args.assigneeId, - assignedBy: 'broker', - }), - }); - } - }; -} diff --git a/packages/pulse/src/executors/index.ts b/packages/pulse/src/executors/index.ts index 2e4bded..2df0816 100644 --- a/packages/pulse/src/executors/index.ts +++ b/packages/pulse/src/executors/index.ts @@ -1,7 +1,3 @@ -export { - type BrokerExecutorOptions, - createBrokerExecutor, -} from './broker.js'; export { executeSurvivalEffect, type SurvivalEffect, diff --git a/packages/pulse/src/index.ts b/packages/pulse/src/index.ts index 0e24358..a25ed34 100644 --- a/packages/pulse/src/index.ts +++ b/packages/pulse/src/index.ts @@ -932,66 +932,10 @@ export { createAgentLoopRule, } from './rules/agent-loop.js'; -// ── Task Rule ─────────────────────────────────────────────────── - -export { - type BrokerEffect, - type CursorTaskEffect, - createTaskRule, -} from './rules/task-rule.js'; - -// ── Broker Executor ───────────────────────────────────────────── - -export { - type BrokerExecutorOptions, - createBrokerExecutor, -} from './executors/broker.js'; - // ── Persona Registry ──────────────────────────────────────────── export { buildPersonasFromEvents } from './persona.js'; -// ── Container Registry ────────────────────────────────────────── - -export type { - Container, - ContainerRegisteredMeta, - ContainerStatusChangedMeta, -} from './container.js'; -export { buildContainersFromEvents } from './container.js'; - -// ── Persona × Container Binding ───────────────────────────────── - -export type { - PersonaBinding, - PersonaBoundMeta, - PersonaUnboundMeta, -} from './binding.js'; -export { buildBindingsFromEvents, resolveRole } from './binding.js'; - -// ── Council / Moderator ───────────────────────────────────────── - -export type { CouncilOptions, CouncilResult } from './council.js'; -export { runCouncil } from './council.js'; -export type { - CouncilMessage, - ModeratorDecision, - ModeratorFn, -} from './moderator.js'; -export type { LlmModeratorOptions } from './moderators/llm-moderator.js'; -export { createLlmModerator } from './moderators/llm-moderator.js'; -export { createRoundRobinModerator } from './moderators/round-robin.js'; - -// ── Topic ─────────────────────────────────────────────────────── - -export type { - Topic, - TopicClosedMeta, - TopicCreatedMeta, - TopicStatus, -} from './topic.js'; -export { buildTopicsFromEvents } from './topic.js'; - // ── Council v2: TopicType ──────────────────────────────────────── export type { CodingTaskContext } from './topics/coding-task.js'; diff --git a/packages/pulse/src/moderator.ts b/packages/pulse/src/moderator.ts deleted file mode 100644 index 0610fcf..0000000 --- a/packages/pulse/src/moderator.ts +++ /dev/null @@ -1,26 +0,0 @@ -import type { PersonaState } from './task-events.js'; - -/** A message in the Council context — only Role speech, no Moderator actions */ -export interface CouncilMessage { - personaId: string; - content: string; - timestamp: number; -} - -/** Moderator 决策结果 */ -export type ModeratorDecision = - | { action: 'speak'; personaId: string } - | { action: 'add'; persona: PersonaState } - | { action: 'close'; summary?: string } - | { - action: 'spawn'; - topicId: string; - title: string; - participants: PersonaState[]; - }; - -/** Moderator 纯函数签名 */ -export type ModeratorFn = ( - participants: PersonaState[], - history: CouncilMessage[], -) => Promise; diff --git a/packages/pulse/src/moderators/llm-moderator.ts b/packages/pulse/src/moderators/llm-moderator.ts deleted file mode 100644 index 74d3622..0000000 --- a/packages/pulse/src/moderators/llm-moderator.ts +++ /dev/null @@ -1,193 +0,0 @@ -import type { LlmClient } from '../llm-client.js'; -import type { - CouncilMessage, - ModeratorDecision, - ModeratorFn, -} from '../moderator.js'; -import type { ContainerType, PersonaState } from '../task-events.js'; - -function buildModeratorTools(participantIds: string[]) { - return [ - { - type: 'function' as const, - function: { - name: 'next_speaker', - description: 'Select the next participant to speak', - parameters: { - type: 'object', - properties: { - personaId: { - type: 'string', - enum: participantIds, - description: 'The persona ID of the next speaker', - }, - }, - required: ['personaId'], - }, - }, - }, - { - type: 'function' as const, - function: { - name: 'add_member', - description: 'Add a new member to the council', - parameters: { - type: 'object', - properties: { - personaId: { type: 'string', description: 'Unique persona ID' }, - name: { type: 'string', description: 'Display name' }, - container: { - type: 'string', - enum: ['openclaw', 'cursor', 'claude-code', 'hermes'], - description: 'Container type', - }, - capabilities: { - type: 'array', - items: { type: 'string' }, - description: 'List of capabilities', - }, - }, - required: ['personaId', 'name', 'container', 'capabilities'], - }, - }, - }, - { - type: 'function' as const, - function: { - name: 'close_council', - description: 'Close the council session', - parameters: { - type: 'object', - properties: { - summary: { type: 'string', description: 'Optional summary' }, - }, - }, - }, - }, - { - type: 'function' as const, - function: { - name: 'spawn_sub_topic', - description: - 'Create a sub-topic and spawn a nested council to discuss it', - parameters: { - type: 'object', - properties: { - topicId: { - type: 'string', - description: 'Unique ID for the sub-topic', - }, - title: { type: 'string', description: 'Title of the sub-topic' }, - participantIds: { - type: 'array', - items: { type: 'string', enum: participantIds }, - description: 'IDs of participants to include in the sub-council', - }, - }, - required: ['topicId', 'title', 'participantIds'], - }, - }, - }, - ]; -} - -function buildSystemPrompt( - participants: PersonaState[], - history: CouncilMessage[], -): string { - const participantLines = participants - .map( - (p) => - `- ${p.personaId} (${p.name}): container=${p.container}, capabilities=[${p.capabilities.join(', ')}]`, - ) - .join('\n'); - - const historyLines = - history.length > 0 - ? history.map((m) => `[${m.personaId}]: ${m.content}`).join('\n') - : '(no messages yet)'; - - return `You are a council moderator. Your job is to decide who speaks next, whether to add new members, or whether to close the council. - -Participants: -${participantLines} - -Conversation so far: -${historyLines} - -Decide the next action by calling one of the available tools.`; -} - -export interface LlmModeratorOptions { - llmClient: LlmClient; -} - -/** - * Create an LLM-driven moderator. - * - * Uses tool calls to decide: next_speaker, add_member, or close_council. - */ -export function createLlmModerator(opts: LlmModeratorOptions): ModeratorFn { - const { llmClient } = opts; - - return async ( - participants: PersonaState[], - history: CouncilMessage[], - ): Promise => { - const participantIds = participants.map((p) => p.personaId); - const tools = buildModeratorTools(participantIds); - const systemPrompt = buildSystemPrompt(participants, history); - - const response = await llmClient.chat({ - messages: [ - { role: 'system', content: systemPrompt }, - { role: 'user', content: 'Decide the next action for this council.' }, - ], - tools, - }); - - // Parse tool call - if (response.tool_calls && response.tool_calls.length > 0) { - const call = response.tool_calls[0]!; - const args = JSON.parse(call.function.arguments); - - switch (call.function.name) { - case 'next_speaker': - return { action: 'speak', personaId: args.personaId }; - case 'add_member': - return { - action: 'add', - persona: { - personaId: args.personaId, - name: args.name, - container: args.container as ContainerType, - capabilities: args.capabilities, - registeredAt: Date.now(), - updatedAt: Date.now(), - }, - }; - case 'close_council': - return { action: 'close', summary: args.summary }; - case 'spawn_sub_topic': { - const subParticipants = participants.filter((p) => - (args.participantIds as string[]).includes(p.personaId), - ); - return { - action: 'spawn', - topicId: args.topicId, - title: args.title, - participants: subParticipants, - }; - } - default: - return { action: 'close', summary: 'Unknown tool call — closing' }; - } - } - - // No tool call — default to close - return { - action: 'close', - summary: response.content || 'LLM did not call a tool', - }; - }; -} diff --git a/packages/pulse/src/moderators/round-robin.test.ts b/packages/pulse/src/moderators/round-robin.test.ts deleted file mode 100644 index cc726f9..0000000 --- a/packages/pulse/src/moderators/round-robin.test.ts +++ /dev/null @@ -1,52 +0,0 @@ -import { describe, expect, test } from 'bun:test'; -import type { PersonaState } from '../task-events.js'; -import { createRoundRobinModerator } from './round-robin.js'; - -function makePersona(id: string): PersonaState { - return { - personaId: id, - name: id, - container: 'openclaw', - capabilities: [], - registeredAt: Date.now(), - updatedAt: Date.now(), - }; -} - -describe('round-robin moderator', () => { - test('cycles through participants in order', async () => { - const mod = createRoundRobinModerator(); - const participants = [makePersona('a'), makePersona('b'), makePersona('c')]; - - const d1 = await mod(participants, []); - expect(d1).toEqual({ action: 'speak', personaId: 'a' }); - - const d2 = await mod(participants, [ - { personaId: 'a', content: 'hi', timestamp: 1 }, - ]); - expect(d2).toEqual({ action: 'speak', personaId: 'b' }); - - const d3 = await mod(participants, [ - { personaId: 'a', content: 'hi', timestamp: 1 }, - { personaId: 'b', content: 'hey', timestamp: 2 }, - ]); - expect(d3).toEqual({ action: 'speak', personaId: 'c' }); - }); - - test('closes after all have spoken', async () => { - const mod = createRoundRobinModerator(); - const participants = [makePersona('a'), makePersona('b')]; - const history = [ - { personaId: 'a', content: 'hi', timestamp: 1 }, - { personaId: 'b', content: 'hey', timestamp: 2 }, - ]; - const decision = await mod(participants, history); - expect(decision.action).toBe('close'); - }); - - test('closes immediately with no participants', async () => { - const mod = createRoundRobinModerator(); - const decision = await mod([], []); - expect(decision.action).toBe('close'); - }); -}); diff --git a/packages/pulse/src/moderators/round-robin.ts b/packages/pulse/src/moderators/round-robin.ts deleted file mode 100644 index 63e1551..0000000 --- a/packages/pulse/src/moderators/round-robin.ts +++ /dev/null @@ -1,36 +0,0 @@ -import type { - CouncilMessage, - ModeratorDecision, - ModeratorFn, -} from '../moderator.js'; -import type { PersonaState } from '../task-events.js'; - -/** - * Create a round-robin moderator. - * - * Each participant speaks once in order, then the council closes. - * Never adds new members. - */ -export function createRoundRobinModerator(): ModeratorFn { - return async ( - participants: PersonaState[], - history: CouncilMessage[], - ): Promise => { - if (participants.length === 0) { - return { action: 'close', summary: 'No participants' }; - } - - // Find who has already spoken - const spoken = new Set(history.map((m) => m.personaId)); - - // Find the first participant who hasn't spoken yet - for (const p of participants) { - if (!spoken.has(p.personaId)) { - return { action: 'speak', personaId: p.personaId }; - } - } - - // Everyone has spoken — close - return { action: 'close', summary: 'All participants have spoken' }; - }; -} diff --git a/packages/pulse/src/persona.ts b/packages/pulse/src/persona.ts index 23c90f0..38f4178 100644 --- a/packages/pulse/src/persona.ts +++ b/packages/pulse/src/persona.ts @@ -47,7 +47,7 @@ export function buildPersonasFromEvents( }); } else if (ev.kind === 'persona-updated') { const existing = personas.get(personaId); - if (!existing) continue; // can't update what doesn't exist + if (!existing) continue; const m = meta as unknown as PersonaUpdatedMeta; if (m.container !== undefined) existing.container = m.container; if (m.capabilities !== undefined) existing.capabilities = m.capabilities; diff --git a/packages/pulse/src/rules/task-rule.test.ts b/packages/pulse/src/rules/task-rule.test.ts deleted file mode 100644 index 47d3efe..0000000 --- a/packages/pulse/src/rules/task-rule.test.ts +++ /dev/null @@ -1,251 +0,0 @@ -import { beforeEach, describe, expect, jest, test } from 'bun:test'; -import type { Sensed } from '../index.js'; -import type { PendingTasksData, TaskState } from '../task-events.js'; -import { createTaskRule } from './task-rule.js'; - -type Snapshot = Record & { timestamp: number }; -type Effect = Record; - -function makeTask(overrides: Partial = {}): TaskState { - return { - taskId: 'task-1', - projectId: 'proj-1', - title: 'Fix bug', - description: 'Fix the login bug', - type: 'bug', - priority: 0, - creatorId: 'user-1', - status: 'pending', - createdAt: Date.now(), - updatedAt: Date.now(), - ...overrides, - }; -} - -function makeSnapshot(overrides: Record = {}): Snapshot { - return { timestamp: Date.now(), ...overrides }; -} - -function makePendingTasksSlice(tasks: TaskState[]): Record { - return { - 'pending-tasks': { - data: { - pendingCount: tasks.length, - tasks, - byProject: {}, - checkedAt: Date.now(), - }, - } as Sensed, - }; -} - -const mockInner = jest.fn(async () => [[], 15000] as [Effect[], number]); - -describe('createTaskRule', () => { - beforeEach(() => { - mockInner.mockClear(); - mockInner.mockResolvedValue([[], 15000]); - }); - - test('no pending-tasks → pass through, no new effects', async () => { - const rule = createTaskRule(); - const prev = makeSnapshot(); - const curr = makeSnapshot(); - - const [effects, tickMs] = await rule(prev, curr, mockInner); - - expect(effects).toEqual([]); - expect(tickMs).toBe(15000); - }); - - test('first tick: pending tasks → emit broker effect', async () => { - const rule = createTaskRule(); - const tasks = [makeTask({ taskId: 't1' }), makeTask({ taskId: 't2' })]; - const curr = makeSnapshot(makePendingTasksSlice(tasks)); - - const [effects] = await rule(makeSnapshot(), curr, mockInner); - - expect(effects).toContainEqual( - expect.objectContaining({ - kind: 'broker', - taskIds: ['t1', 't2'], - }), - ); - }); - - test('first tick: assigned tasks → emit cursor effect per task', async () => { - const rule = createTaskRule(); - const tasks = [ - makeTask({ - taskId: 't1', - projectId: 'proj-a', - status: 'assigned', - assigneeId: 'cursor', - }), - makeTask({ - taskId: 't2', - projectId: 'proj-b', - status: 'assigned', - assigneeId: 'cursor', - }), - ]; - const curr = makeSnapshot(makePendingTasksSlice(tasks)); - - const [effects] = await rule(makeSnapshot(), curr, mockInner); - - expect(effects).toContainEqual( - expect.objectContaining({ - kind: 'cursor', - taskId: 't1', - projectId: 'proj-a', - }), - ); - expect(effects).toContainEqual( - expect.objectContaining({ - kind: 'cursor', - taskId: 't2', - projectId: 'proj-b', - }), - ); - }); - - test('first tick: mix of pending and assigned → both broker and cursor effects', async () => { - const rule = createTaskRule(); - const tasks = [ - makeTask({ taskId: 't1', status: 'pending' }), - makeTask({ - taskId: 't2', - status: 'assigned', - assigneeId: 'cursor', - projectId: 'proj-2', - }), - ]; - const curr = makeSnapshot(makePendingTasksSlice(tasks)); - - const [effects] = await rule(makeSnapshot(), curr, mockInner); - - expect(effects).toContainEqual( - expect.objectContaining({ kind: 'broker', taskIds: ['t1'] }), - ); - expect(effects).toContainEqual( - expect.objectContaining({ kind: 'cursor', taskId: 't2' }), - ); - }); - - test('already pending task → no broker effect (diff-driven)', async () => { - const rule = createTaskRule(); - const tasks = [makeTask({ taskId: 't1', status: 'pending' })]; - const prev = makeSnapshot(makePendingTasksSlice(tasks)); - const curr = makeSnapshot(makePendingTasksSlice(tasks)); - - const [effects] = await rule(prev, curr, mockInner); - - const brokerEffects = effects.filter((e) => e.kind === 'broker'); - expect(brokerEffects).toHaveLength(0); - }); - - test('already assigned task → no cursor effect (diff-driven)', async () => { - const rule = createTaskRule(); - const tasks = [ - makeTask({ taskId: 't1', status: 'assigned', assigneeId: 'cursor' }), - ]; - const prev = makeSnapshot(makePendingTasksSlice(tasks)); - const curr = makeSnapshot(makePendingTasksSlice(tasks)); - - const [effects] = await rule(prev, curr, mockInner); - - const cursorEffects = effects.filter((e) => e.kind === 'cursor'); - expect(cursorEffects).toHaveLength(0); - }); - - test('new pending task appears → emit broker', async () => { - const rule = createTaskRule(); - const prevTasks = [makeTask({ taskId: 't1', status: 'pending' })]; - const currTasks = [ - makeTask({ taskId: 't1', status: 'pending' }), - makeTask({ taskId: 't2', status: 'pending' }), - ]; - const prev = makeSnapshot(makePendingTasksSlice(prevTasks)); - const curr = makeSnapshot(makePendingTasksSlice(currTasks)); - - const [effects] = await rule(prev, curr, mockInner); - - expect(effects).toContainEqual( - expect.objectContaining({ kind: 'broker', taskIds: ['t2'] }), - ); - }); - - test('task transitions from pending to assigned → emit cursor', async () => { - const rule = createTaskRule(); - const prevTasks = [ - makeTask({ taskId: 't1', status: 'pending', projectId: 'proj-a' }), - ]; - const currTasks = [ - makeTask({ - taskId: 't1', - status: 'assigned', - assigneeId: 'cursor', - projectId: 'proj-a', - }), - ]; - const prev = makeSnapshot(makePendingTasksSlice(prevTasks)); - const curr = makeSnapshot(makePendingTasksSlice(currTasks)); - - const [effects] = await rule(prev, curr, mockInner); - - expect(effects).toContainEqual( - expect.objectContaining({ - kind: 'cursor', - taskId: 't1', - projectId: 'proj-a', - }), - ); - // No broker since t1 is no longer pending in curr - const brokerEffects = effects.filter((e) => e.kind === 'broker'); - expect(brokerEffects).toHaveLength(0); - }); - - test('task transitions from routing to assigned → emit cursor', async () => { - const rule = createTaskRule(); - const prevTasks = [ - makeTask({ taskId: 't1', status: 'routing', projectId: 'proj-a' }), - ]; - const currTasks = [ - makeTask({ - taskId: 't1', - status: 'assigned', - assigneeId: 'cursor', - projectId: 'proj-a', - }), - ]; - const prev = makeSnapshot(makePendingTasksSlice(prevTasks)); - const curr = makeSnapshot(makePendingTasksSlice(currTasks)); - - const [effects] = await rule(prev, curr, mockInner); - - expect(effects).toContainEqual( - expect.objectContaining({ - kind: 'cursor', - taskId: 't1', - projectId: 'proj-a', - }), - ); - }); - - test('inner rule effects are preserved', async () => { - const innerEffect = { kind: 'collect', key: 'system' }; - mockInner.mockResolvedValueOnce([[innerEffect], 10000]); - - const rule = createTaskRule(); - const tasks = [ - makeTask({ taskId: 't1', status: 'assigned', assigneeId: 'cursor' }), - ]; - const curr = makeSnapshot(makePendingTasksSlice(tasks)); - - const [effects, tickMs] = await rule(makeSnapshot(), curr, mockInner); - - expect(effects[0]).toEqual(innerEffect); - expect(effects.length).toBe(2); - expect(tickMs).toBe(10000); - }); -}); diff --git a/packages/pulse/src/rules/task-rule.ts b/packages/pulse/src/rules/task-rule.ts deleted file mode 100644 index 3bdb581..0000000 --- a/packages/pulse/src/rules/task-rule.ts +++ /dev/null @@ -1,75 +0,0 @@ -import type { Rule, Sensed } from '../index.js'; -import type { PendingTasksData, TaskState } from '../task-events.js'; - -export interface BrokerEffect { - kind: 'broker'; - taskIds: string[]; -} - -export interface CursorTaskEffect { - kind: 'cursor'; - taskId: string; - projectId: string; -} - -type Snapshot = Record & { timestamp: number }; -type Effect = Record; - -/** - * Lightweight task rule — no LLM call. - * - * - pending tasks → emit broker effect (idempotent: skip if any task is routing) - * - assigned tasks → emit cursor effect per task - */ -export function createTaskRule< - S extends Snapshot = Snapshot, - E extends Effect = Effect, ->(): Rule { - return async (prev, curr, inner) => { - const [effects, tickMs] = await inner(prev, curr); - const newEffects: E[] = []; - - const currTasks = - (curr['pending-tasks'] as Sensed | undefined)?.data - ?.tasks ?? []; - const prevTasks = - (prev['pending-tasks'] as Sensed | undefined)?.data - ?.tasks ?? []; - - // Broker: only emit for tasks that newly entered pending - const prevPendingIds = new Set( - prevTasks - .filter((t: TaskState) => t.status === 'pending') - .map((t: TaskState) => t.taskId), - ); - const newPending = currTasks.filter( - (t: TaskState) => t.status === 'pending' && !prevPendingIds.has(t.taskId), - ); - if (newPending.length > 0) { - newEffects.push({ - kind: 'broker', - taskIds: newPending.map((t: TaskState) => t.taskId), - } as unknown as E); - } - - // Cursor: only emit for tasks that newly became assigned - const prevAssignedIds = new Set( - prevTasks - .filter((t: TaskState) => t.status === 'assigned') - .map((t: TaskState) => t.taskId), - ); - const newAssigned = currTasks.filter( - (t: TaskState) => - t.status === 'assigned' && !prevAssignedIds.has(t.taskId), - ); - for (const task of newAssigned) { - newEffects.push({ - kind: 'cursor', - taskId: task.taskId, - projectId: task.projectId, - } as unknown as E); - } - - return [[...effects, ...newEffects], tickMs]; - }; -} diff --git a/packages/pulse/src/topic.test.ts b/packages/pulse/src/topic.test.ts deleted file mode 100644 index da0291b..0000000 --- a/packages/pulse/src/topic.test.ts +++ /dev/null @@ -1,113 +0,0 @@ -import { describe, expect, test } from 'bun:test'; -import { mkdtempSync } from 'node:fs'; -import { tmpdir } from 'node:os'; -import { join } from 'node:path'; -import { createStore } from './store.js'; -import type { TopicClosedMeta, TopicCreatedMeta } from './topic.js'; -import { buildTopicsFromEvents } from './topic.js'; - -function createTestStore() { - const tmp = mkdtempSync(join(tmpdir(), 'topic-test-')); - return createStore({ - eventsDbPath: join(tmp, 'events.db'), - objectsDir: join(tmp, 'objects'), - }); -} - -describe('buildTopicsFromEvents', () => { - test('create topic', () => { - const store = createTestStore(); - const meta: TopicCreatedMeta = { - topicId: 't1', - title: 'Design review', - createdBy: 'alice', - }; - store.appendEvent({ - occurredAt: 1000, - kind: 'topic-created', - key: 't1', - meta: JSON.stringify(meta), - }); - - const topics = buildTopicsFromEvents(store); - expect(topics.size).toBe(1); - const t = topics.get('t1')!; - expect(t.topicId).toBe('t1'); - expect(t.title).toBe('Design review'); - expect(t.createdBy).toBe('alice'); - expect(t.status).toBe('open'); - expect(t.createdAt).toBe(1000); - expect(t.parentTopicId).toBeUndefined(); - store.close(); - }); - - test('create child topic with parentTopicId', () => { - const store = createTestStore(); - store.appendEvent({ - occurredAt: 1000, - kind: 'topic-created', - key: 't1', - meta: JSON.stringify({ - topicId: 't1', - title: 'Parent', - createdBy: 'alice', - } satisfies TopicCreatedMeta), - }); - store.appendEvent({ - occurredAt: 2000, - kind: 'topic-created', - key: 't2', - meta: JSON.stringify({ - topicId: 't2', - parentTopicId: 't1', - title: 'Child', - createdBy: 'bob', - } satisfies TopicCreatedMeta), - }); - - const topics = buildTopicsFromEvents(store); - expect(topics.size).toBe(2); - const child = topics.get('t2')!; - expect(child.parentTopicId).toBe('t1'); - expect(child.title).toBe('Child'); - expect(child.status).toBe('open'); - store.close(); - }); - - test('close topic with summary', () => { - const store = createTestStore(); - store.appendEvent({ - occurredAt: 1000, - kind: 'topic-created', - key: 't1', - meta: JSON.stringify({ - topicId: 't1', - title: 'Bug triage', - createdBy: 'alice', - } satisfies TopicCreatedMeta), - }); - store.appendEvent({ - occurredAt: 2000, - kind: 'topic-closed', - key: 't1', - meta: JSON.stringify({ - topicId: 't1', - summary: 'Resolved all P0 bugs', - } satisfies TopicClosedMeta), - }); - - const topics = buildTopicsFromEvents(store); - const t = topics.get('t1')!; - expect(t.status).toBe('closed'); - expect(t.closedAt).toBe(2000); - expect(t.summary).toBe('Resolved all P0 bugs'); - store.close(); - }); - - test('empty event stream', () => { - const store = createTestStore(); - const topics = buildTopicsFromEvents(store); - expect(topics.size).toBe(0); - store.close(); - }); -}); diff --git a/packages/pulse/src/topic.ts b/packages/pulse/src/topic.ts deleted file mode 100644 index d0bf4dc..0000000 --- a/packages/pulse/src/topic.ts +++ /dev/null @@ -1,77 +0,0 @@ -import type { PulseStore } from './store.js'; - -export type TopicStatus = 'open' | 'closed'; - -export interface Topic { - topicId: string; - parentTopicId?: string; - title: string; - status: TopicStatus; - createdBy: string; - createdAt: number; - closedAt?: number; - summary?: string; -} - -export interface TopicCreatedMeta { - topicId: string; - parentTopicId?: string; - title: string; - createdBy: string; -} - -export interface TopicClosedMeta { - topicId: string; - summary?: string; -} - -/** - * Build a Map of Topic from topic-created and topic-closed events. - * - * - topic-created → open topic - * - topic-closed → close + attach summary - * - Events are merged in occurredAt order. - */ -export function buildTopicsFromEvents(store: PulseStore): Map { - const created = store.queryByKind('topic-created'); - const closed = store.queryByKind('topic-closed'); - - const allEvents = [...created, ...closed]; - allEvents.sort((a, b) => a.occurredAt - b.occurredAt); - - const topics = new Map(); - - for (const ev of allEvents) { - if (!ev.meta) continue; - let meta: Record; - try { - meta = JSON.parse(ev.meta); - } catch { - continue; - } - - const topicId = meta.topicId as string; - if (!topicId) continue; - - if (ev.kind === 'topic-created') { - const m = meta as unknown as TopicCreatedMeta; - topics.set(topicId, { - topicId: m.topicId, - parentTopicId: m.parentTopicId, - title: m.title, - status: 'open', - createdBy: m.createdBy, - createdAt: ev.occurredAt, - }); - } else if (ev.kind === 'topic-closed') { - const existing = topics.get(topicId); - if (!existing) continue; - const m = meta as unknown as TopicClosedMeta; - existing.status = 'closed'; - existing.closedAt = ev.occurredAt; - if (m.summary !== undefined) existing.summary = m.summary; - } - } - - return topics; -}