refactor: remove v1 Council code (Container/Binding/Task/Moderator)

This commit is contained in:
2026-04-17 03:24:36 +00:00
parent b645205ea5
commit b65e9d65cb
19 changed files with 10 additions and 2243 deletions
-63
View File
@@ -1,63 +0,0 @@
# Council Model
The Council is a multi-agent deliberation framework built on Pulse's
event-sourced store. Personas discuss Topics under the guidance of a Moderator,
each executing inside an isolated Container.
## Core Concepts
### Persona Registry (`src/persona.ts`)
A **Persona** is an agent identity carrying a name, container type, and
capability list. `buildPersonasFromEvents()` projects `persona-registered` and
`persona-updated` events (merged by `occurredAt`) into a `Map<personaId, PersonaState>`.
Re-registering the same ID is an idempotent overwrite; updates patch only the
fields provided.
### Container Registry (`src/container.ts`)
A **Container** is a runtime environment (`cursor`, `claude-code`, `hermes`, …)
described by `containerId`, `type`, `host`, `tools[]`, and `status`
(`online | offline`). `buildContainersFromEvents()` replays
`container-registered` and `container-status-changed` events into a live map.
### Persona × Container Binding (`src/binding.ts`)
A **Binding** ties one Persona to exactly one Container at a time.
`persona-bound` creates or overwrites the link; `persona-unbound` deletes it.
`resolveRole(personaId, personas, bindings, containers)` returns the full
`{ persona, container }` pair needed for dispatch, or `null` if any lookup fails.
### Topic / Sub-Topic (`src/topic.ts`)
A **Topic** is a named discussion thread (`topicId`, `title`, `createdBy`)
that transitions through `open → closed` and may carry a `summary`.
`parentTopicId` enables nesting — the Moderator can spawn sub-topics, creating
recursive councils whose summaries fold back into the parent.
### Moderator (`src/moderator.ts`)
The Moderator is a pure async function:
`(participants: PersonaState[], history: CouncilMessage[]) → ModeratorDecision`.
Four decision types:
| Decision | Effect |
|----------|--------|
| `speak` | Select the next speaker by `personaId` |
| `add` | Invite a new Persona into the council |
| `spawn` | Create a sub-topic with its own participant set |
| `close` | End deliberation, optionally attaching a summary |
Built-in strategies:
| Strategy | File | Behavior |
|----------|------|----------|
| Round-robin | `src/moderators/round-robin.ts` | Each participant speaks once in registration order, then closes |
| LLM-driven | `src/moderators/llm-moderator.ts` | An LLM picks the next action via tool calls (`next_speaker`, `add_member`, `spawn_sub_topic`, `close_council`) |
### Council Execution Loop (`src/council.ts`)
`runCouncil(opts: CouncilOptions): Promise<CouncilResult>` drives deliberation:
1. Call `moderator(participants, history)` to obtain a `ModeratorDecision`.
2. Execute the decision:
- **speak** → invoke `onSpeak(personaId)`, append result to history.
- **add** → push persona into participants, invoke `onAddMember` callback.
- **spawn** → invoke `onSpawn(topicId, title, participants)` recursively;
the sub-council's summary is appended to parent history.
- **close** → return `{ history, summary, rounds }`.
3. Repeat until `close` is returned or `maxRounds` (default 10) is exceeded.
-1
View File
@@ -1 +0,0 @@
Council v2 serialization test passed.
-88
View File
@@ -1,88 +0,0 @@
import type { Container } from './container.js';
import type { PulseStore } from './store.js';
import type { PersonaState } from './task-events.js';
export interface PersonaBinding {
personaId: string;
containerId: string;
boundAt: number;
}
export interface PersonaBoundMeta {
personaId: string;
containerId: string;
}
export interface PersonaUnboundMeta {
personaId: string;
containerId: string;
reason?: string;
}
/**
* Build a Map of PersonaBinding from persona-bound and persona-unbound events.
*
* A Persona can only be bound to one Container at a time.
* persona-bound overwrites any previous binding; persona-unbound removes it.
*/
export function buildBindingsFromEvents(
store: PulseStore,
): Map<string, PersonaBinding> {
const bound = store.queryByKind('persona-bound');
const unbound = store.queryByKind('persona-unbound');
const allEvents = [...bound, ...unbound];
allEvents.sort((a, b) => a.occurredAt - b.occurredAt);
const bindings = new Map<string, PersonaBinding>();
for (const ev of allEvents) {
if (!ev.meta) continue;
let meta: Record<string, unknown>;
try {
meta = JSON.parse(ev.meta);
} catch {
continue;
}
const personaId = meta.personaId as string;
if (!personaId) continue;
if (ev.kind === 'persona-bound') {
const m = meta as unknown as PersonaBoundMeta;
bindings.set(personaId, {
personaId: m.personaId,
containerId: m.containerId,
boundAt: ev.occurredAt,
});
} else if (ev.kind === 'persona-unbound') {
bindings.delete(personaId);
}
}
return bindings;
}
/**
* Resolve a Role = Persona × Container.
*
* Looks up the persona, finds its current binding, then resolves the container.
* Returns null if persona not found, not bound, or container doesn't exist.
*/
export function resolveRole(
personaId: string,
personas: Map<string, PersonaState>,
bindings: Map<string, PersonaBinding>,
containers: Map<string, Container>,
): { persona: PersonaState; container: Container } | null {
const persona = personas.get(personaId);
if (!persona) return null;
const binding = bindings.get(personaId);
if (!binding) return null;
const container = containers.get(binding.containerId);
if (!container) return null;
return { persona, container };
}
-73
View File
@@ -1,73 +0,0 @@
import type { PulseStore } from './store.js';
import type { ContainerStatus, ContainerType } from './task-events.js';
export interface Container {
containerId: string;
type: ContainerType;
host: string;
status: ContainerStatus;
tools: string[];
registeredAt: number;
updatedAt: number;
}
export interface ContainerRegisteredMeta {
containerId: string;
type: ContainerType;
host: string;
tools: string[];
}
export interface ContainerStatusChangedMeta {
containerId: string;
status: ContainerStatus;
}
/**
* Build a Map of Container from container-registered and container-status-changed events.
*/
export function buildContainersFromEvents(
store: PulseStore,
): Map<string, Container> {
const registered = store.queryByKind('container-registered');
const statusChanged = store.queryByKind('container-status-changed');
const allEvents = [...registered, ...statusChanged];
allEvents.sort((a, b) => a.occurredAt - b.occurredAt);
const containers = new Map<string, Container>();
for (const ev of allEvents) {
if (!ev.meta) continue;
let meta: Record<string, unknown>;
try {
meta = JSON.parse(ev.meta);
} catch {
continue;
}
const containerId = meta.containerId as string;
if (!containerId) continue;
if (ev.kind === 'container-registered') {
const m = meta as unknown as ContainerRegisteredMeta;
containers.set(containerId, {
containerId: m.containerId,
type: m.type,
host: m.host,
status: 'online',
tools: m.tools,
registeredAt: ev.occurredAt,
updatedAt: ev.occurredAt,
});
} else if (ev.kind === 'container-status-changed') {
const existing = containers.get(containerId);
if (!existing) continue;
const m = meta as unknown as ContainerStatusChangedMeta;
existing.status = m.status;
existing.updatedAt = ev.occurredAt;
}
}
return containers;
}
-92
View File
@@ -1,92 +0,0 @@
import type { CouncilMessage, ModeratorFn } from './moderator.js';
import type { PersonaState } from './task-events.js';
export interface CouncilOptions {
moderator: ModeratorFn;
participants: PersonaState[];
/** Pre-existing council history (e.g. from task-responded events) */
history?: CouncilMessage[];
/** Callback when a participant speaks — returns their speech content */
onSpeak: (personaId: string) => Promise<string>;
/** Callback when a new member is added */
onAddMember?: (persona: PersonaState) => Promise<void>;
/** Callback when a sub-topic is spawned — returns the sub-council result */
onSpawn?: (
topicId: string,
title: string,
participants: PersonaState[],
) => Promise<CouncilResult>;
/** Maximum rounds to prevent infinite loops (default 10) */
maxRounds?: number;
}
export interface CouncilResult {
history: CouncilMessage[];
summary?: string;
rounds: number;
}
/**
* Run a Council deliberation loop.
*
* 1. Call moderator(participants, history)
* 2. Based on decision:
* - speak → onSpeak → append to history → back to 1
* - add → onAddMember → append to participants → back to 1
* - close → return final history + summary
* 3. Force close after maxRounds
*/
export async function runCouncil(opts: CouncilOptions): Promise<CouncilResult> {
const { moderator, onSpeak, onAddMember, onSpawn, maxRounds = 10 } = opts;
const participants = [...opts.participants];
const history: CouncilMessage[] = [...(opts.history ?? [])];
let rounds = 0;
while (rounds < maxRounds) {
rounds++;
const decision = await moderator(participants, history);
switch (decision.action) {
case 'speak': {
const content = await onSpeak(decision.personaId);
history.push({
personaId: decision.personaId,
content,
timestamp: Date.now(),
});
break;
}
case 'add': {
participants.push(decision.persona);
if (onAddMember) {
await onAddMember(decision.persona);
}
break;
}
case 'close': {
return { history, summary: decision.summary, rounds };
}
case 'spawn': {
if (onSpawn) {
const subResult = await onSpawn(
decision.topicId,
decision.title,
decision.participants,
);
history.push({
personaId: `sub-council:${decision.topicId}`,
content:
subResult.summary ?? 'Sub-council completed without summary',
timestamp: Date.now(),
});
}
break;
}
}
}
// maxRounds exceeded — force close
return { history, summary: 'Max rounds exceeded', rounds };
}
+9 -495
View File
@@ -1,9 +1,9 @@
#!/usr/bin/env bun
/**
* Pulse Council Demo — Full Pipeline
* Pulse Council Demo — v2 TopicType Pipeline
*
* Executable script (not a test) that simulates the complete
* Council dispatch chain driven by the Rules engine.
* Council v2 dispatch chain driven by TopicType rules.
*
* Run: cd ~/repos/pulse && bun packages/pulse/src/e2e/council-demo.ts
*
@@ -13,22 +13,11 @@
import { mkdtempSync, rmSync } from 'node:fs';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { createBrokerExecutor } from '../executors/broker.js';
import type { Sensed } from '../index.js';
import {
buildBindingsFromEvents,
buildContainersFromEvents,
buildPersonasFromEvents,
buildTopicsFromEvents,
createRoundRobinModerator,
createStore,
runCouncil,
} from '../index.js';
import type { LlmClient } from '../llm-client.js';
import { createOpenAiLlmClient } from '../llm-client.js';
import { createTaskRule } from '../rules/task-rule.js';
import type { PendingTasksData } from '../task-events.js';
import { buildPendingTasksFromEvents } from '../watchers/pending-tasks-projection.js';
import { createStore } from '../store.js';
import { createCodingTaskType } from '../topics/coding-task.js';
import { createArchitectRole } from '../topics/roles/architect-llm.js';
import { createTopicRule } from '../topics/topic-rule-adapter.js';
// ── Helpers ────────────────────────────────────────────────────
@@ -42,17 +31,6 @@ function logItem(msg: string) {
console.log(`${msg}`);
}
function logSpeech(persona: string, content: string) {
console.log(` ${persona}: "${content}"`);
}
function assert(condition: boolean, msg: string) {
if (!condition) {
console.error(` ❌ ASSERTION FAILED: ${msg}`);
process.exit(1);
}
}
// ── CLI Args ───────────────────────────────────────────────────
const LIVE = process.argv.includes('--live');
@@ -62,437 +40,6 @@ const KEEP = process.argv.includes('--keep');
// ── Main ───────────────────────────────────────────────────────
async function main() {
const useCustomDb = !!DB_PATH;
const tmpDir = useCustomDb
? undefined
: mkdtempSync(join(tmpdir(), 'pulse-council-demo-'));
const dbPath = DB_PATH ?? join(tmpDir!, 'events.db');
const objDir = DB_PATH
? join(require('node:path').dirname(DB_PATH), 'objects')
: join(tmpDir!, 'objects');
const store = createStore({
eventsDbPath: dbPath,
objectsDir: objDir,
});
try {
console.log();
console.log(`🍊 === Pulse Council Demo — Full Pipeline ===`);
console.log(SEP);
// ── Step 1: Register Personas ──────────────────────────────
console.log();
log('📝', 'Step 1: Register Personas');
const now = Date.now();
store.appendEvent({
occurredAt: now,
kind: 'persona-registered',
key: 'xiaoju',
meta: JSON.stringify({
personaId: 'xiaoju',
name: '小橘',
container: 'openclaw',
capabilities: ['coordination', 'review'],
}),
});
logItem('xiaoju (coordination, review)');
store.appendEvent({
occurredAt: now + 1,
kind: 'persona-registered',
key: 'cursor-agent',
meta: JSON.stringify({
personaId: 'cursor-agent',
name: 'Cursor Agent',
container: 'cursor',
capabilities: ['coding', 'refactor'],
}),
});
logItem('cursor-agent (coding, refactor)');
const personas = buildPersonasFromEvents(store);
assert(personas.size === 2, 'Expected 2 personas');
// ── Step 2: Register Containers ────────────────────────────
console.log();
log('📦', 'Step 2: Register Containers');
store.appendEvent({
occurredAt: now + 2,
kind: 'container-registered',
key: 'openclaw-neko',
meta: JSON.stringify({
containerId: 'openclaw-neko',
type: 'openclaw',
host: 'neko-vm',
tools: ['a2a', 'feishu'],
}),
});
logItem('openclaw-neko (openclaw @ neko-vm)');
store.appendEvent({
occurredAt: now + 3,
kind: 'container-registered',
key: 'cursor-neko',
meta: JSON.stringify({
containerId: 'cursor-neko',
type: 'cursor',
host: 'neko-vm',
tools: ['file-edit', 'terminal'],
}),
});
logItem('cursor-neko (cursor @ neko-vm)');
const containers = buildContainersFromEvents(store);
assert(containers.size === 2, 'Expected 2 containers');
// ── Step 3: Bind Persona × Container ───────────────────────
console.log();
log('🔗', 'Step 3: Bind Persona × Container');
store.appendEvent({
occurredAt: now + 4,
kind: 'persona-bound',
key: 'xiaoju',
meta: JSON.stringify({
personaId: 'xiaoju',
containerId: 'openclaw-neko',
}),
});
logItem('xiaoju → openclaw-neko');
store.appendEvent({
occurredAt: now + 5,
kind: 'persona-bound',
key: 'cursor-agent',
meta: JSON.stringify({
personaId: 'cursor-agent',
containerId: 'cursor-neko',
}),
});
logItem('cursor-agent → cursor-neko');
const bindings = buildBindingsFromEvents(store);
assert(bindings.size === 2, 'Expected 2 bindings');
// ── Step 4: Create Task ────────────────────────────────────
console.log();
log('📋', 'Step 4: Create Task');
store.appendEvent({
occurredAt: now + 10,
kind: 'task-created',
key: 'task-fix-login-bug',
meta: JSON.stringify({
taskId: 'task-fix-login-bug',
projectId: 'proj-neko',
title: 'Fix login redirect bug',
description: 'Login page redirects to /undefined after auth',
type: 'bug',
priority: 5,
creatorId: 'xiaoju',
}),
});
logItem('task-fix-login-bug: "Fix login redirect bug"');
// ── Step 5: Rules Engine — detect pending task ─────────────
console.log();
log('⚙️', 'Step 5: Rules Engine — detect pending task');
// Build pending tasks from events to construct snapshot
const pendingData = buildPendingTasksFromEvents(store);
assert(pendingData.pendingCount > 0, 'Expected pending tasks');
// Construct snapshot with pending-tasks sensed data
type Snapshot = Record<string, unknown> & { timestamp: number };
type Effect = Record<string, unknown>;
const currSnapshot: Snapshot = {
timestamp: Date.now(),
'pending-tasks': {
data: pendingData,
refreshedAt: Date.now(),
} as Sensed<PendingTasksData>,
};
const prevSnapshot: Snapshot = { timestamp: now - 1000 };
const taskRule = createTaskRule<Snapshot, Effect>();
const baseInner = async (
_p: Snapshot,
_c: Snapshot,
): Promise<[Effect[], number]> => [[], 15000];
const [effects] = await taskRule(prevSnapshot, currSnapshot, baseInner);
const brokerEffect = effects.find((e) => e.kind === 'broker');
assert(!!brokerEffect, 'Expected broker effect from task rule');
logItem(
`Task Rule produced broker effect: taskIds=${JSON.stringify((brokerEffect as { taskIds: string[] }).taskIds)}`,
);
// ── Step 6: Broker Executor — assign task ──────────────────
console.log();
const llmMode = LIVE ? 'LIVE LLM' : 'mock LLM';
log('🤖', `Step 6: Broker Executor — assign task (${llmMode})`);
let llmClient: LlmClient;
if (LIVE) {
const baseUrl =
process.env.PULSE_LLM_BASE_URL ?? process.env.OPENAI_BASE_URL;
const apiKey =
process.env.PULSE_LLM_API_KEY ?? process.env.OPENAI_API_KEY;
const model = process.env.PULSE_LLM_MODEL ?? 'gpt-4o-mini';
if (!baseUrl || !apiKey) {
console.error(
' ❌ --live requires PULSE_LLM_BASE_URL + PULSE_LLM_API_KEY (or OPENAI_BASE_URL + OPENAI_API_KEY)',
);
process.exit(1);
}
logItem(`LLM: ${model} @ ${baseUrl}`);
llmClient = createOpenAiLlmClient({ baseUrl, apiKey, model });
} else {
llmClient = {
chat: async (_req) => ({
tool_calls: [
{
id: 'call_1',
function: {
name: 'assign_task',
arguments: JSON.stringify({
taskId: 'task-fix-login-bug',
assigneeId: 'cursor-agent',
}),
},
},
],
}),
};
}
const executeBroker = createBrokerExecutor({
llmClient,
routineStore: store,
getPersonas: () => buildPersonasFromEvents(store),
});
await executeBroker(brokerEffect as { kind: 'broker'; taskIds: string[] });
// Verify events written
const routingEvents = store.queryByKind('task-routing');
assert(routingEvents.length > 0, 'Expected task-routing event');
logItem('task-routing event written');
logItem('LLM decided: assign task-fix-login-bug → cursor-agent');
const assignedEvents = store.queryByKind('task-assigned');
assert(assignedEvents.length > 0, 'Expected task-assigned event');
const assignedMeta = JSON.parse(assignedEvents[0].meta!);
assert(
assignedMeta.assigneeId === 'cursor-agent',
'Expected cursor-agent assignment',
);
logItem('task-assigned event written');
// ── Step 7: Cursor completes coding ────────────────────────
console.log();
log('💻', 'Step 7: Cursor completes coding');
store.appendEvent({
occurredAt: Date.now(),
kind: 'task-responded',
key: 'task-fix-login-bug',
meta: JSON.stringify({
taskId: 'task-fix-login-bug',
assigneeId: 'cursor-agent',
result:
'Fixed auth.ts line 42, added 3 test cases. PR #123 ready for review.',
}),
});
logItem('task-responded: "Fixed auth.ts line 42..."');
// ── Step 8: Council Deliberation ───────────────────────────
console.log();
log('🗣️', 'Step 8: Council Deliberation');
// Create topic for the task
store.appendEvent({
occurredAt: Date.now(),
kind: 'topic-created',
key: 'task-fix-login-bug',
meta: JSON.stringify({
topicId: 'task-fix-login-bug',
title: 'Fix login redirect bug',
createdBy: 'xiaoju',
}),
});
const councilSpeeches: Record<string, string[]> = {
xiaoju: ['Login bug fixed, please review PR #123'],
'cursor-agent': ['All tests pass, ready for merge'],
};
const councilIdx: Record<string, number> = { xiaoju: 0, 'cursor-agent': 0 };
const councilResult = await runCouncil({
moderator: createRoundRobinModerator(),
participants: [personas.get('xiaoju')!, personas.get('cursor-agent')!],
onSpeak: async (personaId: string) => {
const idx = councilIdx[personaId] ?? 0;
councilIdx[personaId] = idx + 1;
return councilSpeeches[personaId]?.[idx] ?? '...';
},
});
for (const msg of councilResult.history) {
logSpeech(msg.personaId, msg.content);
}
logItem(`Council closed: ${councilResult.rounds} rounds`);
// ── Step 9: Review Sub-Council ─────────────────────────────
console.log();
log('🔍', 'Step 9: Review Sub-Council');
store.appendEvent({
occurredAt: Date.now(),
kind: 'topic-created',
key: 'review-pr-123',
meta: JSON.stringify({
topicId: 'review-pr-123',
parentTopicId: 'task-fix-login-bug',
title: 'Review PR #123',
createdBy: 'xiaoju',
}),
});
logItem('Sub-topic: review-pr-123 (parent: task-fix-login-bug)');
const reviewSpeeches: Record<string, string[]> = {
xiaoju: ['Edge case: expired tokens?'],
'cursor-agent': ['Added token expiry check'],
};
const reviewIdx: Record<string, number> = { xiaoju: 0, 'cursor-agent': 0 };
const reviewResult = await runCouncil({
moderator: createRoundRobinModerator(),
participants: [personas.get('xiaoju')!, personas.get('cursor-agent')!],
onSpeak: async (personaId: string) => {
const idx = reviewIdx[personaId] ?? 0;
reviewIdx[personaId] = idx + 1;
return reviewSpeeches[personaId]?.[idx] ?? '...';
},
});
for (const msg of reviewResult.history) {
logSpeech(msg.personaId, msg.content);
}
logItem(`Review closed: ${reviewResult.rounds} rounds`);
// ── Step 10: Close Topics ──────────────────────────────────
console.log();
log('🏁', 'Step 10: Close Topics');
store.appendEvent({
occurredAt: Date.now(),
kind: 'topic-closed',
key: 'review-pr-123',
meta: JSON.stringify({
topicId: 'review-pr-123',
summary: 'PR approved with token expiry fix',
}),
});
logItem('review-pr-123: closed (PR approved with token expiry fix)');
store.appendEvent({
occurredAt: Date.now() + 1,
kind: 'topic-closed',
key: 'task-fix-login-bug',
meta: JSON.stringify({
topicId: 'task-fix-login-bug',
summary: 'Login bug fixed, PR merged',
}),
});
logItem('task-fix-login-bug: closed (Login bug fixed, PR merged)');
// ── Event Log ──────────────────────────────────────────────
console.log();
console.log(`📊 === Event Log (chronological) ===`);
const allEvents = store.getAfter(0);
allEvents.sort((a, b) => a.occurredAt - b.occurredAt);
for (const ev of allEvents) {
const key =
ev.key ??
(ev.meta
? (JSON.parse(ev.meta).taskId ??
JSON.parse(ev.meta).personaId ??
JSON.parse(ev.meta).containerId ??
'')
: '');
console.log(
` #${String(ev.id).padStart(2)} ${ev.kind.padEnd(22)} ${key}`,
);
}
// ── Final State ────────────────────────────────────────────
console.log();
console.log(`📊 === Final State ===`);
const finalPersonas = buildPersonasFromEvents(store);
const finalContainers = buildContainersFromEvents(store);
const finalBindings = buildBindingsFromEvents(store);
const finalTopics = buildTopicsFromEvents(store);
const personaNames = Array.from(finalPersonas.keys()).join(', ');
const containerNames = Array.from(finalContainers.keys()).join(', ');
const allClosed = Array.from(finalTopics.values()).every(
(t) => t.status === 'closed',
);
console.log(` Personas: ${finalPersonas.size} (${personaNames})`);
console.log(` Containers: ${finalContainers.size} (${containerNames})`);
console.log(` Bindings: ${finalBindings.size}`);
console.log(
` Topics: ${finalTopics.size} (${allClosed ? 'all closed' : 'some open'})`,
);
console.log();
console.log(SEP);
console.log(`✅ All steps completed successfully!`);
console.log();
} finally {
store.close();
if (KEEP || useCustomDb) {
console.log(`📁 Events DB preserved: ${dbPath}`);
} else if (tmpDir) {
rmSync(tmpDir, { recursive: true, force: true });
}
}
}
// ── V2 Demo ────────────────────────────────────────────────────
async function mainV2() {
const { createCodingTaskType } = await import('../topics/coding-task.js');
const { createTopicRule } = await import('../topics/topic-rule-adapter.js');
const _V2 = true;
const useCustomDb = !!DB_PATH;
const tmpDir = useCustomDb
? null
@@ -524,45 +71,13 @@ async function mainV2() {
const model = process.env.PULSE_LLM_MODEL ?? 'qwen-plus';
if (!baseUrl || !apiKey) {
console.error(
'❌ --live --v2 requires PULSE_LLM_BASE_URL + PULSE_LLM_API_KEY',
'❌ --live requires PULSE_LLM_BASE_URL + PULSE_LLM_API_KEY',
);
process.exit(1);
}
const llm = createOpenAiLlmClient({ baseUrl, apiKey, model });
codingTask = createCodingTaskType({
architectFn: async (ctx, s) => {
const resp = await llm.chat({
messages: [
{
role: 'system',
content:
'You are a software architect. Analyze the task and suggest target files and approach. Reply in JSON: {"analysis": "...", "targetFiles": ["..."]}',
},
{
role: 'user',
content: `Task: ${ctx.title}\nDescription: ${ctx.description}\nRepo: ${ctx.repoDir}`,
},
],
});
let parsed: { analysis?: string; targetFiles?: string[] };
try {
parsed = JSON.parse(resp.content ?? '{}');
} catch {
parsed = {
analysis: resp.content ?? 'No analysis',
targetFiles: [],
};
}
s.appendEvent({
occurredAt: Date.now(),
kind: 'coding.analyzed',
meta: JSON.stringify({
topicId: ctx.topicId,
analysis: parsed.analysis ?? 'No analysis',
targetFiles: parsed.targetFiles ?? [],
}),
});
},
architectFn: createArchitectRole(llm),
});
} else {
codingTask = createCodingTaskType();
@@ -650,8 +165,7 @@ async function mainV2() {
}
}
const isV2 = process.argv.includes('--v2');
(isV2 ? mainV2 : main)().catch((err) => {
main().catch((err) => {
console.error('❌ Demo failed:', err);
process.exit(1);
});
@@ -1,366 +0,0 @@
/**
* E2E T10 — Council full pipeline: task → code → review → close
*
* Simulates the real coding task lifecycle:
* Persona registration → Container registration → Binding →
* Topic creation → Council deliberation → Coding response →
* Review sub-topic → Close
*
* Uses real SQLite store (no mocks).
*
* Run: bun test packages/pulse/src/e2e/t10-council-e2e.test.ts
*/
import { afterAll, beforeAll, describe, expect, it } from 'bun:test';
import { mkdtempSync, rmSync } from 'node:fs';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import type { CouncilResult } from '../council.js';
import {
buildBindingsFromEvents,
buildContainersFromEvents,
buildPersonasFromEvents,
buildTopicsFromEvents,
createRoundRobinModerator,
createStore,
type PulseStore,
resolveRole,
runCouncil,
} from '../index.js';
// ── Shared Context ─────────────────────────────────────────────
interface Ctx {
tmpDir: string;
store: PulseStore;
councilResult?: CouncilResult;
reviewResult?: CouncilResult;
}
const ctx: Ctx = {} as Ctx;
// ── Test Suite ─────────────────────────────────────────────────
describe('E2E: Council full pipeline — task → code → review → close', () => {
beforeAll(() => {
ctx.tmpDir = mkdtempSync(join(tmpdir(), 'pulse-council-e2e-'));
ctx.store = createStore({
eventsDbPath: join(ctx.tmpDir, 'events.db'),
objectsDir: join(ctx.tmpDir, 'objects'),
});
});
afterAll(() => {
ctx.store.close();
rmSync(ctx.tmpDir, { recursive: true, force: true });
});
// ── Step 1: Register Personas ──────────────────────────────────
it('Step 1: register personas — xiaoju + cursor-agent', () => {
const now = Date.now();
ctx.store.appendEvent({
occurredAt: now,
kind: 'persona-registered',
key: 'xiaoju',
meta: JSON.stringify({
personaId: 'xiaoju',
name: '小橘',
container: 'openclaw',
capabilities: ['coordination', 'review'],
}),
});
ctx.store.appendEvent({
occurredAt: now + 1,
kind: 'persona-registered',
key: 'cursor-agent',
meta: JSON.stringify({
personaId: 'cursor-agent',
name: 'Cursor Agent',
container: 'cursor',
capabilities: ['coding', 'refactor'],
}),
});
const personas = buildPersonasFromEvents(ctx.store);
expect(personas.size).toBe(2);
expect(personas.get('xiaoju')?.capabilities).toEqual([
'coordination',
'review',
]);
expect(personas.get('cursor-agent')?.capabilities).toEqual([
'coding',
'refactor',
]);
});
// ── Step 2: Register Containers ────────────────────────────────
it('Step 2: register containers — openclaw-neko + cursor-neko', () => {
const now = Date.now();
ctx.store.appendEvent({
occurredAt: now,
kind: 'container-registered',
key: 'openclaw-neko',
meta: JSON.stringify({
containerId: 'openclaw-neko',
type: 'openclaw',
host: 'neko-vm',
tools: ['a2a', 'feishu'],
}),
});
ctx.store.appendEvent({
occurredAt: now + 1,
kind: 'container-registered',
key: 'cursor-neko',
meta: JSON.stringify({
containerId: 'cursor-neko',
type: 'cursor',
host: 'neko-vm',
tools: ['file-edit', 'terminal'],
}),
});
const containers = buildContainersFromEvents(ctx.store);
expect(containers.size).toBe(2);
expect(containers.get('openclaw-neko')?.type).toBe('openclaw');
expect(containers.get('cursor-neko')?.tools).toEqual([
'file-edit',
'terminal',
]);
});
// ── Step 3: Bind Persona × Container ───────────────────────────
it('Step 3: bind personas to containers + resolveRole', () => {
const now = Date.now();
ctx.store.appendEvent({
occurredAt: now,
kind: 'persona-bound',
key: 'xiaoju',
meta: JSON.stringify({
personaId: 'xiaoju',
containerId: 'openclaw-neko',
}),
});
ctx.store.appendEvent({
occurredAt: now + 1,
kind: 'persona-bound',
key: 'cursor-agent',
meta: JSON.stringify({
personaId: 'cursor-agent',
containerId: 'cursor-neko',
}),
});
const bindings = buildBindingsFromEvents(ctx.store);
expect(bindings.size).toBe(2);
expect(bindings.get('xiaoju')?.containerId).toBe('openclaw-neko');
expect(bindings.get('cursor-agent')?.containerId).toBe('cursor-neko');
// Verify Role resolution
const personas = buildPersonasFromEvents(ctx.store);
const containers = buildContainersFromEvents(ctx.store);
const xiaojuRole = resolveRole('xiaoju', personas, bindings, containers);
expect(xiaojuRole).not.toBeNull();
expect(xiaojuRole!.persona.personaId).toBe('xiaoju');
expect(xiaojuRole!.container.containerId).toBe('openclaw-neko');
const cursorRole = resolveRole(
'cursor-agent',
personas,
bindings,
containers,
);
expect(cursorRole).not.toBeNull();
expect(cursorRole!.container.type).toBe('cursor');
});
// ── Step 4: Create Topic (coding task) ─────────────────────────
it('Step 4: create topic — task-fix-login-bug', () => {
ctx.store.appendEvent({
occurredAt: Date.now(),
kind: 'topic-created',
key: 'task-fix-login-bug',
meta: JSON.stringify({
topicId: 'task-fix-login-bug',
title: 'Fix login redirect bug',
createdBy: 'xiaoju',
}),
});
const topics = buildTopicsFromEvents(ctx.store);
expect(topics.size).toBe(1);
const topic = topics.get('task-fix-login-bug');
expect(topic).not.toBeNull();
expect(topic!.title).toBe('Fix login redirect bug');
expect(topic!.status).toBe('open');
expect(topic!.createdBy).toBe('xiaoju');
});
// ── Step 5: Council deliberation — assign task ─────────────────
it('Step 5: council deliberation — round-robin assignment', async () => {
const personas = buildPersonasFromEvents(ctx.store);
const participants = [
personas.get('xiaoju')!,
personas.get('cursor-agent')!,
];
const speeches: Record<string, string> = {
xiaoju: 'Login redirect bug in auth.ts line 42. cursor-agent please fix.',
'cursor-agent': "Understood. I'll fix auth.ts and add tests.",
};
ctx.councilResult = await runCouncil({
moderator: createRoundRobinModerator(),
participants,
onSpeak: async (personaId: string) => speeches[personaId]!,
});
expect(ctx.councilResult.history).toHaveLength(2);
expect(ctx.councilResult.history[0]!.personaId).toBe('xiaoju');
expect(ctx.councilResult.history[1]!.personaId).toBe('cursor-agent');
// 2 speak rounds + 1 close decision = 3 rounds
expect(ctx.councilResult.rounds).toBe(3);
expect(ctx.councilResult.summary).toBe('All participants have spoken');
});
// ── Step 6: Simulate coding completion ─────────────────────────
it('Step 6: task-responded — cursor-agent completes coding', () => {
const ev = ctx.store.appendEvent({
occurredAt: Date.now(),
kind: 'task-responded',
key: 'task-fix-login-bug',
meta: JSON.stringify({
taskId: 'task-fix-login-bug',
assigneeId: 'cursor-agent',
result:
'Fixed auth.ts line 42, added 3 test cases. PR #123 ready for review.',
}),
});
expect(ev.id).toBeGreaterThan(0);
expect(ev.kind).toBe('task-responded');
});
// ── Step 7: Review Council — sub-topic ─────────────────────────
it('Step 7: review council — sub-topic review-pr-123', async () => {
// Create review sub-topic
ctx.store.appendEvent({
occurredAt: Date.now(),
kind: 'topic-created',
key: 'review-pr-123',
meta: JSON.stringify({
topicId: 'review-pr-123',
parentTopicId: 'task-fix-login-bug',
title: 'Review PR #123',
createdBy: 'xiaoju',
}),
});
const topics = buildTopicsFromEvents(ctx.store);
const reviewTopic = topics.get('review-pr-123');
expect(reviewTopic).not.toBeNull();
expect(reviewTopic!.parentTopicId).toBe('task-fix-login-bug');
// Run review council
const personas = buildPersonasFromEvents(ctx.store);
const participants = [
personas.get('xiaoju')!,
personas.get('cursor-agent')!,
];
const reviewSpeeches: Record<string, string> = {
xiaoju: 'Edge case: what about expired tokens?',
'cursor-agent': 'Good catch, added token expiry check and test.',
};
ctx.reviewResult = await runCouncil({
moderator: createRoundRobinModerator(),
participants,
onSpeak: async (personaId: string) => reviewSpeeches[personaId]!,
});
expect(ctx.reviewResult.history).toHaveLength(2);
expect(ctx.reviewResult.rounds).toBe(3);
expect(ctx.reviewResult.summary).toBe('All participants have spoken');
});
// ── Step 8: Close topics ───────────────────────────────────────
it('Step 8: close both topics with summaries', () => {
const now = Date.now();
ctx.store.appendEvent({
occurredAt: now,
kind: 'topic-closed',
key: 'review-pr-123',
meta: JSON.stringify({
topicId: 'review-pr-123',
summary: 'PR approved with token expiry fix',
}),
});
ctx.store.appendEvent({
occurredAt: now + 1,
kind: 'topic-closed',
key: 'task-fix-login-bug',
meta: JSON.stringify({
topicId: 'task-fix-login-bug',
summary: 'Login bug fixed, PR merged',
}),
});
const topics = buildTopicsFromEvents(ctx.store);
const mainTopic = topics.get('task-fix-login-bug');
expect(mainTopic!.status).toBe('closed');
expect(mainTopic!.summary).toBe('Login bug fixed, PR merged');
const reviewTopic = topics.get('review-pr-123');
expect(reviewTopic!.status).toBe('closed');
expect(reviewTopic!.summary).toBe('PR approved with token expiry fix');
});
// ── Step 9: Final consistency check ────────────────────────────
it('Step 9: full state consistency — all entities intact', () => {
const personas = buildPersonasFromEvents(ctx.store);
expect(personas.size).toBe(2);
expect(personas.has('xiaoju')).toBe(true);
expect(personas.has('cursor-agent')).toBe(true);
const containers = buildContainersFromEvents(ctx.store);
expect(containers.size).toBe(2);
expect(containers.has('openclaw-neko')).toBe(true);
expect(containers.has('cursor-neko')).toBe(true);
const bindings = buildBindingsFromEvents(ctx.store);
expect(bindings.size).toBe(2);
expect(bindings.get('xiaoju')?.containerId).toBe('openclaw-neko');
expect(bindings.get('cursor-agent')?.containerId).toBe('cursor-neko');
const topics = buildTopicsFromEvents(ctx.store);
expect(topics.size).toBe(2);
for (const [, topic] of topics) {
expect(topic.status).toBe('closed');
expect(topic.summary).toBeTruthy();
}
// Council histories are complete
expect(ctx.councilResult!.history).toHaveLength(2);
expect(ctx.reviewResult!.history).toHaveLength(2);
});
});
-181
View File
@@ -1,181 +0,0 @@
import type { LlmClient, LlmResponse } from '../llm-client.js';
import type { BrokerEffect } from '../rules/task-rule.js';
import type { PulseStore } from '../store.js';
import type { PersonaState, TaskState } from '../task-events.js';
export interface BrokerExecutorOptions {
llmClient: LlmClient;
routineStore: PulseStore;
getPersonas?: () => Map<string, PersonaState>;
}
const DEFAULT_AGENT_IDS = ['cursor'];
function buildBrokerTools(agentIds: string[]) {
return [
{
type: 'function' as const,
function: {
name: 'assign_task',
description: 'Assign a task to an agent',
parameters: {
type: 'object',
properties: {
taskId: { type: 'string' },
assigneeId: {
type: 'string',
enum: agentIds,
},
},
required: ['taskId', 'assigneeId'],
},
},
},
];
}
function buildBrokerSystemPrompt(
personas: Map<string, PersonaState> | null,
): string {
if (personas && personas.size > 0) {
const agentLines = Array.from(personas.values())
.map(
(p) =>
`- ${p.personaId} (${p.name}): container=${p.container}, capabilities=[${p.capabilities.join(', ')}]`,
)
.join('\n');
return `You are a task broker. Given a list of pending tasks, decide which agent to assign each task to.
Available agents:
${agentLines}
For each task, call assign_task with the taskId and assigneeId.
Assign all tasks — do not skip any. Match tasks to agents based on their capabilities.`;
}
return `You are a task broker. Given a list of pending tasks, decide which agent to assign each task to.
Currently available agents: ["cursor"]
For each task, call assign_task with the taskId and assigneeId.
Assign all tasks — do not skip any.`;
}
/**
* Create a broker executor that uses an LLM to assign pending tasks.
*
* Flow:
* 1. Read pending task details from routineStore
* 2. Read project details from routineStore
* 3. Call LLM to decide assigneeId for each task
* 4. Write task-assigned events to routineStore
*/
export function createBrokerExecutor(opts: BrokerExecutorOptions) {
const { llmClient, routineStore, getPersonas } = opts;
return async function executeBroker(effect: BrokerEffect): Promise<void> {
if (effect.kind !== 'broker') return;
const brokerSessionId = `broker-${Date.now()}`;
const taskEvents = routineStore.queryByKind('task-created');
taskEvents.sort((a, b) => a.occurredAt - b.occurredAt);
const taskMap = new Map<string, TaskState>();
for (const ev of taskEvents) {
if (!ev.meta) continue;
const meta = JSON.parse(ev.meta);
taskMap.set(meta.taskId, {
taskId: meta.taskId,
projectId: meta.projectId,
title: meta.title,
description: meta.description ?? '',
type: meta.type ?? 'action',
priority: meta.priority ?? 0,
creatorId: meta.creatorId ?? '',
status: 'pending',
createdAt: ev.occurredAt,
updatedAt: ev.occurredAt,
});
}
const tasks = effect.taskIds
.map((id) => taskMap.get(id))
.filter((t): t is TaskState => t !== undefined);
if (tasks.length === 0) return;
for (const task of tasks) {
routineStore.appendEvent({
occurredAt: Date.now(),
kind: 'task-routing',
meta: JSON.stringify({
taskId: task.taskId,
brokerSessionId,
}),
});
}
const taskSummary = tasks
.map(
(t) =>
`- taskId=${t.taskId} type=${t.type} priority=${t.priority} title="${t.title}"`,
)
.join('\n');
// Build dynamic tools and prompt from personas (or fallback to defaults)
const personas = getPersonas ? getPersonas() : null;
const agentIds =
personas && personas.size > 0
? Array.from(personas.keys())
: DEFAULT_AGENT_IDS;
const brokerTools = buildBrokerTools(agentIds);
const brokerSystemPrompt = buildBrokerSystemPrompt(personas);
let response: LlmResponse;
try {
response = await llmClient.chat({
messages: [
{ role: 'system', content: brokerSystemPrompt },
{ role: 'user', content: `Pending tasks:\n${taskSummary}` },
],
tools: brokerTools,
});
} catch (err) {
for (const task of tasks) {
routineStore.appendEvent({
occurredAt: Date.now(),
kind: 'task-responded',
meta: JSON.stringify({
taskId: task.taskId,
assigneeId: 'broker',
result: `Routing failed: ${err instanceof Error ? err.message : String(err)}`,
}),
});
}
return;
}
if (!response.tool_calls) return;
for (const tc of response.tool_calls) {
if (tc.function.name !== 'assign_task') continue;
let args: { taskId?: string; assigneeId?: string };
try {
args = JSON.parse(tc.function.arguments);
} catch {
continue;
}
if (!args.taskId || !args.assigneeId) continue;
routineStore.appendEvent({
occurredAt: Date.now(),
kind: 'task-assigned',
meta: JSON.stringify({
taskId: args.taskId,
assigneeId: args.assigneeId,
assignedBy: 'broker',
}),
});
}
};
}
-4
View File
@@ -1,7 +1,3 @@
export {
type BrokerExecutorOptions,
createBrokerExecutor,
} from './broker.js';
export {
executeSurvivalEffect,
type SurvivalEffect,
-56
View File
@@ -932,66 +932,10 @@ export {
createAgentLoopRule,
} from './rules/agent-loop.js';
// ── Task Rule ───────────────────────────────────────────────────
export {
type BrokerEffect,
type CursorTaskEffect,
createTaskRule,
} from './rules/task-rule.js';
// ── Broker Executor ─────────────────────────────────────────────
export {
type BrokerExecutorOptions,
createBrokerExecutor,
} from './executors/broker.js';
// ── Persona Registry ────────────────────────────────────────────
export { buildPersonasFromEvents } from './persona.js';
// ── Container Registry ──────────────────────────────────────────
export type {
Container,
ContainerRegisteredMeta,
ContainerStatusChangedMeta,
} from './container.js';
export { buildContainersFromEvents } from './container.js';
// ── Persona × Container Binding ─────────────────────────────────
export type {
PersonaBinding,
PersonaBoundMeta,
PersonaUnboundMeta,
} from './binding.js';
export { buildBindingsFromEvents, resolveRole } from './binding.js';
// ── Council / Moderator ─────────────────────────────────────────
export type { CouncilOptions, CouncilResult } from './council.js';
export { runCouncil } from './council.js';
export type {
CouncilMessage,
ModeratorDecision,
ModeratorFn,
} from './moderator.js';
export type { LlmModeratorOptions } from './moderators/llm-moderator.js';
export { createLlmModerator } from './moderators/llm-moderator.js';
export { createRoundRobinModerator } from './moderators/round-robin.js';
// ── Topic ───────────────────────────────────────────────────────
export type {
Topic,
TopicClosedMeta,
TopicCreatedMeta,
TopicStatus,
} from './topic.js';
export { buildTopicsFromEvents } from './topic.js';
// ── Council v2: TopicType ────────────────────────────────────────
export type { CodingTaskContext } from './topics/coding-task.js';
-26
View File
@@ -1,26 +0,0 @@
import type { PersonaState } from './task-events.js';
/** A message in the Council context — only Role speech, no Moderator actions */
export interface CouncilMessage {
personaId: string;
content: string;
timestamp: number;
}
/** Moderator 决策结果 */
export type ModeratorDecision =
| { action: 'speak'; personaId: string }
| { action: 'add'; persona: PersonaState }
| { action: 'close'; summary?: string }
| {
action: 'spawn';
topicId: string;
title: string;
participants: PersonaState[];
};
/** Moderator 纯函数签名 */
export type ModeratorFn = (
participants: PersonaState[],
history: CouncilMessage[],
) => Promise<ModeratorDecision>;
@@ -1,193 +0,0 @@
import type { LlmClient } from '../llm-client.js';
import type {
CouncilMessage,
ModeratorDecision,
ModeratorFn,
} from '../moderator.js';
import type { ContainerType, PersonaState } from '../task-events.js';
function buildModeratorTools(participantIds: string[]) {
return [
{
type: 'function' as const,
function: {
name: 'next_speaker',
description: 'Select the next participant to speak',
parameters: {
type: 'object',
properties: {
personaId: {
type: 'string',
enum: participantIds,
description: 'The persona ID of the next speaker',
},
},
required: ['personaId'],
},
},
},
{
type: 'function' as const,
function: {
name: 'add_member',
description: 'Add a new member to the council',
parameters: {
type: 'object',
properties: {
personaId: { type: 'string', description: 'Unique persona ID' },
name: { type: 'string', description: 'Display name' },
container: {
type: 'string',
enum: ['openclaw', 'cursor', 'claude-code', 'hermes'],
description: 'Container type',
},
capabilities: {
type: 'array',
items: { type: 'string' },
description: 'List of capabilities',
},
},
required: ['personaId', 'name', 'container', 'capabilities'],
},
},
},
{
type: 'function' as const,
function: {
name: 'close_council',
description: 'Close the council session',
parameters: {
type: 'object',
properties: {
summary: { type: 'string', description: 'Optional summary' },
},
},
},
},
{
type: 'function' as const,
function: {
name: 'spawn_sub_topic',
description:
'Create a sub-topic and spawn a nested council to discuss it',
parameters: {
type: 'object',
properties: {
topicId: {
type: 'string',
description: 'Unique ID for the sub-topic',
},
title: { type: 'string', description: 'Title of the sub-topic' },
participantIds: {
type: 'array',
items: { type: 'string', enum: participantIds },
description: 'IDs of participants to include in the sub-council',
},
},
required: ['topicId', 'title', 'participantIds'],
},
},
},
];
}
function buildSystemPrompt(
participants: PersonaState[],
history: CouncilMessage[],
): string {
const participantLines = participants
.map(
(p) =>
`- ${p.personaId} (${p.name}): container=${p.container}, capabilities=[${p.capabilities.join(', ')}]`,
)
.join('\n');
const historyLines =
history.length > 0
? history.map((m) => `[${m.personaId}]: ${m.content}`).join('\n')
: '(no messages yet)';
return `You are a council moderator. Your job is to decide who speaks next, whether to add new members, or whether to close the council.
Participants:
${participantLines}
Conversation so far:
${historyLines}
Decide the next action by calling one of the available tools.`;
}
export interface LlmModeratorOptions {
llmClient: LlmClient;
}
/**
* Create an LLM-driven moderator.
*
* Uses tool calls to decide: next_speaker, add_member, or close_council.
*/
export function createLlmModerator(opts: LlmModeratorOptions): ModeratorFn {
const { llmClient } = opts;
return async (
participants: PersonaState[],
history: CouncilMessage[],
): Promise<ModeratorDecision> => {
const participantIds = participants.map((p) => p.personaId);
const tools = buildModeratorTools(participantIds);
const systemPrompt = buildSystemPrompt(participants, history);
const response = await llmClient.chat({
messages: [
{ role: 'system', content: systemPrompt },
{ role: 'user', content: 'Decide the next action for this council.' },
],
tools,
});
// Parse tool call
if (response.tool_calls && response.tool_calls.length > 0) {
const call = response.tool_calls[0]!;
const args = JSON.parse(call.function.arguments);
switch (call.function.name) {
case 'next_speaker':
return { action: 'speak', personaId: args.personaId };
case 'add_member':
return {
action: 'add',
persona: {
personaId: args.personaId,
name: args.name,
container: args.container as ContainerType,
capabilities: args.capabilities,
registeredAt: Date.now(),
updatedAt: Date.now(),
},
};
case 'close_council':
return { action: 'close', summary: args.summary };
case 'spawn_sub_topic': {
const subParticipants = participants.filter((p) =>
(args.participantIds as string[]).includes(p.personaId),
);
return {
action: 'spawn',
topicId: args.topicId,
title: args.title,
participants: subParticipants,
};
}
default:
return { action: 'close', summary: 'Unknown tool call — closing' };
}
}
// No tool call — default to close
return {
action: 'close',
summary: response.content || 'LLM did not call a tool',
};
};
}
@@ -1,52 +0,0 @@
import { describe, expect, test } from 'bun:test';
import type { PersonaState } from '../task-events.js';
import { createRoundRobinModerator } from './round-robin.js';
function makePersona(id: string): PersonaState {
return {
personaId: id,
name: id,
container: 'openclaw',
capabilities: [],
registeredAt: Date.now(),
updatedAt: Date.now(),
};
}
describe('round-robin moderator', () => {
test('cycles through participants in order', async () => {
const mod = createRoundRobinModerator();
const participants = [makePersona('a'), makePersona('b'), makePersona('c')];
const d1 = await mod(participants, []);
expect(d1).toEqual({ action: 'speak', personaId: 'a' });
const d2 = await mod(participants, [
{ personaId: 'a', content: 'hi', timestamp: 1 },
]);
expect(d2).toEqual({ action: 'speak', personaId: 'b' });
const d3 = await mod(participants, [
{ personaId: 'a', content: 'hi', timestamp: 1 },
{ personaId: 'b', content: 'hey', timestamp: 2 },
]);
expect(d3).toEqual({ action: 'speak', personaId: 'c' });
});
test('closes after all have spoken', async () => {
const mod = createRoundRobinModerator();
const participants = [makePersona('a'), makePersona('b')];
const history = [
{ personaId: 'a', content: 'hi', timestamp: 1 },
{ personaId: 'b', content: 'hey', timestamp: 2 },
];
const decision = await mod(participants, history);
expect(decision.action).toBe('close');
});
test('closes immediately with no participants', async () => {
const mod = createRoundRobinModerator();
const decision = await mod([], []);
expect(decision.action).toBe('close');
});
});
@@ -1,36 +0,0 @@
import type {
CouncilMessage,
ModeratorDecision,
ModeratorFn,
} from '../moderator.js';
import type { PersonaState } from '../task-events.js';
/**
* Create a round-robin moderator.
*
* Each participant speaks once in order, then the council closes.
* Never adds new members.
*/
export function createRoundRobinModerator(): ModeratorFn {
return async (
participants: PersonaState[],
history: CouncilMessage[],
): Promise<ModeratorDecision> => {
if (participants.length === 0) {
return { action: 'close', summary: 'No participants' };
}
// Find who has already spoken
const spoken = new Set(history.map((m) => m.personaId));
// Find the first participant who hasn't spoken yet
for (const p of participants) {
if (!spoken.has(p.personaId)) {
return { action: 'speak', personaId: p.personaId };
}
}
// Everyone has spoken — close
return { action: 'close', summary: 'All participants have spoken' };
};
}
+1 -1
View File
@@ -47,7 +47,7 @@ export function buildPersonasFromEvents(
});
} else if (ev.kind === 'persona-updated') {
const existing = personas.get(personaId);
if (!existing) continue; // can't update what doesn't exist
if (!existing) continue;
const m = meta as unknown as PersonaUpdatedMeta;
if (m.container !== undefined) existing.container = m.container;
if (m.capabilities !== undefined) existing.capabilities = m.capabilities;
-251
View File
@@ -1,251 +0,0 @@
import { beforeEach, describe, expect, jest, test } from 'bun:test';
import type { Sensed } from '../index.js';
import type { PendingTasksData, TaskState } from '../task-events.js';
import { createTaskRule } from './task-rule.js';
type Snapshot = Record<string, unknown> & { timestamp: number };
type Effect = Record<string, unknown>;
function makeTask(overrides: Partial<TaskState> = {}): TaskState {
return {
taskId: 'task-1',
projectId: 'proj-1',
title: 'Fix bug',
description: 'Fix the login bug',
type: 'bug',
priority: 0,
creatorId: 'user-1',
status: 'pending',
createdAt: Date.now(),
updatedAt: Date.now(),
...overrides,
};
}
function makeSnapshot(overrides: Record<string, unknown> = {}): Snapshot {
return { timestamp: Date.now(), ...overrides };
}
function makePendingTasksSlice(tasks: TaskState[]): Record<string, unknown> {
return {
'pending-tasks': {
data: {
pendingCount: tasks.length,
tasks,
byProject: {},
checkedAt: Date.now(),
},
} as Sensed<PendingTasksData>,
};
}
const mockInner = jest.fn(async () => [[], 15000] as [Effect[], number]);
describe('createTaskRule', () => {
beforeEach(() => {
mockInner.mockClear();
mockInner.mockResolvedValue([[], 15000]);
});
test('no pending-tasks → pass through, no new effects', async () => {
const rule = createTaskRule<Snapshot, Effect>();
const prev = makeSnapshot();
const curr = makeSnapshot();
const [effects, tickMs] = await rule(prev, curr, mockInner);
expect(effects).toEqual([]);
expect(tickMs).toBe(15000);
});
test('first tick: pending tasks → emit broker effect', async () => {
const rule = createTaskRule<Snapshot, Effect>();
const tasks = [makeTask({ taskId: 't1' }), makeTask({ taskId: 't2' })];
const curr = makeSnapshot(makePendingTasksSlice(tasks));
const [effects] = await rule(makeSnapshot(), curr, mockInner);
expect(effects).toContainEqual(
expect.objectContaining({
kind: 'broker',
taskIds: ['t1', 't2'],
}),
);
});
test('first tick: assigned tasks → emit cursor effect per task', async () => {
const rule = createTaskRule<Snapshot, Effect>();
const tasks = [
makeTask({
taskId: 't1',
projectId: 'proj-a',
status: 'assigned',
assigneeId: 'cursor',
}),
makeTask({
taskId: 't2',
projectId: 'proj-b',
status: 'assigned',
assigneeId: 'cursor',
}),
];
const curr = makeSnapshot(makePendingTasksSlice(tasks));
const [effects] = await rule(makeSnapshot(), curr, mockInner);
expect(effects).toContainEqual(
expect.objectContaining({
kind: 'cursor',
taskId: 't1',
projectId: 'proj-a',
}),
);
expect(effects).toContainEqual(
expect.objectContaining({
kind: 'cursor',
taskId: 't2',
projectId: 'proj-b',
}),
);
});
test('first tick: mix of pending and assigned → both broker and cursor effects', async () => {
const rule = createTaskRule<Snapshot, Effect>();
const tasks = [
makeTask({ taskId: 't1', status: 'pending' }),
makeTask({
taskId: 't2',
status: 'assigned',
assigneeId: 'cursor',
projectId: 'proj-2',
}),
];
const curr = makeSnapshot(makePendingTasksSlice(tasks));
const [effects] = await rule(makeSnapshot(), curr, mockInner);
expect(effects).toContainEqual(
expect.objectContaining({ kind: 'broker', taskIds: ['t1'] }),
);
expect(effects).toContainEqual(
expect.objectContaining({ kind: 'cursor', taskId: 't2' }),
);
});
test('already pending task → no broker effect (diff-driven)', async () => {
const rule = createTaskRule<Snapshot, Effect>();
const tasks = [makeTask({ taskId: 't1', status: 'pending' })];
const prev = makeSnapshot(makePendingTasksSlice(tasks));
const curr = makeSnapshot(makePendingTasksSlice(tasks));
const [effects] = await rule(prev, curr, mockInner);
const brokerEffects = effects.filter((e) => e.kind === 'broker');
expect(brokerEffects).toHaveLength(0);
});
test('already assigned task → no cursor effect (diff-driven)', async () => {
const rule = createTaskRule<Snapshot, Effect>();
const tasks = [
makeTask({ taskId: 't1', status: 'assigned', assigneeId: 'cursor' }),
];
const prev = makeSnapshot(makePendingTasksSlice(tasks));
const curr = makeSnapshot(makePendingTasksSlice(tasks));
const [effects] = await rule(prev, curr, mockInner);
const cursorEffects = effects.filter((e) => e.kind === 'cursor');
expect(cursorEffects).toHaveLength(0);
});
test('new pending task appears → emit broker', async () => {
const rule = createTaskRule<Snapshot, Effect>();
const prevTasks = [makeTask({ taskId: 't1', status: 'pending' })];
const currTasks = [
makeTask({ taskId: 't1', status: 'pending' }),
makeTask({ taskId: 't2', status: 'pending' }),
];
const prev = makeSnapshot(makePendingTasksSlice(prevTasks));
const curr = makeSnapshot(makePendingTasksSlice(currTasks));
const [effects] = await rule(prev, curr, mockInner);
expect(effects).toContainEqual(
expect.objectContaining({ kind: 'broker', taskIds: ['t2'] }),
);
});
test('task transitions from pending to assigned → emit cursor', async () => {
const rule = createTaskRule<Snapshot, Effect>();
const prevTasks = [
makeTask({ taskId: 't1', status: 'pending', projectId: 'proj-a' }),
];
const currTasks = [
makeTask({
taskId: 't1',
status: 'assigned',
assigneeId: 'cursor',
projectId: 'proj-a',
}),
];
const prev = makeSnapshot(makePendingTasksSlice(prevTasks));
const curr = makeSnapshot(makePendingTasksSlice(currTasks));
const [effects] = await rule(prev, curr, mockInner);
expect(effects).toContainEqual(
expect.objectContaining({
kind: 'cursor',
taskId: 't1',
projectId: 'proj-a',
}),
);
// No broker since t1 is no longer pending in curr
const brokerEffects = effects.filter((e) => e.kind === 'broker');
expect(brokerEffects).toHaveLength(0);
});
test('task transitions from routing to assigned → emit cursor', async () => {
const rule = createTaskRule<Snapshot, Effect>();
const prevTasks = [
makeTask({ taskId: 't1', status: 'routing', projectId: 'proj-a' }),
];
const currTasks = [
makeTask({
taskId: 't1',
status: 'assigned',
assigneeId: 'cursor',
projectId: 'proj-a',
}),
];
const prev = makeSnapshot(makePendingTasksSlice(prevTasks));
const curr = makeSnapshot(makePendingTasksSlice(currTasks));
const [effects] = await rule(prev, curr, mockInner);
expect(effects).toContainEqual(
expect.objectContaining({
kind: 'cursor',
taskId: 't1',
projectId: 'proj-a',
}),
);
});
test('inner rule effects are preserved', async () => {
const innerEffect = { kind: 'collect', key: 'system' };
mockInner.mockResolvedValueOnce([[innerEffect], 10000]);
const rule = createTaskRule<Snapshot, Effect>();
const tasks = [
makeTask({ taskId: 't1', status: 'assigned', assigneeId: 'cursor' }),
];
const curr = makeSnapshot(makePendingTasksSlice(tasks));
const [effects, tickMs] = await rule(makeSnapshot(), curr, mockInner);
expect(effects[0]).toEqual(innerEffect);
expect(effects.length).toBe(2);
expect(tickMs).toBe(10000);
});
});
-75
View File
@@ -1,75 +0,0 @@
import type { Rule, Sensed } from '../index.js';
import type { PendingTasksData, TaskState } from '../task-events.js';
export interface BrokerEffect {
kind: 'broker';
taskIds: string[];
}
export interface CursorTaskEffect {
kind: 'cursor';
taskId: string;
projectId: string;
}
type Snapshot = Record<string, unknown> & { timestamp: number };
type Effect = Record<string, unknown>;
/**
* Lightweight task rule — no LLM call.
*
* - pending tasks → emit broker effect (idempotent: skip if any task is routing)
* - assigned tasks → emit cursor effect per task
*/
export function createTaskRule<
S extends Snapshot = Snapshot,
E extends Effect = Effect,
>(): Rule<S, E> {
return async (prev, curr, inner) => {
const [effects, tickMs] = await inner(prev, curr);
const newEffects: E[] = [];
const currTasks =
(curr['pending-tasks'] as Sensed<PendingTasksData> | undefined)?.data
?.tasks ?? [];
const prevTasks =
(prev['pending-tasks'] as Sensed<PendingTasksData> | undefined)?.data
?.tasks ?? [];
// Broker: only emit for tasks that newly entered pending
const prevPendingIds = new Set(
prevTasks
.filter((t: TaskState) => t.status === 'pending')
.map((t: TaskState) => t.taskId),
);
const newPending = currTasks.filter(
(t: TaskState) => t.status === 'pending' && !prevPendingIds.has(t.taskId),
);
if (newPending.length > 0) {
newEffects.push({
kind: 'broker',
taskIds: newPending.map((t: TaskState) => t.taskId),
} as unknown as E);
}
// Cursor: only emit for tasks that newly became assigned
const prevAssignedIds = new Set(
prevTasks
.filter((t: TaskState) => t.status === 'assigned')
.map((t: TaskState) => t.taskId),
);
const newAssigned = currTasks.filter(
(t: TaskState) =>
t.status === 'assigned' && !prevAssignedIds.has(t.taskId),
);
for (const task of newAssigned) {
newEffects.push({
kind: 'cursor',
taskId: task.taskId,
projectId: task.projectId,
} as unknown as E);
}
return [[...effects, ...newEffects], tickMs];
};
}
-113
View File
@@ -1,113 +0,0 @@
import { describe, expect, test } from 'bun:test';
import { mkdtempSync } from 'node:fs';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { createStore } from './store.js';
import type { TopicClosedMeta, TopicCreatedMeta } from './topic.js';
import { buildTopicsFromEvents } from './topic.js';
function createTestStore() {
const tmp = mkdtempSync(join(tmpdir(), 'topic-test-'));
return createStore({
eventsDbPath: join(tmp, 'events.db'),
objectsDir: join(tmp, 'objects'),
});
}
describe('buildTopicsFromEvents', () => {
test('create topic', () => {
const store = createTestStore();
const meta: TopicCreatedMeta = {
topicId: 't1',
title: 'Design review',
createdBy: 'alice',
};
store.appendEvent({
occurredAt: 1000,
kind: 'topic-created',
key: 't1',
meta: JSON.stringify(meta),
});
const topics = buildTopicsFromEvents(store);
expect(topics.size).toBe(1);
const t = topics.get('t1')!;
expect(t.topicId).toBe('t1');
expect(t.title).toBe('Design review');
expect(t.createdBy).toBe('alice');
expect(t.status).toBe('open');
expect(t.createdAt).toBe(1000);
expect(t.parentTopicId).toBeUndefined();
store.close();
});
test('create child topic with parentTopicId', () => {
const store = createTestStore();
store.appendEvent({
occurredAt: 1000,
kind: 'topic-created',
key: 't1',
meta: JSON.stringify({
topicId: 't1',
title: 'Parent',
createdBy: 'alice',
} satisfies TopicCreatedMeta),
});
store.appendEvent({
occurredAt: 2000,
kind: 'topic-created',
key: 't2',
meta: JSON.stringify({
topicId: 't2',
parentTopicId: 't1',
title: 'Child',
createdBy: 'bob',
} satisfies TopicCreatedMeta),
});
const topics = buildTopicsFromEvents(store);
expect(topics.size).toBe(2);
const child = topics.get('t2')!;
expect(child.parentTopicId).toBe('t1');
expect(child.title).toBe('Child');
expect(child.status).toBe('open');
store.close();
});
test('close topic with summary', () => {
const store = createTestStore();
store.appendEvent({
occurredAt: 1000,
kind: 'topic-created',
key: 't1',
meta: JSON.stringify({
topicId: 't1',
title: 'Bug triage',
createdBy: 'alice',
} satisfies TopicCreatedMeta),
});
store.appendEvent({
occurredAt: 2000,
kind: 'topic-closed',
key: 't1',
meta: JSON.stringify({
topicId: 't1',
summary: 'Resolved all P0 bugs',
} satisfies TopicClosedMeta),
});
const topics = buildTopicsFromEvents(store);
const t = topics.get('t1')!;
expect(t.status).toBe('closed');
expect(t.closedAt).toBe(2000);
expect(t.summary).toBe('Resolved all P0 bugs');
store.close();
});
test('empty event stream', () => {
const store = createTestStore();
const topics = buildTopicsFromEvents(store);
expect(topics.size).toBe(0);
store.close();
});
});
-77
View File
@@ -1,77 +0,0 @@
import type { PulseStore } from './store.js';
export type TopicStatus = 'open' | 'closed';
export interface Topic {
topicId: string;
parentTopicId?: string;
title: string;
status: TopicStatus;
createdBy: string;
createdAt: number;
closedAt?: number;
summary?: string;
}
export interface TopicCreatedMeta {
topicId: string;
parentTopicId?: string;
title: string;
createdBy: string;
}
export interface TopicClosedMeta {
topicId: string;
summary?: string;
}
/**
* Build a Map of Topic from topic-created and topic-closed events.
*
* - topic-created → open topic
* - topic-closed → close + attach summary
* - Events are merged in occurredAt order.
*/
export function buildTopicsFromEvents(store: PulseStore): Map<string, Topic> {
const created = store.queryByKind('topic-created');
const closed = store.queryByKind('topic-closed');
const allEvents = [...created, ...closed];
allEvents.sort((a, b) => a.occurredAt - b.occurredAt);
const topics = new Map<string, Topic>();
for (const ev of allEvents) {
if (!ev.meta) continue;
let meta: Record<string, unknown>;
try {
meta = JSON.parse(ev.meta);
} catch {
continue;
}
const topicId = meta.topicId as string;
if (!topicId) continue;
if (ev.kind === 'topic-created') {
const m = meta as unknown as TopicCreatedMeta;
topics.set(topicId, {
topicId: m.topicId,
parentTopicId: m.parentTopicId,
title: m.title,
status: 'open',
createdBy: m.createdBy,
createdAt: ev.occurredAt,
});
} else if (ev.kind === 'topic-closed') {
const existing = topics.get(topicId);
if (!existing) continue;
const m = meta as unknown as TopicClosedMeta;
existing.status = 'closed';
existing.closedAt = ev.occurredAt;
if (m.summary !== undefined) existing.summary = m.summary;
}
}
return topics;
}