refactor: remove v1 Council code (Container/Binding/Task/Moderator)
This commit is contained in:
@@ -1,63 +0,0 @@
|
||||
# Council Model
|
||||
|
||||
The Council is a multi-agent deliberation framework built on Pulse's
|
||||
event-sourced store. Personas discuss Topics under the guidance of a Moderator,
|
||||
each executing inside an isolated Container.
|
||||
|
||||
## Core Concepts
|
||||
|
||||
### Persona Registry (`src/persona.ts`)
|
||||
A **Persona** is an agent identity carrying a name, container type, and
|
||||
capability list. `buildPersonasFromEvents()` projects `persona-registered` and
|
||||
`persona-updated` events (merged by `occurredAt`) into a `Map<personaId, PersonaState>`.
|
||||
Re-registering the same ID is an idempotent overwrite; updates patch only the
|
||||
fields provided.
|
||||
|
||||
### Container Registry (`src/container.ts`)
|
||||
A **Container** is a runtime environment (`cursor`, `claude-code`, `hermes`, …)
|
||||
described by `containerId`, `type`, `host`, `tools[]`, and `status`
|
||||
(`online | offline`). `buildContainersFromEvents()` replays
|
||||
`container-registered` and `container-status-changed` events into a live map.
|
||||
|
||||
### Persona × Container Binding (`src/binding.ts`)
|
||||
A **Binding** ties one Persona to exactly one Container at a time.
|
||||
`persona-bound` creates or overwrites the link; `persona-unbound` deletes it.
|
||||
`resolveRole(personaId, personas, bindings, containers)` returns the full
|
||||
`{ persona, container }` pair needed for dispatch, or `null` if any lookup fails.
|
||||
|
||||
### Topic / Sub-Topic (`src/topic.ts`)
|
||||
A **Topic** is a named discussion thread (`topicId`, `title`, `createdBy`)
|
||||
that transitions through `open → closed` and may carry a `summary`.
|
||||
`parentTopicId` enables nesting — the Moderator can spawn sub-topics, creating
|
||||
recursive councils whose summaries fold back into the parent.
|
||||
|
||||
### Moderator (`src/moderator.ts`)
|
||||
The Moderator is a pure async function:
|
||||
`(participants: PersonaState[], history: CouncilMessage[]) → ModeratorDecision`.
|
||||
Four decision types:
|
||||
|
||||
| Decision | Effect |
|
||||
|----------|--------|
|
||||
| `speak` | Select the next speaker by `personaId` |
|
||||
| `add` | Invite a new Persona into the council |
|
||||
| `spawn` | Create a sub-topic with its own participant set |
|
||||
| `close` | End deliberation, optionally attaching a summary |
|
||||
|
||||
Built-in strategies:
|
||||
|
||||
| Strategy | File | Behavior |
|
||||
|----------|------|----------|
|
||||
| Round-robin | `src/moderators/round-robin.ts` | Each participant speaks once in registration order, then closes |
|
||||
| LLM-driven | `src/moderators/llm-moderator.ts` | An LLM picks the next action via tool calls (`next_speaker`, `add_member`, `spawn_sub_topic`, `close_council`) |
|
||||
|
||||
### Council Execution Loop (`src/council.ts`)
|
||||
`runCouncil(opts: CouncilOptions): Promise<CouncilResult>` drives deliberation:
|
||||
|
||||
1. Call `moderator(participants, history)` to obtain a `ModeratorDecision`.
|
||||
2. Execute the decision:
|
||||
- **speak** → invoke `onSpeak(personaId)`, append result to history.
|
||||
- **add** → push persona into participants, invoke `onAddMember` callback.
|
||||
- **spawn** → invoke `onSpawn(topicId, title, participants)` recursively;
|
||||
the sub-council's summary is appended to parent history.
|
||||
- **close** → return `{ history, summary, rounds }`.
|
||||
3. Repeat until `close` is returned or `maxRounds` (default 10) is exceeded.
|
||||
@@ -1 +0,0 @@
|
||||
Council v2 serialization test passed.
|
||||
@@ -1,88 +0,0 @@
|
||||
import type { Container } from './container.js';
|
||||
import type { PulseStore } from './store.js';
|
||||
import type { PersonaState } from './task-events.js';
|
||||
|
||||
export interface PersonaBinding {
|
||||
personaId: string;
|
||||
containerId: string;
|
||||
boundAt: number;
|
||||
}
|
||||
|
||||
export interface PersonaBoundMeta {
|
||||
personaId: string;
|
||||
containerId: string;
|
||||
}
|
||||
|
||||
export interface PersonaUnboundMeta {
|
||||
personaId: string;
|
||||
containerId: string;
|
||||
reason?: string;
|
||||
}
|
||||
|
||||
/**
|
||||
* Build a Map of PersonaBinding from persona-bound and persona-unbound events.
|
||||
*
|
||||
* A Persona can only be bound to one Container at a time.
|
||||
* persona-bound overwrites any previous binding; persona-unbound removes it.
|
||||
*/
|
||||
export function buildBindingsFromEvents(
|
||||
store: PulseStore,
|
||||
): Map<string, PersonaBinding> {
|
||||
const bound = store.queryByKind('persona-bound');
|
||||
const unbound = store.queryByKind('persona-unbound');
|
||||
|
||||
const allEvents = [...bound, ...unbound];
|
||||
allEvents.sort((a, b) => a.occurredAt - b.occurredAt);
|
||||
|
||||
const bindings = new Map<string, PersonaBinding>();
|
||||
|
||||
for (const ev of allEvents) {
|
||||
if (!ev.meta) continue;
|
||||
let meta: Record<string, unknown>;
|
||||
try {
|
||||
meta = JSON.parse(ev.meta);
|
||||
} catch {
|
||||
continue;
|
||||
}
|
||||
|
||||
const personaId = meta.personaId as string;
|
||||
if (!personaId) continue;
|
||||
|
||||
if (ev.kind === 'persona-bound') {
|
||||
const m = meta as unknown as PersonaBoundMeta;
|
||||
bindings.set(personaId, {
|
||||
personaId: m.personaId,
|
||||
containerId: m.containerId,
|
||||
boundAt: ev.occurredAt,
|
||||
});
|
||||
} else if (ev.kind === 'persona-unbound') {
|
||||
bindings.delete(personaId);
|
||||
}
|
||||
}
|
||||
|
||||
return bindings;
|
||||
}
|
||||
|
||||
/**
|
||||
* Resolve a Role = Persona × Container.
|
||||
*
|
||||
* Looks up the persona, finds its current binding, then resolves the container.
|
||||
* Returns null if persona not found, not bound, or container doesn't exist.
|
||||
*/
|
||||
export function resolveRole(
|
||||
personaId: string,
|
||||
personas: Map<string, PersonaState>,
|
||||
bindings: Map<string, PersonaBinding>,
|
||||
containers: Map<string, Container>,
|
||||
): { persona: PersonaState; container: Container } | null {
|
||||
const persona = personas.get(personaId);
|
||||
if (!persona) return null;
|
||||
|
||||
const binding = bindings.get(personaId);
|
||||
if (!binding) return null;
|
||||
|
||||
const container = containers.get(binding.containerId);
|
||||
if (!container) return null;
|
||||
|
||||
return { persona, container };
|
||||
}
|
||||
@@ -1,73 +0,0 @@
|
||||
import type { PulseStore } from './store.js';
|
||||
import type { ContainerStatus, ContainerType } from './task-events.js';
|
||||
|
||||
export interface Container {
|
||||
containerId: string;
|
||||
type: ContainerType;
|
||||
host: string;
|
||||
status: ContainerStatus;
|
||||
tools: string[];
|
||||
registeredAt: number;
|
||||
updatedAt: number;
|
||||
}
|
||||
|
||||
export interface ContainerRegisteredMeta {
|
||||
containerId: string;
|
||||
type: ContainerType;
|
||||
host: string;
|
||||
tools: string[];
|
||||
}
|
||||
|
||||
export interface ContainerStatusChangedMeta {
|
||||
containerId: string;
|
||||
status: ContainerStatus;
|
||||
}
|
||||
|
||||
/**
|
||||
* Build a Map of Container from container-registered and container-status-changed events.
|
||||
*/
|
||||
export function buildContainersFromEvents(
|
||||
store: PulseStore,
|
||||
): Map<string, Container> {
|
||||
const registered = store.queryByKind('container-registered');
|
||||
const statusChanged = store.queryByKind('container-status-changed');
|
||||
|
||||
const allEvents = [...registered, ...statusChanged];
|
||||
allEvents.sort((a, b) => a.occurredAt - b.occurredAt);
|
||||
|
||||
const containers = new Map<string, Container>();
|
||||
|
||||
for (const ev of allEvents) {
|
||||
if (!ev.meta) continue;
|
||||
let meta: Record<string, unknown>;
|
||||
try {
|
||||
meta = JSON.parse(ev.meta);
|
||||
} catch {
|
||||
continue;
|
||||
}
|
||||
|
||||
const containerId = meta.containerId as string;
|
||||
if (!containerId) continue;
|
||||
|
||||
if (ev.kind === 'container-registered') {
|
||||
const m = meta as unknown as ContainerRegisteredMeta;
|
||||
containers.set(containerId, {
|
||||
containerId: m.containerId,
|
||||
type: m.type,
|
||||
host: m.host,
|
||||
status: 'online',
|
||||
tools: m.tools,
|
||||
registeredAt: ev.occurredAt,
|
||||
updatedAt: ev.occurredAt,
|
||||
});
|
||||
} else if (ev.kind === 'container-status-changed') {
|
||||
const existing = containers.get(containerId);
|
||||
if (!existing) continue;
|
||||
const m = meta as unknown as ContainerStatusChangedMeta;
|
||||
existing.status = m.status;
|
||||
existing.updatedAt = ev.occurredAt;
|
||||
}
|
||||
}
|
||||
|
||||
return containers;
|
||||
}
|
||||
@@ -1,92 +0,0 @@
|
||||
import type { CouncilMessage, ModeratorFn } from './moderator.js';
|
||||
import type { PersonaState } from './task-events.js';
|
||||
|
||||
export interface CouncilOptions {
|
||||
moderator: ModeratorFn;
|
||||
participants: PersonaState[];
|
||||
/** Pre-existing council history (e.g. from task-responded events) */
|
||||
history?: CouncilMessage[];
|
||||
/** Callback when a participant speaks — returns their speech content */
|
||||
onSpeak: (personaId: string) => Promise<string>;
|
||||
/** Callback when a new member is added */
|
||||
onAddMember?: (persona: PersonaState) => Promise<void>;
|
||||
/** Callback when a sub-topic is spawned — returns the sub-council result */
|
||||
onSpawn?: (
|
||||
topicId: string,
|
||||
title: string,
|
||||
participants: PersonaState[],
|
||||
) => Promise<CouncilResult>;
|
||||
/** Maximum rounds to prevent infinite loops (default 10) */
|
||||
maxRounds?: number;
|
||||
}
|
||||
|
||||
export interface CouncilResult {
|
||||
history: CouncilMessage[];
|
||||
summary?: string;
|
||||
rounds: number;
|
||||
}
|
||||
|
||||
/**
|
||||
* Run a Council deliberation loop.
|
||||
*
|
||||
* 1. Call moderator(participants, history)
|
||||
* 2. Based on decision:
|
||||
* - speak → onSpeak → append to history → back to 1
|
||||
* - add → onAddMember → append to participants → back to 1
|
||||
* - close → return final history + summary
|
||||
* 3. Force close after maxRounds
|
||||
*/
|
||||
export async function runCouncil(opts: CouncilOptions): Promise<CouncilResult> {
|
||||
const { moderator, onSpeak, onAddMember, onSpawn, maxRounds = 10 } = opts;
|
||||
|
||||
const participants = [...opts.participants];
|
||||
const history: CouncilMessage[] = [...(opts.history ?? [])];
|
||||
let rounds = 0;
|
||||
|
||||
while (rounds < maxRounds) {
|
||||
rounds++;
|
||||
|
||||
const decision = await moderator(participants, history);
|
||||
|
||||
switch (decision.action) {
|
||||
case 'speak': {
|
||||
const content = await onSpeak(decision.personaId);
|
||||
history.push({
|
||||
personaId: decision.personaId,
|
||||
content,
|
||||
timestamp: Date.now(),
|
||||
});
|
||||
break;
|
||||
}
|
||||
case 'add': {
|
||||
participants.push(decision.persona);
|
||||
if (onAddMember) {
|
||||
await onAddMember(decision.persona);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case 'close': {
|
||||
return { history, summary: decision.summary, rounds };
|
||||
}
|
||||
case 'spawn': {
|
||||
if (onSpawn) {
|
||||
const subResult = await onSpawn(
|
||||
decision.topicId,
|
||||
decision.title,
|
||||
decision.participants,
|
||||
);
|
||||
history.push({
|
||||
personaId: `sub-council:${decision.topicId}`,
|
||||
content:
|
||||
subResult.summary ?? 'Sub-council completed without summary',
|
||||
timestamp: Date.now(),
|
||||
});
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// maxRounds exceeded — force close
|
||||
return { history, summary: 'Max rounds exceeded', rounds };
|
||||
}
|
||||
@@ -1,9 +1,9 @@
|
||||
#!/usr/bin/env bun
|
||||
/**
|
||||
* Pulse Council Demo — Full Pipeline
|
||||
* Pulse Council Demo — v2 TopicType Pipeline
|
||||
*
|
||||
* Executable script (not a test) that simulates the complete
|
||||
* Council dispatch chain driven by the Rules engine.
|
||||
* Council v2 dispatch chain driven by TopicType rules.
|
||||
*
|
||||
* Run: cd ~/repos/pulse && bun packages/pulse/src/e2e/council-demo.ts
|
||||
*
|
||||
@@ -13,22 +13,11 @@
|
||||
import { mkdtempSync, rmSync } from 'node:fs';
|
||||
import { tmpdir } from 'node:os';
|
||||
import { join } from 'node:path';
|
||||
import { createBrokerExecutor } from '../executors/broker.js';
|
||||
import type { Sensed } from '../index.js';
|
||||
import {
|
||||
buildBindingsFromEvents,
|
||||
buildContainersFromEvents,
|
||||
buildPersonasFromEvents,
|
||||
buildTopicsFromEvents,
|
||||
createRoundRobinModerator,
|
||||
createStore,
|
||||
runCouncil,
|
||||
} from '../index.js';
|
||||
import type { LlmClient } from '../llm-client.js';
|
||||
import { createOpenAiLlmClient } from '../llm-client.js';
|
||||
import { createTaskRule } from '../rules/task-rule.js';
|
||||
import type { PendingTasksData } from '../task-events.js';
|
||||
import { buildPendingTasksFromEvents } from '../watchers/pending-tasks-projection.js';
|
||||
import { createStore } from '../store.js';
|
||||
import { createCodingTaskType } from '../topics/coding-task.js';
|
||||
import { createArchitectRole } from '../topics/roles/architect-llm.js';
|
||||
import { createTopicRule } from '../topics/topic-rule-adapter.js';
|
||||
|
||||
// ── Helpers ────────────────────────────────────────────────────
|
||||
|
||||
@@ -42,17 +31,6 @@ function logItem(msg: string) {
|
||||
console.log(` ✅ ${msg}`);
|
||||
}
|
||||
|
||||
function logSpeech(persona: string, content: string) {
|
||||
console.log(` ${persona}: "${content}"`);
|
||||
}
|
||||
|
||||
function assert(condition: boolean, msg: string) {
|
||||
if (!condition) {
|
||||
console.error(` ❌ ASSERTION FAILED: ${msg}`);
|
||||
process.exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
// ── CLI Args ───────────────────────────────────────────────────
|
||||
|
||||
const LIVE = process.argv.includes('--live');
|
||||
@@ -62,437 +40,6 @@ const KEEP = process.argv.includes('--keep');
|
||||
// ── Main ───────────────────────────────────────────────────────
|
||||
|
||||
async function main() {
|
||||
const useCustomDb = !!DB_PATH;
|
||||
const tmpDir = useCustomDb
|
||||
? undefined
|
||||
: mkdtempSync(join(tmpdir(), 'pulse-council-demo-'));
|
||||
const dbPath = DB_PATH ?? join(tmpDir!, 'events.db');
|
||||
const objDir = DB_PATH
|
||||
? join(require('node:path').dirname(DB_PATH), 'objects')
|
||||
: join(tmpDir!, 'objects');
|
||||
|
||||
const store = createStore({
|
||||
eventsDbPath: dbPath,
|
||||
objectsDir: objDir,
|
||||
});
|
||||
|
||||
try {
|
||||
console.log();
|
||||
console.log(`🍊 === Pulse Council Demo — Full Pipeline ===`);
|
||||
console.log(SEP);
|
||||
|
||||
// ── Step 1: Register Personas ──────────────────────────────
|
||||
|
||||
console.log();
|
||||
log('📝', 'Step 1: Register Personas');
|
||||
|
||||
const now = Date.now();
|
||||
store.appendEvent({
|
||||
occurredAt: now,
|
||||
kind: 'persona-registered',
|
||||
key: 'xiaoju',
|
||||
meta: JSON.stringify({
|
||||
personaId: 'xiaoju',
|
||||
name: '小橘',
|
||||
container: 'openclaw',
|
||||
capabilities: ['coordination', 'review'],
|
||||
}),
|
||||
});
|
||||
logItem('xiaoju (coordination, review)');
|
||||
|
||||
store.appendEvent({
|
||||
occurredAt: now + 1,
|
||||
kind: 'persona-registered',
|
||||
key: 'cursor-agent',
|
||||
meta: JSON.stringify({
|
||||
personaId: 'cursor-agent',
|
||||
name: 'Cursor Agent',
|
||||
container: 'cursor',
|
||||
capabilities: ['coding', 'refactor'],
|
||||
}),
|
||||
});
|
||||
logItem('cursor-agent (coding, refactor)');
|
||||
|
||||
const personas = buildPersonasFromEvents(store);
|
||||
assert(personas.size === 2, 'Expected 2 personas');
|
||||
|
||||
// ── Step 2: Register Containers ────────────────────────────
|
||||
|
||||
console.log();
|
||||
log('📦', 'Step 2: Register Containers');
|
||||
|
||||
store.appendEvent({
|
||||
occurredAt: now + 2,
|
||||
kind: 'container-registered',
|
||||
key: 'openclaw-neko',
|
||||
meta: JSON.stringify({
|
||||
containerId: 'openclaw-neko',
|
||||
type: 'openclaw',
|
||||
host: 'neko-vm',
|
||||
tools: ['a2a', 'feishu'],
|
||||
}),
|
||||
});
|
||||
logItem('openclaw-neko (openclaw @ neko-vm)');
|
||||
|
||||
store.appendEvent({
|
||||
occurredAt: now + 3,
|
||||
kind: 'container-registered',
|
||||
key: 'cursor-neko',
|
||||
meta: JSON.stringify({
|
||||
containerId: 'cursor-neko',
|
||||
type: 'cursor',
|
||||
host: 'neko-vm',
|
||||
tools: ['file-edit', 'terminal'],
|
||||
}),
|
||||
});
|
||||
logItem('cursor-neko (cursor @ neko-vm)');
|
||||
|
||||
const containers = buildContainersFromEvents(store);
|
||||
assert(containers.size === 2, 'Expected 2 containers');
|
||||
|
||||
// ── Step 3: Bind Persona × Container ───────────────────────
|
||||
|
||||
console.log();
|
||||
log('🔗', 'Step 3: Bind Persona × Container');
|
||||
|
||||
store.appendEvent({
|
||||
occurredAt: now + 4,
|
||||
kind: 'persona-bound',
|
||||
key: 'xiaoju',
|
||||
meta: JSON.stringify({
|
||||
personaId: 'xiaoju',
|
||||
containerId: 'openclaw-neko',
|
||||
}),
|
||||
});
|
||||
logItem('xiaoju → openclaw-neko');
|
||||
|
||||
store.appendEvent({
|
||||
occurredAt: now + 5,
|
||||
kind: 'persona-bound',
|
||||
key: 'cursor-agent',
|
||||
meta: JSON.stringify({
|
||||
personaId: 'cursor-agent',
|
||||
containerId: 'cursor-neko',
|
||||
}),
|
||||
});
|
||||
logItem('cursor-agent → cursor-neko');
|
||||
|
||||
const bindings = buildBindingsFromEvents(store);
|
||||
assert(bindings.size === 2, 'Expected 2 bindings');
|
||||
|
||||
// ── Step 4: Create Task ────────────────────────────────────
|
||||
|
||||
console.log();
|
||||
log('📋', 'Step 4: Create Task');
|
||||
|
||||
store.appendEvent({
|
||||
occurredAt: now + 10,
|
||||
kind: 'task-created',
|
||||
key: 'task-fix-login-bug',
|
||||
meta: JSON.stringify({
|
||||
taskId: 'task-fix-login-bug',
|
||||
projectId: 'proj-neko',
|
||||
title: 'Fix login redirect bug',
|
||||
description: 'Login page redirects to /undefined after auth',
|
||||
type: 'bug',
|
||||
priority: 5,
|
||||
creatorId: 'xiaoju',
|
||||
}),
|
||||
});
|
||||
logItem('task-fix-login-bug: "Fix login redirect bug"');
|
||||
|
||||
// ── Step 5: Rules Engine — detect pending task ─────────────
|
||||
|
||||
console.log();
|
||||
log('⚙️', 'Step 5: Rules Engine — detect pending task');
|
||||
|
||||
// Build pending tasks from events to construct snapshot
|
||||
const pendingData = buildPendingTasksFromEvents(store);
|
||||
assert(pendingData.pendingCount > 0, 'Expected pending tasks');
|
||||
|
||||
// Construct snapshot with pending-tasks sensed data
|
||||
type Snapshot = Record<string, unknown> & { timestamp: number };
|
||||
type Effect = Record<string, unknown>;
|
||||
|
||||
const currSnapshot: Snapshot = {
|
||||
timestamp: Date.now(),
|
||||
'pending-tasks': {
|
||||
data: pendingData,
|
||||
refreshedAt: Date.now(),
|
||||
} as Sensed<PendingTasksData>,
|
||||
};
|
||||
|
||||
const prevSnapshot: Snapshot = { timestamp: now - 1000 };
|
||||
|
||||
const taskRule = createTaskRule<Snapshot, Effect>();
|
||||
const baseInner = async (
|
||||
_p: Snapshot,
|
||||
_c: Snapshot,
|
||||
): Promise<[Effect[], number]> => [[], 15000];
|
||||
|
||||
const [effects] = await taskRule(prevSnapshot, currSnapshot, baseInner);
|
||||
|
||||
const brokerEffect = effects.find((e) => e.kind === 'broker');
|
||||
assert(!!brokerEffect, 'Expected broker effect from task rule');
|
||||
logItem(
|
||||
`Task Rule produced broker effect: taskIds=${JSON.stringify((brokerEffect as { taskIds: string[] }).taskIds)}`,
|
||||
);
|
||||
|
||||
// ── Step 6: Broker Executor — assign task ──────────────────
|
||||
|
||||
console.log();
|
||||
const llmMode = LIVE ? 'LIVE LLM' : 'mock LLM';
|
||||
log('🤖', `Step 6: Broker Executor — assign task (${llmMode})`);
|
||||
|
||||
let llmClient: LlmClient;
|
||||
|
||||
if (LIVE) {
|
||||
const baseUrl =
|
||||
process.env.PULSE_LLM_BASE_URL ?? process.env.OPENAI_BASE_URL;
|
||||
const apiKey =
|
||||
process.env.PULSE_LLM_API_KEY ?? process.env.OPENAI_API_KEY;
|
||||
const model = process.env.PULSE_LLM_MODEL ?? 'gpt-4o-mini';
|
||||
|
||||
if (!baseUrl || !apiKey) {
|
||||
console.error(
|
||||
' ❌ --live requires PULSE_LLM_BASE_URL + PULSE_LLM_API_KEY (or OPENAI_BASE_URL + OPENAI_API_KEY)',
|
||||
);
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
logItem(`LLM: ${model} @ ${baseUrl}`);
|
||||
llmClient = createOpenAiLlmClient({ baseUrl, apiKey, model });
|
||||
} else {
|
||||
llmClient = {
|
||||
chat: async (_req) => ({
|
||||
tool_calls: [
|
||||
{
|
||||
id: 'call_1',
|
||||
function: {
|
||||
name: 'assign_task',
|
||||
arguments: JSON.stringify({
|
||||
taskId: 'task-fix-login-bug',
|
||||
assigneeId: 'cursor-agent',
|
||||
}),
|
||||
},
|
||||
},
|
||||
],
|
||||
}),
|
||||
};
|
||||
}
|
||||
|
||||
const executeBroker = createBrokerExecutor({
|
||||
llmClient,
|
||||
routineStore: store,
|
||||
getPersonas: () => buildPersonasFromEvents(store),
|
||||
});
|
||||
|
||||
await executeBroker(brokerEffect as { kind: 'broker'; taskIds: string[] });
|
||||
|
||||
// Verify events written
|
||||
const routingEvents = store.queryByKind('task-routing');
|
||||
assert(routingEvents.length > 0, 'Expected task-routing event');
|
||||
logItem('task-routing event written');
|
||||
|
||||
logItem('LLM decided: assign task-fix-login-bug → cursor-agent');
|
||||
|
||||
const assignedEvents = store.queryByKind('task-assigned');
|
||||
assert(assignedEvents.length > 0, 'Expected task-assigned event');
|
||||
const assignedMeta = JSON.parse(assignedEvents[0].meta!);
|
||||
assert(
|
||||
assignedMeta.assigneeId === 'cursor-agent',
|
||||
'Expected cursor-agent assignment',
|
||||
);
|
||||
logItem('task-assigned event written');
|
||||
|
||||
// ── Step 7: Cursor completes coding ────────────────────────
|
||||
|
||||
console.log();
|
||||
log('💻', 'Step 7: Cursor completes coding');
|
||||
|
||||
store.appendEvent({
|
||||
occurredAt: Date.now(),
|
||||
kind: 'task-responded',
|
||||
key: 'task-fix-login-bug',
|
||||
meta: JSON.stringify({
|
||||
taskId: 'task-fix-login-bug',
|
||||
assigneeId: 'cursor-agent',
|
||||
result:
|
||||
'Fixed auth.ts line 42, added 3 test cases. PR #123 ready for review.',
|
||||
}),
|
||||
});
|
||||
logItem('task-responded: "Fixed auth.ts line 42..."');
|
||||
|
||||
// ── Step 8: Council Deliberation ───────────────────────────
|
||||
|
||||
console.log();
|
||||
log('🗣️', 'Step 8: Council Deliberation');
|
||||
|
||||
// Create topic for the task
|
||||
store.appendEvent({
|
||||
occurredAt: Date.now(),
|
||||
kind: 'topic-created',
|
||||
key: 'task-fix-login-bug',
|
||||
meta: JSON.stringify({
|
||||
topicId: 'task-fix-login-bug',
|
||||
title: 'Fix login redirect bug',
|
||||
createdBy: 'xiaoju',
|
||||
}),
|
||||
});
|
||||
|
||||
const councilSpeeches: Record<string, string[]> = {
|
||||
xiaoju: ['Login bug fixed, please review PR #123'],
|
||||
'cursor-agent': ['All tests pass, ready for merge'],
|
||||
};
|
||||
const councilIdx: Record<string, number> = { xiaoju: 0, 'cursor-agent': 0 };
|
||||
|
||||
const councilResult = await runCouncil({
|
||||
moderator: createRoundRobinModerator(),
|
||||
participants: [personas.get('xiaoju')!, personas.get('cursor-agent')!],
|
||||
onSpeak: async (personaId: string) => {
|
||||
const idx = councilIdx[personaId] ?? 0;
|
||||
councilIdx[personaId] = idx + 1;
|
||||
return councilSpeeches[personaId]?.[idx] ?? '...';
|
||||
},
|
||||
});
|
||||
|
||||
for (const msg of councilResult.history) {
|
||||
logSpeech(msg.personaId, msg.content);
|
||||
}
|
||||
logItem(`Council closed: ${councilResult.rounds} rounds`);
|
||||
|
||||
// ── Step 9: Review Sub-Council ─────────────────────────────
|
||||
|
||||
console.log();
|
||||
log('🔍', 'Step 9: Review Sub-Council');
|
||||
|
||||
store.appendEvent({
|
||||
occurredAt: Date.now(),
|
||||
kind: 'topic-created',
|
||||
key: 'review-pr-123',
|
||||
meta: JSON.stringify({
|
||||
topicId: 'review-pr-123',
|
||||
parentTopicId: 'task-fix-login-bug',
|
||||
title: 'Review PR #123',
|
||||
createdBy: 'xiaoju',
|
||||
}),
|
||||
});
|
||||
logItem('Sub-topic: review-pr-123 (parent: task-fix-login-bug)');
|
||||
|
||||
const reviewSpeeches: Record<string, string[]> = {
|
||||
xiaoju: ['Edge case: expired tokens?'],
|
||||
'cursor-agent': ['Added token expiry check'],
|
||||
};
|
||||
const reviewIdx: Record<string, number> = { xiaoju: 0, 'cursor-agent': 0 };
|
||||
|
||||
const reviewResult = await runCouncil({
|
||||
moderator: createRoundRobinModerator(),
|
||||
participants: [personas.get('xiaoju')!, personas.get('cursor-agent')!],
|
||||
onSpeak: async (personaId: string) => {
|
||||
const idx = reviewIdx[personaId] ?? 0;
|
||||
reviewIdx[personaId] = idx + 1;
|
||||
return reviewSpeeches[personaId]?.[idx] ?? '...';
|
||||
},
|
||||
});
|
||||
|
||||
for (const msg of reviewResult.history) {
|
||||
logSpeech(msg.personaId, msg.content);
|
||||
}
|
||||
logItem(`Review closed: ${reviewResult.rounds} rounds`);
|
||||
|
||||
// ── Step 10: Close Topics ──────────────────────────────────
|
||||
|
||||
console.log();
|
||||
log('🏁', 'Step 10: Close Topics');
|
||||
|
||||
store.appendEvent({
|
||||
occurredAt: Date.now(),
|
||||
kind: 'topic-closed',
|
||||
key: 'review-pr-123',
|
||||
meta: JSON.stringify({
|
||||
topicId: 'review-pr-123',
|
||||
summary: 'PR approved with token expiry fix',
|
||||
}),
|
||||
});
|
||||
logItem('review-pr-123: closed (PR approved with token expiry fix)');
|
||||
|
||||
store.appendEvent({
|
||||
occurredAt: Date.now() + 1,
|
||||
kind: 'topic-closed',
|
||||
key: 'task-fix-login-bug',
|
||||
meta: JSON.stringify({
|
||||
topicId: 'task-fix-login-bug',
|
||||
summary: 'Login bug fixed, PR merged',
|
||||
}),
|
||||
});
|
||||
logItem('task-fix-login-bug: closed (Login bug fixed, PR merged)');
|
||||
|
||||
// ── Event Log ──────────────────────────────────────────────
|
||||
|
||||
console.log();
|
||||
console.log(`📊 === Event Log (chronological) ===`);
|
||||
|
||||
const allEvents = store.getAfter(0);
|
||||
allEvents.sort((a, b) => a.occurredAt - b.occurredAt);
|
||||
|
||||
for (const ev of allEvents) {
|
||||
const key =
|
||||
ev.key ??
|
||||
(ev.meta
|
||||
? (JSON.parse(ev.meta).taskId ??
|
||||
JSON.parse(ev.meta).personaId ??
|
||||
JSON.parse(ev.meta).containerId ??
|
||||
'')
|
||||
: '');
|
||||
console.log(
|
||||
` #${String(ev.id).padStart(2)} ${ev.kind.padEnd(22)} ${key}`,
|
||||
);
|
||||
}
|
||||
|
||||
// ── Final State ────────────────────────────────────────────
|
||||
|
||||
console.log();
|
||||
console.log(`📊 === Final State ===`);
|
||||
|
||||
const finalPersonas = buildPersonasFromEvents(store);
|
||||
const finalContainers = buildContainersFromEvents(store);
|
||||
const finalBindings = buildBindingsFromEvents(store);
|
||||
const finalTopics = buildTopicsFromEvents(store);
|
||||
|
||||
const personaNames = Array.from(finalPersonas.keys()).join(', ');
|
||||
const containerNames = Array.from(finalContainers.keys()).join(', ');
|
||||
const allClosed = Array.from(finalTopics.values()).every(
|
||||
(t) => t.status === 'closed',
|
||||
);
|
||||
|
||||
console.log(` Personas: ${finalPersonas.size} (${personaNames})`);
|
||||
console.log(` Containers: ${finalContainers.size} (${containerNames})`);
|
||||
console.log(` Bindings: ${finalBindings.size}`);
|
||||
console.log(
|
||||
` Topics: ${finalTopics.size} (${allClosed ? 'all closed' : 'some open'})`,
|
||||
);
|
||||
|
||||
console.log();
|
||||
console.log(SEP);
|
||||
console.log(`✅ All steps completed successfully!`);
|
||||
console.log();
|
||||
} finally {
|
||||
store.close();
|
||||
if (KEEP || useCustomDb) {
|
||||
console.log(`📁 Events DB preserved: ${dbPath}`);
|
||||
} else if (tmpDir) {
|
||||
rmSync(tmpDir, { recursive: true, force: true });
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ── V2 Demo ────────────────────────────────────────────────────
|
||||
|
||||
async function mainV2() {
|
||||
const { createCodingTaskType } = await import('../topics/coding-task.js');
|
||||
const { createTopicRule } = await import('../topics/topic-rule-adapter.js');
|
||||
|
||||
const _V2 = true;
|
||||
const useCustomDb = !!DB_PATH;
|
||||
const tmpDir = useCustomDb
|
||||
? null
|
||||
@@ -524,45 +71,13 @@ async function mainV2() {
|
||||
const model = process.env.PULSE_LLM_MODEL ?? 'qwen-plus';
|
||||
if (!baseUrl || !apiKey) {
|
||||
console.error(
|
||||
'❌ --live --v2 requires PULSE_LLM_BASE_URL + PULSE_LLM_API_KEY',
|
||||
'❌ --live requires PULSE_LLM_BASE_URL + PULSE_LLM_API_KEY',
|
||||
);
|
||||
process.exit(1);
|
||||
}
|
||||
const llm = createOpenAiLlmClient({ baseUrl, apiKey, model });
|
||||
codingTask = createCodingTaskType({
|
||||
architectFn: async (ctx, s) => {
|
||||
const resp = await llm.chat({
|
||||
messages: [
|
||||
{
|
||||
role: 'system',
|
||||
content:
|
||||
'You are a software architect. Analyze the task and suggest target files and approach. Reply in JSON: {"analysis": "...", "targetFiles": ["..."]}',
|
||||
},
|
||||
{
|
||||
role: 'user',
|
||||
content: `Task: ${ctx.title}\nDescription: ${ctx.description}\nRepo: ${ctx.repoDir}`,
|
||||
},
|
||||
],
|
||||
});
|
||||
let parsed: { analysis?: string; targetFiles?: string[] };
|
||||
try {
|
||||
parsed = JSON.parse(resp.content ?? '{}');
|
||||
} catch {
|
||||
parsed = {
|
||||
analysis: resp.content ?? 'No analysis',
|
||||
targetFiles: [],
|
||||
};
|
||||
}
|
||||
s.appendEvent({
|
||||
occurredAt: Date.now(),
|
||||
kind: 'coding.analyzed',
|
||||
meta: JSON.stringify({
|
||||
topicId: ctx.topicId,
|
||||
analysis: parsed.analysis ?? 'No analysis',
|
||||
targetFiles: parsed.targetFiles ?? [],
|
||||
}),
|
||||
});
|
||||
},
|
||||
architectFn: createArchitectRole(llm),
|
||||
});
|
||||
} else {
|
||||
codingTask = createCodingTaskType();
|
||||
@@ -650,8 +165,7 @@ async function mainV2() {
|
||||
}
|
||||
}
|
||||
|
||||
const isV2 = process.argv.includes('--v2');
|
||||
(isV2 ? mainV2 : main)().catch((err) => {
|
||||
main().catch((err) => {
|
||||
console.error('❌ Demo failed:', err);
|
||||
process.exit(1);
|
||||
});
|
||||
|
||||
@@ -1,366 +0,0 @@
|
||||
/**
|
||||
* E2E T10 — Council full pipeline: task → code → review → close
|
||||
*
|
||||
* Simulates the real coding task lifecycle:
|
||||
* Persona registration → Container registration → Binding →
|
||||
* Topic creation → Council deliberation → Coding response →
|
||||
* Review sub-topic → Close
|
||||
*
|
||||
* Uses real SQLite store (no mocks).
|
||||
*
|
||||
* Run: bun test packages/pulse/src/e2e/t10-council-e2e.test.ts
|
||||
*/
|
||||
|
||||
import { afterAll, beforeAll, describe, expect, it } from 'bun:test';
|
||||
import { mkdtempSync, rmSync } from 'node:fs';
|
||||
import { tmpdir } from 'node:os';
|
||||
import { join } from 'node:path';
|
||||
import type { CouncilResult } from '../council.js';
|
||||
import {
|
||||
buildBindingsFromEvents,
|
||||
buildContainersFromEvents,
|
||||
buildPersonasFromEvents,
|
||||
buildTopicsFromEvents,
|
||||
createRoundRobinModerator,
|
||||
createStore,
|
||||
type PulseStore,
|
||||
resolveRole,
|
||||
runCouncil,
|
||||
} from '../index.js';
|
||||
|
||||
// ── Shared Context ─────────────────────────────────────────────
|
||||
|
||||
interface Ctx {
|
||||
tmpDir: string;
|
||||
store: PulseStore;
|
||||
councilResult?: CouncilResult;
|
||||
reviewResult?: CouncilResult;
|
||||
}
|
||||
|
||||
const ctx: Ctx = {} as Ctx;
|
||||
|
||||
// ── Test Suite ─────────────────────────────────────────────────
|
||||
|
||||
describe('E2E: Council full pipeline — task → code → review → close', () => {
|
||||
beforeAll(() => {
|
||||
ctx.tmpDir = mkdtempSync(join(tmpdir(), 'pulse-council-e2e-'));
|
||||
ctx.store = createStore({
|
||||
eventsDbPath: join(ctx.tmpDir, 'events.db'),
|
||||
objectsDir: join(ctx.tmpDir, 'objects'),
|
||||
});
|
||||
});
|
||||
|
||||
afterAll(() => {
|
||||
ctx.store.close();
|
||||
rmSync(ctx.tmpDir, { recursive: true, force: true });
|
||||
});
|
||||
|
||||
// ── Step 1: Register Personas ──────────────────────────────────
|
||||
|
||||
it('Step 1: register personas — xiaoju + cursor-agent', () => {
|
||||
const now = Date.now();
|
||||
|
||||
ctx.store.appendEvent({
|
||||
occurredAt: now,
|
||||
kind: 'persona-registered',
|
||||
key: 'xiaoju',
|
||||
meta: JSON.stringify({
|
||||
personaId: 'xiaoju',
|
||||
name: '小橘',
|
||||
container: 'openclaw',
|
||||
capabilities: ['coordination', 'review'],
|
||||
}),
|
||||
});
|
||||
|
||||
ctx.store.appendEvent({
|
||||
occurredAt: now + 1,
|
||||
kind: 'persona-registered',
|
||||
key: 'cursor-agent',
|
||||
meta: JSON.stringify({
|
||||
personaId: 'cursor-agent',
|
||||
name: 'Cursor Agent',
|
||||
container: 'cursor',
|
||||
capabilities: ['coding', 'refactor'],
|
||||
}),
|
||||
});
|
||||
|
||||
const personas = buildPersonasFromEvents(ctx.store);
|
||||
expect(personas.size).toBe(2);
|
||||
expect(personas.get('xiaoju')?.capabilities).toEqual([
|
||||
'coordination',
|
||||
'review',
|
||||
]);
|
||||
expect(personas.get('cursor-agent')?.capabilities).toEqual([
|
||||
'coding',
|
||||
'refactor',
|
||||
]);
|
||||
});
|
||||
|
||||
// ── Step 2: Register Containers ────────────────────────────────
|
||||
|
||||
it('Step 2: register containers — openclaw-neko + cursor-neko', () => {
|
||||
const now = Date.now();
|
||||
|
||||
ctx.store.appendEvent({
|
||||
occurredAt: now,
|
||||
kind: 'container-registered',
|
||||
key: 'openclaw-neko',
|
||||
meta: JSON.stringify({
|
||||
containerId: 'openclaw-neko',
|
||||
type: 'openclaw',
|
||||
host: 'neko-vm',
|
||||
tools: ['a2a', 'feishu'],
|
||||
}),
|
||||
});
|
||||
|
||||
ctx.store.appendEvent({
|
||||
occurredAt: now + 1,
|
||||
kind: 'container-registered',
|
||||
key: 'cursor-neko',
|
||||
meta: JSON.stringify({
|
||||
containerId: 'cursor-neko',
|
||||
type: 'cursor',
|
||||
host: 'neko-vm',
|
||||
tools: ['file-edit', 'terminal'],
|
||||
}),
|
||||
});
|
||||
|
||||
const containers = buildContainersFromEvents(ctx.store);
|
||||
expect(containers.size).toBe(2);
|
||||
expect(containers.get('openclaw-neko')?.type).toBe('openclaw');
|
||||
expect(containers.get('cursor-neko')?.tools).toEqual([
|
||||
'file-edit',
|
||||
'terminal',
|
||||
]);
|
||||
});
|
||||
|
||||
// ── Step 3: Bind Persona × Container ───────────────────────────
|
||||
|
||||
it('Step 3: bind personas to containers + resolveRole', () => {
|
||||
const now = Date.now();
|
||||
|
||||
ctx.store.appendEvent({
|
||||
occurredAt: now,
|
||||
kind: 'persona-bound',
|
||||
key: 'xiaoju',
|
||||
meta: JSON.stringify({
|
||||
personaId: 'xiaoju',
|
||||
containerId: 'openclaw-neko',
|
||||
}),
|
||||
});
|
||||
|
||||
ctx.store.appendEvent({
|
||||
occurredAt: now + 1,
|
||||
kind: 'persona-bound',
|
||||
key: 'cursor-agent',
|
||||
meta: JSON.stringify({
|
||||
personaId: 'cursor-agent',
|
||||
containerId: 'cursor-neko',
|
||||
}),
|
||||
});
|
||||
|
||||
const bindings = buildBindingsFromEvents(ctx.store);
|
||||
expect(bindings.size).toBe(2);
|
||||
expect(bindings.get('xiaoju')?.containerId).toBe('openclaw-neko');
|
||||
expect(bindings.get('cursor-agent')?.containerId).toBe('cursor-neko');
|
||||
|
||||
// Verify Role resolution
|
||||
const personas = buildPersonasFromEvents(ctx.store);
|
||||
const containers = buildContainersFromEvents(ctx.store);
|
||||
|
||||
const xiaojuRole = resolveRole('xiaoju', personas, bindings, containers);
|
||||
expect(xiaojuRole).not.toBeNull();
|
||||
expect(xiaojuRole!.persona.personaId).toBe('xiaoju');
|
||||
expect(xiaojuRole!.container.containerId).toBe('openclaw-neko');
|
||||
|
||||
const cursorRole = resolveRole(
|
||||
'cursor-agent',
|
||||
personas,
|
||||
bindings,
|
||||
containers,
|
||||
);
|
||||
expect(cursorRole).not.toBeNull();
|
||||
expect(cursorRole!.container.type).toBe('cursor');
|
||||
});
|
||||
|
||||
// ── Step 4: Create Topic (coding task) ─────────────────────────
|
||||
|
||||
it('Step 4: create topic — task-fix-login-bug', () => {
|
||||
ctx.store.appendEvent({
|
||||
occurredAt: Date.now(),
|
||||
kind: 'topic-created',
|
||||
key: 'task-fix-login-bug',
|
||||
meta: JSON.stringify({
|
||||
topicId: 'task-fix-login-bug',
|
||||
title: 'Fix login redirect bug',
|
||||
createdBy: 'xiaoju',
|
||||
}),
|
||||
});
|
||||
|
||||
const topics = buildTopicsFromEvents(ctx.store);
|
||||
expect(topics.size).toBe(1);
|
||||
|
||||
const topic = topics.get('task-fix-login-bug');
|
||||
expect(topic).not.toBeNull();
|
||||
expect(topic!.title).toBe('Fix login redirect bug');
|
||||
expect(topic!.status).toBe('open');
|
||||
expect(topic!.createdBy).toBe('xiaoju');
|
||||
});
|
||||
|
||||
// ── Step 5: Council deliberation — assign task ─────────────────
|
||||
|
||||
it('Step 5: council deliberation — round-robin assignment', async () => {
|
||||
const personas = buildPersonasFromEvents(ctx.store);
|
||||
const participants = [
|
||||
personas.get('xiaoju')!,
|
||||
personas.get('cursor-agent')!,
|
||||
];
|
||||
|
||||
const speeches: Record<string, string> = {
|
||||
xiaoju: 'Login redirect bug in auth.ts line 42. cursor-agent please fix.',
|
||||
'cursor-agent': "Understood. I'll fix auth.ts and add tests.",
|
||||
};
|
||||
|
||||
ctx.councilResult = await runCouncil({
|
||||
moderator: createRoundRobinModerator(),
|
||||
participants,
|
||||
onSpeak: async (personaId: string) => speeches[personaId]!,
|
||||
});
|
||||
|
||||
expect(ctx.councilResult.history).toHaveLength(2);
|
||||
expect(ctx.councilResult.history[0]!.personaId).toBe('xiaoju');
|
||||
expect(ctx.councilResult.history[1]!.personaId).toBe('cursor-agent');
|
||||
// 2 speak rounds + 1 close decision = 3 rounds
|
||||
expect(ctx.councilResult.rounds).toBe(3);
|
||||
expect(ctx.councilResult.summary).toBe('All participants have spoken');
|
||||
});
|
||||
|
||||
// ── Step 6: Simulate coding completion ─────────────────────────
|
||||
|
||||
it('Step 6: task-responded — cursor-agent completes coding', () => {
|
||||
const ev = ctx.store.appendEvent({
|
||||
occurredAt: Date.now(),
|
||||
kind: 'task-responded',
|
||||
key: 'task-fix-login-bug',
|
||||
meta: JSON.stringify({
|
||||
taskId: 'task-fix-login-bug',
|
||||
assigneeId: 'cursor-agent',
|
||||
result:
|
||||
'Fixed auth.ts line 42, added 3 test cases. PR #123 ready for review.',
|
||||
}),
|
||||
});
|
||||
|
||||
expect(ev.id).toBeGreaterThan(0);
|
||||
expect(ev.kind).toBe('task-responded');
|
||||
});
|
||||
|
||||
// ── Step 7: Review Council — sub-topic ─────────────────────────
|
||||
|
||||
it('Step 7: review council — sub-topic review-pr-123', async () => {
|
||||
// Create review sub-topic
|
||||
ctx.store.appendEvent({
|
||||
occurredAt: Date.now(),
|
||||
kind: 'topic-created',
|
||||
key: 'review-pr-123',
|
||||
meta: JSON.stringify({
|
||||
topicId: 'review-pr-123',
|
||||
parentTopicId: 'task-fix-login-bug',
|
||||
title: 'Review PR #123',
|
||||
createdBy: 'xiaoju',
|
||||
}),
|
||||
});
|
||||
|
||||
const topics = buildTopicsFromEvents(ctx.store);
|
||||
const reviewTopic = topics.get('review-pr-123');
|
||||
expect(reviewTopic).not.toBeNull();
|
||||
expect(reviewTopic!.parentTopicId).toBe('task-fix-login-bug');
|
||||
|
||||
// Run review council
|
||||
const personas = buildPersonasFromEvents(ctx.store);
|
||||
const participants = [
|
||||
personas.get('xiaoju')!,
|
||||
personas.get('cursor-agent')!,
|
||||
];
|
||||
|
||||
const reviewSpeeches: Record<string, string> = {
|
||||
xiaoju: 'Edge case: what about expired tokens?',
|
||||
'cursor-agent': 'Good catch, added token expiry check and test.',
|
||||
};
|
||||
|
||||
ctx.reviewResult = await runCouncil({
|
||||
moderator: createRoundRobinModerator(),
|
||||
participants,
|
||||
onSpeak: async (personaId: string) => reviewSpeeches[personaId]!,
|
||||
});
|
||||
|
||||
expect(ctx.reviewResult.history).toHaveLength(2);
|
||||
expect(ctx.reviewResult.rounds).toBe(3);
|
||||
expect(ctx.reviewResult.summary).toBe('All participants have spoken');
|
||||
});
|
||||
|
||||
// ── Step 8: Close topics ───────────────────────────────────────
|
||||
|
||||
it('Step 8: close both topics with summaries', () => {
|
||||
const now = Date.now();
|
||||
|
||||
ctx.store.appendEvent({
|
||||
occurredAt: now,
|
||||
kind: 'topic-closed',
|
||||
key: 'review-pr-123',
|
||||
meta: JSON.stringify({
|
||||
topicId: 'review-pr-123',
|
||||
summary: 'PR approved with token expiry fix',
|
||||
}),
|
||||
});
|
||||
|
||||
ctx.store.appendEvent({
|
||||
occurredAt: now + 1,
|
||||
kind: 'topic-closed',
|
||||
key: 'task-fix-login-bug',
|
||||
meta: JSON.stringify({
|
||||
topicId: 'task-fix-login-bug',
|
||||
summary: 'Login bug fixed, PR merged',
|
||||
}),
|
||||
});
|
||||
|
||||
const topics = buildTopicsFromEvents(ctx.store);
|
||||
|
||||
const mainTopic = topics.get('task-fix-login-bug');
|
||||
expect(mainTopic!.status).toBe('closed');
|
||||
expect(mainTopic!.summary).toBe('Login bug fixed, PR merged');
|
||||
|
||||
const reviewTopic = topics.get('review-pr-123');
|
||||
expect(reviewTopic!.status).toBe('closed');
|
||||
expect(reviewTopic!.summary).toBe('PR approved with token expiry fix');
|
||||
});
|
||||
|
||||
// ── Step 9: Final consistency check ────────────────────────────
|
||||
|
||||
it('Step 9: full state consistency — all entities intact', () => {
|
||||
const personas = buildPersonasFromEvents(ctx.store);
|
||||
expect(personas.size).toBe(2);
|
||||
expect(personas.has('xiaoju')).toBe(true);
|
||||
expect(personas.has('cursor-agent')).toBe(true);
|
||||
|
||||
const containers = buildContainersFromEvents(ctx.store);
|
||||
expect(containers.size).toBe(2);
|
||||
expect(containers.has('openclaw-neko')).toBe(true);
|
||||
expect(containers.has('cursor-neko')).toBe(true);
|
||||
|
||||
const bindings = buildBindingsFromEvents(ctx.store);
|
||||
expect(bindings.size).toBe(2);
|
||||
expect(bindings.get('xiaoju')?.containerId).toBe('openclaw-neko');
|
||||
expect(bindings.get('cursor-agent')?.containerId).toBe('cursor-neko');
|
||||
|
||||
const topics = buildTopicsFromEvents(ctx.store);
|
||||
expect(topics.size).toBe(2);
|
||||
for (const [, topic] of topics) {
|
||||
expect(topic.status).toBe('closed');
|
||||
expect(topic.summary).toBeTruthy();
|
||||
}
|
||||
|
||||
// Council histories are complete
|
||||
expect(ctx.councilResult!.history).toHaveLength(2);
|
||||
expect(ctx.reviewResult!.history).toHaveLength(2);
|
||||
});
|
||||
});
|
||||
@@ -1,181 +0,0 @@
|
||||
import type { LlmClient, LlmResponse } from '../llm-client.js';
|
||||
import type { BrokerEffect } from '../rules/task-rule.js';
|
||||
import type { PulseStore } from '../store.js';
|
||||
import type { PersonaState, TaskState } from '../task-events.js';
|
||||
|
||||
export interface BrokerExecutorOptions {
|
||||
llmClient: LlmClient;
|
||||
routineStore: PulseStore;
|
||||
getPersonas?: () => Map<string, PersonaState>;
|
||||
}
|
||||
|
||||
const DEFAULT_AGENT_IDS = ['cursor'];
|
||||
|
||||
function buildBrokerTools(agentIds: string[]) {
|
||||
return [
|
||||
{
|
||||
type: 'function' as const,
|
||||
function: {
|
||||
name: 'assign_task',
|
||||
description: 'Assign a task to an agent',
|
||||
parameters: {
|
||||
type: 'object',
|
||||
properties: {
|
||||
taskId: { type: 'string' },
|
||||
assigneeId: {
|
||||
type: 'string',
|
||||
enum: agentIds,
|
||||
},
|
||||
},
|
||||
required: ['taskId', 'assigneeId'],
|
||||
},
|
||||
},
|
||||
},
|
||||
];
|
||||
}
|
||||
|
||||
function buildBrokerSystemPrompt(
|
||||
personas: Map<string, PersonaState> | null,
|
||||
): string {
|
||||
if (personas && personas.size > 0) {
|
||||
const agentLines = Array.from(personas.values())
|
||||
.map(
|
||||
(p) =>
|
||||
`- ${p.personaId} (${p.name}): container=${p.container}, capabilities=[${p.capabilities.join(', ')}]`,
|
||||
)
|
||||
.join('\n');
|
||||
return `You are a task broker. Given a list of pending tasks, decide which agent to assign each task to.
|
||||
|
||||
Available agents:
|
||||
${agentLines}
|
||||
|
||||
For each task, call assign_task with the taskId and assigneeId.
|
||||
Assign all tasks — do not skip any. Match tasks to agents based on their capabilities.`;
|
||||
}
|
||||
return `You are a task broker. Given a list of pending tasks, decide which agent to assign each task to.
|
||||
|
||||
Currently available agents: ["cursor"]
|
||||
|
||||
For each task, call assign_task with the taskId and assigneeId.
|
||||
Assign all tasks — do not skip any.`;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a broker executor that uses an LLM to assign pending tasks.
|
||||
*
|
||||
* Flow:
|
||||
* 1. Read pending task details from routineStore
|
||||
* 2. Read project details from routineStore
|
||||
* 3. Call LLM to decide assigneeId for each task
|
||||
* 4. Write task-assigned events to routineStore
|
||||
*/
|
||||
export function createBrokerExecutor(opts: BrokerExecutorOptions) {
|
||||
const { llmClient, routineStore, getPersonas } = opts;
|
||||
|
||||
return async function executeBroker(effect: BrokerEffect): Promise<void> {
|
||||
if (effect.kind !== 'broker') return;
|
||||
|
||||
const brokerSessionId = `broker-${Date.now()}`;
|
||||
|
||||
const taskEvents = routineStore.queryByKind('task-created');
|
||||
taskEvents.sort((a, b) => a.occurredAt - b.occurredAt);
|
||||
|
||||
const taskMap = new Map<string, TaskState>();
|
||||
for (const ev of taskEvents) {
|
||||
if (!ev.meta) continue;
|
||||
const meta = JSON.parse(ev.meta);
|
||||
taskMap.set(meta.taskId, {
|
||||
taskId: meta.taskId,
|
||||
projectId: meta.projectId,
|
||||
title: meta.title,
|
||||
description: meta.description ?? '',
|
||||
type: meta.type ?? 'action',
|
||||
priority: meta.priority ?? 0,
|
||||
creatorId: meta.creatorId ?? '',
|
||||
status: 'pending',
|
||||
createdAt: ev.occurredAt,
|
||||
updatedAt: ev.occurredAt,
|
||||
});
|
||||
}
|
||||
|
||||
const tasks = effect.taskIds
|
||||
.map((id) => taskMap.get(id))
|
||||
.filter((t): t is TaskState => t !== undefined);
|
||||
|
||||
if (tasks.length === 0) return;
|
||||
|
||||
for (const task of tasks) {
|
||||
routineStore.appendEvent({
|
||||
occurredAt: Date.now(),
|
||||
kind: 'task-routing',
|
||||
meta: JSON.stringify({
|
||||
taskId: task.taskId,
|
||||
brokerSessionId,
|
||||
}),
|
||||
});
|
||||
}
|
||||
|
||||
const taskSummary = tasks
|
||||
.map(
|
||||
(t) =>
|
||||
`- taskId=${t.taskId} type=${t.type} priority=${t.priority} title="${t.title}"`,
|
||||
)
|
||||
.join('\n');
|
||||
|
||||
// Build dynamic tools and prompt from personas (or fallback to defaults)
|
||||
const personas = getPersonas ? getPersonas() : null;
|
||||
const agentIds =
|
||||
personas && personas.size > 0
|
||||
? Array.from(personas.keys())
|
||||
: DEFAULT_AGENT_IDS;
|
||||
const brokerTools = buildBrokerTools(agentIds);
|
||||
const brokerSystemPrompt = buildBrokerSystemPrompt(personas);
|
||||
|
||||
let response: LlmResponse;
|
||||
try {
|
||||
response = await llmClient.chat({
|
||||
messages: [
|
||||
{ role: 'system', content: brokerSystemPrompt },
|
||||
{ role: 'user', content: `Pending tasks:\n${taskSummary}` },
|
||||
],
|
||||
tools: brokerTools,
|
||||
});
|
||||
} catch (err) {
|
||||
for (const task of tasks) {
|
||||
routineStore.appendEvent({
|
||||
occurredAt: Date.now(),
|
||||
kind: 'task-responded',
|
||||
meta: JSON.stringify({
|
||||
taskId: task.taskId,
|
||||
assigneeId: 'broker',
|
||||
result: `Routing failed: ${err instanceof Error ? err.message : String(err)}`,
|
||||
}),
|
||||
});
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (!response.tool_calls) return;
|
||||
|
||||
for (const tc of response.tool_calls) {
|
||||
if (tc.function.name !== 'assign_task') continue;
|
||||
let args: { taskId?: string; assigneeId?: string };
|
||||
try {
|
||||
args = JSON.parse(tc.function.arguments);
|
||||
} catch {
|
||||
continue;
|
||||
}
|
||||
if (!args.taskId || !args.assigneeId) continue;
|
||||
|
||||
routineStore.appendEvent({
|
||||
occurredAt: Date.now(),
|
||||
kind: 'task-assigned',
|
||||
meta: JSON.stringify({
|
||||
taskId: args.taskId,
|
||||
assigneeId: args.assigneeId,
|
||||
assignedBy: 'broker',
|
||||
}),
|
||||
});
|
||||
}
|
||||
};
|
||||
}
|
||||
@@ -1,7 +1,3 @@
|
||||
export {
|
||||
type BrokerExecutorOptions,
|
||||
createBrokerExecutor,
|
||||
} from './broker.js';
|
||||
export {
|
||||
executeSurvivalEffect,
|
||||
type SurvivalEffect,
|
||||
|
||||
@@ -932,66 +932,10 @@ export {
|
||||
createAgentLoopRule,
|
||||
} from './rules/agent-loop.js';
|
||||
|
||||
// ── Task Rule ───────────────────────────────────────────────────
|
||||
|
||||
export {
|
||||
type BrokerEffect,
|
||||
type CursorTaskEffect,
|
||||
createTaskRule,
|
||||
} from './rules/task-rule.js';
|
||||
|
||||
// ── Broker Executor ─────────────────────────────────────────────
|
||||
|
||||
export {
|
||||
type BrokerExecutorOptions,
|
||||
createBrokerExecutor,
|
||||
} from './executors/broker.js';
|
||||
|
||||
// ── Persona Registry ────────────────────────────────────────────
|
||||
|
||||
export { buildPersonasFromEvents } from './persona.js';
|
||||
|
||||
// ── Container Registry ──────────────────────────────────────────
|
||||
|
||||
export type {
|
||||
Container,
|
||||
ContainerRegisteredMeta,
|
||||
ContainerStatusChangedMeta,
|
||||
} from './container.js';
|
||||
export { buildContainersFromEvents } from './container.js';
|
||||
|
||||
// ── Persona × Container Binding ─────────────────────────────────
|
||||
|
||||
export type {
|
||||
PersonaBinding,
|
||||
PersonaBoundMeta,
|
||||
PersonaUnboundMeta,
|
||||
} from './binding.js';
|
||||
export { buildBindingsFromEvents, resolveRole } from './binding.js';
|
||||
|
||||
// ── Council / Moderator ─────────────────────────────────────────
|
||||
|
||||
export type { CouncilOptions, CouncilResult } from './council.js';
|
||||
export { runCouncil } from './council.js';
|
||||
export type {
|
||||
CouncilMessage,
|
||||
ModeratorDecision,
|
||||
ModeratorFn,
|
||||
} from './moderator.js';
|
||||
export type { LlmModeratorOptions } from './moderators/llm-moderator.js';
|
||||
export { createLlmModerator } from './moderators/llm-moderator.js';
|
||||
export { createRoundRobinModerator } from './moderators/round-robin.js';
|
||||
|
||||
// ── Topic ───────────────────────────────────────────────────────
|
||||
|
||||
export type {
|
||||
Topic,
|
||||
TopicClosedMeta,
|
||||
TopicCreatedMeta,
|
||||
TopicStatus,
|
||||
} from './topic.js';
|
||||
export { buildTopicsFromEvents } from './topic.js';
|
||||
|
||||
// ── Council v2: TopicType ────────────────────────────────────────
|
||||
|
||||
export type { CodingTaskContext } from './topics/coding-task.js';
|
||||
|
||||
@@ -1,26 +0,0 @@
|
||||
import type { PersonaState } from './task-events.js';
|
||||
|
||||
/** A message in the Council context — only Role speech, no Moderator actions */
|
||||
export interface CouncilMessage {
|
||||
personaId: string;
|
||||
content: string;
|
||||
timestamp: number;
|
||||
}
|
||||
|
||||
/** Moderator 决策结果 */
|
||||
export type ModeratorDecision =
|
||||
| { action: 'speak'; personaId: string }
|
||||
| { action: 'add'; persona: PersonaState }
|
||||
| { action: 'close'; summary?: string }
|
||||
| {
|
||||
action: 'spawn';
|
||||
topicId: string;
|
||||
title: string;
|
||||
participants: PersonaState[];
|
||||
};
|
||||
|
||||
/** Moderator 纯函数签名 */
|
||||
export type ModeratorFn = (
|
||||
participants: PersonaState[],
|
||||
history: CouncilMessage[],
|
||||
) => Promise<ModeratorDecision>;
|
||||
@@ -1,193 +0,0 @@
|
||||
import type { LlmClient } from '../llm-client.js';
|
||||
import type {
|
||||
CouncilMessage,
|
||||
ModeratorDecision,
|
||||
ModeratorFn,
|
||||
} from '../moderator.js';
|
||||
import type { ContainerType, PersonaState } from '../task-events.js';
|
||||
|
||||
function buildModeratorTools(participantIds: string[]) {
|
||||
return [
|
||||
{
|
||||
type: 'function' as const,
|
||||
function: {
|
||||
name: 'next_speaker',
|
||||
description: 'Select the next participant to speak',
|
||||
parameters: {
|
||||
type: 'object',
|
||||
properties: {
|
||||
personaId: {
|
||||
type: 'string',
|
||||
enum: participantIds,
|
||||
description: 'The persona ID of the next speaker',
|
||||
},
|
||||
},
|
||||
required: ['personaId'],
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
type: 'function' as const,
|
||||
function: {
|
||||
name: 'add_member',
|
||||
description: 'Add a new member to the council',
|
||||
parameters: {
|
||||
type: 'object',
|
||||
properties: {
|
||||
personaId: { type: 'string', description: 'Unique persona ID' },
|
||||
name: { type: 'string', description: 'Display name' },
|
||||
container: {
|
||||
type: 'string',
|
||||
enum: ['openclaw', 'cursor', 'claude-code', 'hermes'],
|
||||
description: 'Container type',
|
||||
},
|
||||
capabilities: {
|
||||
type: 'array',
|
||||
items: { type: 'string' },
|
||||
description: 'List of capabilities',
|
||||
},
|
||||
},
|
||||
required: ['personaId', 'name', 'container', 'capabilities'],
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
type: 'function' as const,
|
||||
function: {
|
||||
name: 'close_council',
|
||||
description: 'Close the council session',
|
||||
parameters: {
|
||||
type: 'object',
|
||||
properties: {
|
||||
summary: { type: 'string', description: 'Optional summary' },
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
type: 'function' as const,
|
||||
function: {
|
||||
name: 'spawn_sub_topic',
|
||||
description:
|
||||
'Create a sub-topic and spawn a nested council to discuss it',
|
||||
parameters: {
|
||||
type: 'object',
|
||||
properties: {
|
||||
topicId: {
|
||||
type: 'string',
|
||||
description: 'Unique ID for the sub-topic',
|
||||
},
|
||||
title: { type: 'string', description: 'Title of the sub-topic' },
|
||||
participantIds: {
|
||||
type: 'array',
|
||||
items: { type: 'string', enum: participantIds },
|
||||
description: 'IDs of participants to include in the sub-council',
|
||||
},
|
||||
},
|
||||
required: ['topicId', 'title', 'participantIds'],
|
||||
},
|
||||
},
|
||||
},
|
||||
];
|
||||
}
|
||||
|
||||
function buildSystemPrompt(
|
||||
participants: PersonaState[],
|
||||
history: CouncilMessage[],
|
||||
): string {
|
||||
const participantLines = participants
|
||||
.map(
|
||||
(p) =>
|
||||
`- ${p.personaId} (${p.name}): container=${p.container}, capabilities=[${p.capabilities.join(', ')}]`,
|
||||
)
|
||||
.join('\n');
|
||||
|
||||
const historyLines =
|
||||
history.length > 0
|
||||
? history.map((m) => `[${m.personaId}]: ${m.content}`).join('\n')
|
||||
: '(no messages yet)';
|
||||
|
||||
return `You are a council moderator. Your job is to decide who speaks next, whether to add new members, or whether to close the council.
|
||||
|
||||
Participants:
|
||||
${participantLines}
|
||||
|
||||
Conversation so far:
|
||||
${historyLines}
|
||||
|
||||
Decide the next action by calling one of the available tools.`;
|
||||
}
|
||||
|
||||
export interface LlmModeratorOptions {
|
||||
llmClient: LlmClient;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create an LLM-driven moderator.
|
||||
*
|
||||
* Uses tool calls to decide: next_speaker, add_member, or close_council.
|
||||
*/
|
||||
export function createLlmModerator(opts: LlmModeratorOptions): ModeratorFn {
|
||||
const { llmClient } = opts;
|
||||
|
||||
return async (
|
||||
participants: PersonaState[],
|
||||
history: CouncilMessage[],
|
||||
): Promise<ModeratorDecision> => {
|
||||
const participantIds = participants.map((p) => p.personaId);
|
||||
const tools = buildModeratorTools(participantIds);
|
||||
const systemPrompt = buildSystemPrompt(participants, history);
|
||||
|
||||
const response = await llmClient.chat({
|
||||
messages: [
|
||||
{ role: 'system', content: systemPrompt },
|
||||
{ role: 'user', content: 'Decide the next action for this council.' },
|
||||
],
|
||||
tools,
|
||||
});
|
||||
|
||||
// Parse tool call
|
||||
if (response.tool_calls && response.tool_calls.length > 0) {
|
||||
const call = response.tool_calls[0]!;
|
||||
const args = JSON.parse(call.function.arguments);
|
||||
|
||||
switch (call.function.name) {
|
||||
case 'next_speaker':
|
||||
return { action: 'speak', personaId: args.personaId };
|
||||
case 'add_member':
|
||||
return {
|
||||
action: 'add',
|
||||
persona: {
|
||||
personaId: args.personaId,
|
||||
name: args.name,
|
||||
container: args.container as ContainerType,
|
||||
capabilities: args.capabilities,
|
||||
registeredAt: Date.now(),
|
||||
updatedAt: Date.now(),
|
||||
},
|
||||
};
|
||||
case 'close_council':
|
||||
return { action: 'close', summary: args.summary };
|
||||
case 'spawn_sub_topic': {
|
||||
const subParticipants = participants.filter((p) =>
|
||||
(args.participantIds as string[]).includes(p.personaId),
|
||||
);
|
||||
return {
|
||||
action: 'spawn',
|
||||
topicId: args.topicId,
|
||||
title: args.title,
|
||||
participants: subParticipants,
|
||||
};
|
||||
}
|
||||
default:
|
||||
return { action: 'close', summary: 'Unknown tool call — closing' };
|
||||
}
|
||||
}
|
||||
|
||||
// No tool call — default to close
|
||||
return {
|
||||
action: 'close',
|
||||
summary: response.content || 'LLM did not call a tool',
|
||||
};
|
||||
};
|
||||
}
|
||||
@@ -1,52 +0,0 @@
|
||||
import { describe, expect, test } from 'bun:test';
|
||||
import type { PersonaState } from '../task-events.js';
|
||||
import { createRoundRobinModerator } from './round-robin.js';
|
||||
|
||||
function makePersona(id: string): PersonaState {
|
||||
return {
|
||||
personaId: id,
|
||||
name: id,
|
||||
container: 'openclaw',
|
||||
capabilities: [],
|
||||
registeredAt: Date.now(),
|
||||
updatedAt: Date.now(),
|
||||
};
|
||||
}
|
||||
|
||||
describe('round-robin moderator', () => {
|
||||
test('cycles through participants in order', async () => {
|
||||
const mod = createRoundRobinModerator();
|
||||
const participants = [makePersona('a'), makePersona('b'), makePersona('c')];
|
||||
|
||||
const d1 = await mod(participants, []);
|
||||
expect(d1).toEqual({ action: 'speak', personaId: 'a' });
|
||||
|
||||
const d2 = await mod(participants, [
|
||||
{ personaId: 'a', content: 'hi', timestamp: 1 },
|
||||
]);
|
||||
expect(d2).toEqual({ action: 'speak', personaId: 'b' });
|
||||
|
||||
const d3 = await mod(participants, [
|
||||
{ personaId: 'a', content: 'hi', timestamp: 1 },
|
||||
{ personaId: 'b', content: 'hey', timestamp: 2 },
|
||||
]);
|
||||
expect(d3).toEqual({ action: 'speak', personaId: 'c' });
|
||||
});
|
||||
|
||||
test('closes after all have spoken', async () => {
|
||||
const mod = createRoundRobinModerator();
|
||||
const participants = [makePersona('a'), makePersona('b')];
|
||||
const history = [
|
||||
{ personaId: 'a', content: 'hi', timestamp: 1 },
|
||||
{ personaId: 'b', content: 'hey', timestamp: 2 },
|
||||
];
|
||||
const decision = await mod(participants, history);
|
||||
expect(decision.action).toBe('close');
|
||||
});
|
||||
|
||||
test('closes immediately with no participants', async () => {
|
||||
const mod = createRoundRobinModerator();
|
||||
const decision = await mod([], []);
|
||||
expect(decision.action).toBe('close');
|
||||
});
|
||||
});
|
||||
@@ -1,36 +0,0 @@
|
||||
import type {
|
||||
CouncilMessage,
|
||||
ModeratorDecision,
|
||||
ModeratorFn,
|
||||
} from '../moderator.js';
|
||||
import type { PersonaState } from '../task-events.js';
|
||||
|
||||
/**
|
||||
* Create a round-robin moderator.
|
||||
*
|
||||
* Each participant speaks once in order, then the council closes.
|
||||
* Never adds new members.
|
||||
*/
|
||||
export function createRoundRobinModerator(): ModeratorFn {
|
||||
return async (
|
||||
participants: PersonaState[],
|
||||
history: CouncilMessage[],
|
||||
): Promise<ModeratorDecision> => {
|
||||
if (participants.length === 0) {
|
||||
return { action: 'close', summary: 'No participants' };
|
||||
}
|
||||
|
||||
// Find who has already spoken
|
||||
const spoken = new Set(history.map((m) => m.personaId));
|
||||
|
||||
// Find the first participant who hasn't spoken yet
|
||||
for (const p of participants) {
|
||||
if (!spoken.has(p.personaId)) {
|
||||
return { action: 'speak', personaId: p.personaId };
|
||||
}
|
||||
}
|
||||
|
||||
// Everyone has spoken — close
|
||||
return { action: 'close', summary: 'All participants have spoken' };
|
||||
};
|
||||
}
|
||||
@@ -47,7 +47,7 @@ export function buildPersonasFromEvents(
|
||||
});
|
||||
} else if (ev.kind === 'persona-updated') {
|
||||
const existing = personas.get(personaId);
|
||||
if (!existing) continue; // can't update what doesn't exist
|
||||
if (!existing) continue;
|
||||
const m = meta as unknown as PersonaUpdatedMeta;
|
||||
if (m.container !== undefined) existing.container = m.container;
|
||||
if (m.capabilities !== undefined) existing.capabilities = m.capabilities;
|
||||
|
||||
@@ -1,251 +0,0 @@
|
||||
import { beforeEach, describe, expect, jest, test } from 'bun:test';
|
||||
import type { Sensed } from '../index.js';
|
||||
import type { PendingTasksData, TaskState } from '../task-events.js';
|
||||
import { createTaskRule } from './task-rule.js';
|
||||
|
||||
type Snapshot = Record<string, unknown> & { timestamp: number };
|
||||
type Effect = Record<string, unknown>;
|
||||
|
||||
function makeTask(overrides: Partial<TaskState> = {}): TaskState {
|
||||
return {
|
||||
taskId: 'task-1',
|
||||
projectId: 'proj-1',
|
||||
title: 'Fix bug',
|
||||
description: 'Fix the login bug',
|
||||
type: 'bug',
|
||||
priority: 0,
|
||||
creatorId: 'user-1',
|
||||
status: 'pending',
|
||||
createdAt: Date.now(),
|
||||
updatedAt: Date.now(),
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
|
||||
function makeSnapshot(overrides: Record<string, unknown> = {}): Snapshot {
|
||||
return { timestamp: Date.now(), ...overrides };
|
||||
}
|
||||
|
||||
function makePendingTasksSlice(tasks: TaskState[]): Record<string, unknown> {
|
||||
return {
|
||||
'pending-tasks': {
|
||||
data: {
|
||||
pendingCount: tasks.length,
|
||||
tasks,
|
||||
byProject: {},
|
||||
checkedAt: Date.now(),
|
||||
},
|
||||
} as Sensed<PendingTasksData>,
|
||||
};
|
||||
}
|
||||
|
||||
const mockInner = jest.fn(async () => [[], 15000] as [Effect[], number]);
|
||||
|
||||
describe('createTaskRule', () => {
|
||||
beforeEach(() => {
|
||||
mockInner.mockClear();
|
||||
mockInner.mockResolvedValue([[], 15000]);
|
||||
});
|
||||
|
||||
test('no pending-tasks → pass through, no new effects', async () => {
|
||||
const rule = createTaskRule<Snapshot, Effect>();
|
||||
const prev = makeSnapshot();
|
||||
const curr = makeSnapshot();
|
||||
|
||||
const [effects, tickMs] = await rule(prev, curr, mockInner);
|
||||
|
||||
expect(effects).toEqual([]);
|
||||
expect(tickMs).toBe(15000);
|
||||
});
|
||||
|
||||
test('first tick: pending tasks → emit broker effect', async () => {
|
||||
const rule = createTaskRule<Snapshot, Effect>();
|
||||
const tasks = [makeTask({ taskId: 't1' }), makeTask({ taskId: 't2' })];
|
||||
const curr = makeSnapshot(makePendingTasksSlice(tasks));
|
||||
|
||||
const [effects] = await rule(makeSnapshot(), curr, mockInner);
|
||||
|
||||
expect(effects).toContainEqual(
|
||||
expect.objectContaining({
|
||||
kind: 'broker',
|
||||
taskIds: ['t1', 't2'],
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
test('first tick: assigned tasks → emit cursor effect per task', async () => {
|
||||
const rule = createTaskRule<Snapshot, Effect>();
|
||||
const tasks = [
|
||||
makeTask({
|
||||
taskId: 't1',
|
||||
projectId: 'proj-a',
|
||||
status: 'assigned',
|
||||
assigneeId: 'cursor',
|
||||
}),
|
||||
makeTask({
|
||||
taskId: 't2',
|
||||
projectId: 'proj-b',
|
||||
status: 'assigned',
|
||||
assigneeId: 'cursor',
|
||||
}),
|
||||
];
|
||||
const curr = makeSnapshot(makePendingTasksSlice(tasks));
|
||||
|
||||
const [effects] = await rule(makeSnapshot(), curr, mockInner);
|
||||
|
||||
expect(effects).toContainEqual(
|
||||
expect.objectContaining({
|
||||
kind: 'cursor',
|
||||
taskId: 't1',
|
||||
projectId: 'proj-a',
|
||||
}),
|
||||
);
|
||||
expect(effects).toContainEqual(
|
||||
expect.objectContaining({
|
||||
kind: 'cursor',
|
||||
taskId: 't2',
|
||||
projectId: 'proj-b',
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
test('first tick: mix of pending and assigned → both broker and cursor effects', async () => {
|
||||
const rule = createTaskRule<Snapshot, Effect>();
|
||||
const tasks = [
|
||||
makeTask({ taskId: 't1', status: 'pending' }),
|
||||
makeTask({
|
||||
taskId: 't2',
|
||||
status: 'assigned',
|
||||
assigneeId: 'cursor',
|
||||
projectId: 'proj-2',
|
||||
}),
|
||||
];
|
||||
const curr = makeSnapshot(makePendingTasksSlice(tasks));
|
||||
|
||||
const [effects] = await rule(makeSnapshot(), curr, mockInner);
|
||||
|
||||
expect(effects).toContainEqual(
|
||||
expect.objectContaining({ kind: 'broker', taskIds: ['t1'] }),
|
||||
);
|
||||
expect(effects).toContainEqual(
|
||||
expect.objectContaining({ kind: 'cursor', taskId: 't2' }),
|
||||
);
|
||||
});
|
||||
|
||||
test('already pending task → no broker effect (diff-driven)', async () => {
|
||||
const rule = createTaskRule<Snapshot, Effect>();
|
||||
const tasks = [makeTask({ taskId: 't1', status: 'pending' })];
|
||||
const prev = makeSnapshot(makePendingTasksSlice(tasks));
|
||||
const curr = makeSnapshot(makePendingTasksSlice(tasks));
|
||||
|
||||
const [effects] = await rule(prev, curr, mockInner);
|
||||
|
||||
const brokerEffects = effects.filter((e) => e.kind === 'broker');
|
||||
expect(brokerEffects).toHaveLength(0);
|
||||
});
|
||||
|
||||
test('already assigned task → no cursor effect (diff-driven)', async () => {
|
||||
const rule = createTaskRule<Snapshot, Effect>();
|
||||
const tasks = [
|
||||
makeTask({ taskId: 't1', status: 'assigned', assigneeId: 'cursor' }),
|
||||
];
|
||||
const prev = makeSnapshot(makePendingTasksSlice(tasks));
|
||||
const curr = makeSnapshot(makePendingTasksSlice(tasks));
|
||||
|
||||
const [effects] = await rule(prev, curr, mockInner);
|
||||
|
||||
const cursorEffects = effects.filter((e) => e.kind === 'cursor');
|
||||
expect(cursorEffects).toHaveLength(0);
|
||||
});
|
||||
|
||||
test('new pending task appears → emit broker', async () => {
|
||||
const rule = createTaskRule<Snapshot, Effect>();
|
||||
const prevTasks = [makeTask({ taskId: 't1', status: 'pending' })];
|
||||
const currTasks = [
|
||||
makeTask({ taskId: 't1', status: 'pending' }),
|
||||
makeTask({ taskId: 't2', status: 'pending' }),
|
||||
];
|
||||
const prev = makeSnapshot(makePendingTasksSlice(prevTasks));
|
||||
const curr = makeSnapshot(makePendingTasksSlice(currTasks));
|
||||
|
||||
const [effects] = await rule(prev, curr, mockInner);
|
||||
|
||||
expect(effects).toContainEqual(
|
||||
expect.objectContaining({ kind: 'broker', taskIds: ['t2'] }),
|
||||
);
|
||||
});
|
||||
|
||||
test('task transitions from pending to assigned → emit cursor', async () => {
|
||||
const rule = createTaskRule<Snapshot, Effect>();
|
||||
const prevTasks = [
|
||||
makeTask({ taskId: 't1', status: 'pending', projectId: 'proj-a' }),
|
||||
];
|
||||
const currTasks = [
|
||||
makeTask({
|
||||
taskId: 't1',
|
||||
status: 'assigned',
|
||||
assigneeId: 'cursor',
|
||||
projectId: 'proj-a',
|
||||
}),
|
||||
];
|
||||
const prev = makeSnapshot(makePendingTasksSlice(prevTasks));
|
||||
const curr = makeSnapshot(makePendingTasksSlice(currTasks));
|
||||
|
||||
const [effects] = await rule(prev, curr, mockInner);
|
||||
|
||||
expect(effects).toContainEqual(
|
||||
expect.objectContaining({
|
||||
kind: 'cursor',
|
||||
taskId: 't1',
|
||||
projectId: 'proj-a',
|
||||
}),
|
||||
);
|
||||
// No broker since t1 is no longer pending in curr
|
||||
const brokerEffects = effects.filter((e) => e.kind === 'broker');
|
||||
expect(brokerEffects).toHaveLength(0);
|
||||
});
|
||||
|
||||
test('task transitions from routing to assigned → emit cursor', async () => {
|
||||
const rule = createTaskRule<Snapshot, Effect>();
|
||||
const prevTasks = [
|
||||
makeTask({ taskId: 't1', status: 'routing', projectId: 'proj-a' }),
|
||||
];
|
||||
const currTasks = [
|
||||
makeTask({
|
||||
taskId: 't1',
|
||||
status: 'assigned',
|
||||
assigneeId: 'cursor',
|
||||
projectId: 'proj-a',
|
||||
}),
|
||||
];
|
||||
const prev = makeSnapshot(makePendingTasksSlice(prevTasks));
|
||||
const curr = makeSnapshot(makePendingTasksSlice(currTasks));
|
||||
|
||||
const [effects] = await rule(prev, curr, mockInner);
|
||||
|
||||
expect(effects).toContainEqual(
|
||||
expect.objectContaining({
|
||||
kind: 'cursor',
|
||||
taskId: 't1',
|
||||
projectId: 'proj-a',
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
test('inner rule effects are preserved', async () => {
|
||||
const innerEffect = { kind: 'collect', key: 'system' };
|
||||
mockInner.mockResolvedValueOnce([[innerEffect], 10000]);
|
||||
|
||||
const rule = createTaskRule<Snapshot, Effect>();
|
||||
const tasks = [
|
||||
makeTask({ taskId: 't1', status: 'assigned', assigneeId: 'cursor' }),
|
||||
];
|
||||
const curr = makeSnapshot(makePendingTasksSlice(tasks));
|
||||
|
||||
const [effects, tickMs] = await rule(makeSnapshot(), curr, mockInner);
|
||||
|
||||
expect(effects[0]).toEqual(innerEffect);
|
||||
expect(effects.length).toBe(2);
|
||||
expect(tickMs).toBe(10000);
|
||||
});
|
||||
});
|
||||
@@ -1,75 +0,0 @@
|
||||
import type { Rule, Sensed } from '../index.js';
|
||||
import type { PendingTasksData, TaskState } from '../task-events.js';
|
||||
|
||||
export interface BrokerEffect {
|
||||
kind: 'broker';
|
||||
taskIds: string[];
|
||||
}
|
||||
|
||||
export interface CursorTaskEffect {
|
||||
kind: 'cursor';
|
||||
taskId: string;
|
||||
projectId: string;
|
||||
}
|
||||
|
||||
type Snapshot = Record<string, unknown> & { timestamp: number };
|
||||
type Effect = Record<string, unknown>;
|
||||
|
||||
/**
|
||||
* Lightweight task rule — no LLM call.
|
||||
*
|
||||
* - pending tasks → emit broker effect (idempotent: skip if any task is routing)
|
||||
* - assigned tasks → emit cursor effect per task
|
||||
*/
|
||||
export function createTaskRule<
|
||||
S extends Snapshot = Snapshot,
|
||||
E extends Effect = Effect,
|
||||
>(): Rule<S, E> {
|
||||
return async (prev, curr, inner) => {
|
||||
const [effects, tickMs] = await inner(prev, curr);
|
||||
const newEffects: E[] = [];
|
||||
|
||||
const currTasks =
|
||||
(curr['pending-tasks'] as Sensed<PendingTasksData> | undefined)?.data
|
||||
?.tasks ?? [];
|
||||
const prevTasks =
|
||||
(prev['pending-tasks'] as Sensed<PendingTasksData> | undefined)?.data
|
||||
?.tasks ?? [];
|
||||
|
||||
// Broker: only emit for tasks that newly entered pending
|
||||
const prevPendingIds = new Set(
|
||||
prevTasks
|
||||
.filter((t: TaskState) => t.status === 'pending')
|
||||
.map((t: TaskState) => t.taskId),
|
||||
);
|
||||
const newPending = currTasks.filter(
|
||||
(t: TaskState) => t.status === 'pending' && !prevPendingIds.has(t.taskId),
|
||||
);
|
||||
if (newPending.length > 0) {
|
||||
newEffects.push({
|
||||
kind: 'broker',
|
||||
taskIds: newPending.map((t: TaskState) => t.taskId),
|
||||
} as unknown as E);
|
||||
}
|
||||
|
||||
// Cursor: only emit for tasks that newly became assigned
|
||||
const prevAssignedIds = new Set(
|
||||
prevTasks
|
||||
.filter((t: TaskState) => t.status === 'assigned')
|
||||
.map((t: TaskState) => t.taskId),
|
||||
);
|
||||
const newAssigned = currTasks.filter(
|
||||
(t: TaskState) =>
|
||||
t.status === 'assigned' && !prevAssignedIds.has(t.taskId),
|
||||
);
|
||||
for (const task of newAssigned) {
|
||||
newEffects.push({
|
||||
kind: 'cursor',
|
||||
taskId: task.taskId,
|
||||
projectId: task.projectId,
|
||||
} as unknown as E);
|
||||
}
|
||||
|
||||
return [[...effects, ...newEffects], tickMs];
|
||||
};
|
||||
}
|
||||
@@ -1,113 +0,0 @@
|
||||
import { describe, expect, test } from 'bun:test';
|
||||
import { mkdtempSync } from 'node:fs';
|
||||
import { tmpdir } from 'node:os';
|
||||
import { join } from 'node:path';
|
||||
import { createStore } from './store.js';
|
||||
import type { TopicClosedMeta, TopicCreatedMeta } from './topic.js';
|
||||
import { buildTopicsFromEvents } from './topic.js';
|
||||
|
||||
function createTestStore() {
|
||||
const tmp = mkdtempSync(join(tmpdir(), 'topic-test-'));
|
||||
return createStore({
|
||||
eventsDbPath: join(tmp, 'events.db'),
|
||||
objectsDir: join(tmp, 'objects'),
|
||||
});
|
||||
}
|
||||
|
||||
describe('buildTopicsFromEvents', () => {
|
||||
test('create topic', () => {
|
||||
const store = createTestStore();
|
||||
const meta: TopicCreatedMeta = {
|
||||
topicId: 't1',
|
||||
title: 'Design review',
|
||||
createdBy: 'alice',
|
||||
};
|
||||
store.appendEvent({
|
||||
occurredAt: 1000,
|
||||
kind: 'topic-created',
|
||||
key: 't1',
|
||||
meta: JSON.stringify(meta),
|
||||
});
|
||||
|
||||
const topics = buildTopicsFromEvents(store);
|
||||
expect(topics.size).toBe(1);
|
||||
const t = topics.get('t1')!;
|
||||
expect(t.topicId).toBe('t1');
|
||||
expect(t.title).toBe('Design review');
|
||||
expect(t.createdBy).toBe('alice');
|
||||
expect(t.status).toBe('open');
|
||||
expect(t.createdAt).toBe(1000);
|
||||
expect(t.parentTopicId).toBeUndefined();
|
||||
store.close();
|
||||
});
|
||||
|
||||
test('create child topic with parentTopicId', () => {
|
||||
const store = createTestStore();
|
||||
store.appendEvent({
|
||||
occurredAt: 1000,
|
||||
kind: 'topic-created',
|
||||
key: 't1',
|
||||
meta: JSON.stringify({
|
||||
topicId: 't1',
|
||||
title: 'Parent',
|
||||
createdBy: 'alice',
|
||||
} satisfies TopicCreatedMeta),
|
||||
});
|
||||
store.appendEvent({
|
||||
occurredAt: 2000,
|
||||
kind: 'topic-created',
|
||||
key: 't2',
|
||||
meta: JSON.stringify({
|
||||
topicId: 't2',
|
||||
parentTopicId: 't1',
|
||||
title: 'Child',
|
||||
createdBy: 'bob',
|
||||
} satisfies TopicCreatedMeta),
|
||||
});
|
||||
|
||||
const topics = buildTopicsFromEvents(store);
|
||||
expect(topics.size).toBe(2);
|
||||
const child = topics.get('t2')!;
|
||||
expect(child.parentTopicId).toBe('t1');
|
||||
expect(child.title).toBe('Child');
|
||||
expect(child.status).toBe('open');
|
||||
store.close();
|
||||
});
|
||||
|
||||
test('close topic with summary', () => {
|
||||
const store = createTestStore();
|
||||
store.appendEvent({
|
||||
occurredAt: 1000,
|
||||
kind: 'topic-created',
|
||||
key: 't1',
|
||||
meta: JSON.stringify({
|
||||
topicId: 't1',
|
||||
title: 'Bug triage',
|
||||
createdBy: 'alice',
|
||||
} satisfies TopicCreatedMeta),
|
||||
});
|
||||
store.appendEvent({
|
||||
occurredAt: 2000,
|
||||
kind: 'topic-closed',
|
||||
key: 't1',
|
||||
meta: JSON.stringify({
|
||||
topicId: 't1',
|
||||
summary: 'Resolved all P0 bugs',
|
||||
} satisfies TopicClosedMeta),
|
||||
});
|
||||
|
||||
const topics = buildTopicsFromEvents(store);
|
||||
const t = topics.get('t1')!;
|
||||
expect(t.status).toBe('closed');
|
||||
expect(t.closedAt).toBe(2000);
|
||||
expect(t.summary).toBe('Resolved all P0 bugs');
|
||||
store.close();
|
||||
});
|
||||
|
||||
test('empty event stream', () => {
|
||||
const store = createTestStore();
|
||||
const topics = buildTopicsFromEvents(store);
|
||||
expect(topics.size).toBe(0);
|
||||
store.close();
|
||||
});
|
||||
});
|
||||
@@ -1,77 +0,0 @@
|
||||
import type { PulseStore } from './store.js';
|
||||
|
||||
export type TopicStatus = 'open' | 'closed';
|
||||
|
||||
export interface Topic {
|
||||
topicId: string;
|
||||
parentTopicId?: string;
|
||||
title: string;
|
||||
status: TopicStatus;
|
||||
createdBy: string;
|
||||
createdAt: number;
|
||||
closedAt?: number;
|
||||
summary?: string;
|
||||
}
|
||||
|
||||
export interface TopicCreatedMeta {
|
||||
topicId: string;
|
||||
parentTopicId?: string;
|
||||
title: string;
|
||||
createdBy: string;
|
||||
}
|
||||
|
||||
export interface TopicClosedMeta {
|
||||
topicId: string;
|
||||
summary?: string;
|
||||
}
|
||||
|
||||
/**
|
||||
* Build a Map of Topic from topic-created and topic-closed events.
|
||||
*
|
||||
* - topic-created → open topic
|
||||
* - topic-closed → close + attach summary
|
||||
* - Events are merged in occurredAt order.
|
||||
*/
|
||||
export function buildTopicsFromEvents(store: PulseStore): Map<string, Topic> {
|
||||
const created = store.queryByKind('topic-created');
|
||||
const closed = store.queryByKind('topic-closed');
|
||||
|
||||
const allEvents = [...created, ...closed];
|
||||
allEvents.sort((a, b) => a.occurredAt - b.occurredAt);
|
||||
|
||||
const topics = new Map<string, Topic>();
|
||||
|
||||
for (const ev of allEvents) {
|
||||
if (!ev.meta) continue;
|
||||
let meta: Record<string, unknown>;
|
||||
try {
|
||||
meta = JSON.parse(ev.meta);
|
||||
} catch {
|
||||
continue;
|
||||
}
|
||||
|
||||
const topicId = meta.topicId as string;
|
||||
if (!topicId) continue;
|
||||
|
||||
if (ev.kind === 'topic-created') {
|
||||
const m = meta as unknown as TopicCreatedMeta;
|
||||
topics.set(topicId, {
|
||||
topicId: m.topicId,
|
||||
parentTopicId: m.parentTopicId,
|
||||
title: m.title,
|
||||
status: 'open',
|
||||
createdBy: m.createdBy,
|
||||
createdAt: ev.occurredAt,
|
||||
});
|
||||
} else if (ev.kind === 'topic-closed') {
|
||||
const existing = topics.get(topicId);
|
||||
if (!existing) continue;
|
||||
const m = meta as unknown as TopicClosedMeta;
|
||||
existing.status = 'closed';
|
||||
existing.closedAt = ev.occurredAt;
|
||||
if (m.summary !== undefined) existing.summary = m.summary;
|
||||
}
|
||||
}
|
||||
|
||||
return topics;
|
||||
}
|
||||
Reference in New Issue
Block a user