refactor: workflow message chain + CAS storage

This commit is contained in:
2026-04-17 05:02:58 +00:00
parent 59afcf6211
commit ff689ce3ce
19 changed files with 1114 additions and 963 deletions
+15 -22
View File
@@ -1,9 +1,6 @@
#!/usr/bin/env bun
/**
* Pulse Council Demo — v2 WorkflowType Pipeline
*
* Executable script (not a test) that simulates the complete
* Council v2 dispatch chain driven by WorkflowType rules.
* Pulse Council Demo — v2 WorkflowType Pipeline (message chain model)
*
* Run: cd ~/repos/pulse && bun packages/pulse/src/e2e/council-demo.ts
*
@@ -19,26 +16,18 @@ import { createCodingWorkflow } from '../workflows/coding-workflow.js';
import { createArchitectRole } from '../workflows/roles/architect-llm.js';
import { createWorkflowRule } from '../workflows/workflow-rule-adapter.js';
// ── Helpers ────────────────────────────────────────────────────
const SEP = '─'.repeat(50);
function log(emoji: string, msg: string) {
console.log(`${emoji} ${msg}`);
}
function logItem(msg: string) {
console.log(`${msg}`);
}
// ── CLI Args ───────────────────────────────────────────────────
const LIVE = process.argv.includes('--live');
const DB_PATH = process.argv.find((a) => a.startsWith('--db='))?.slice(5);
const KEEP = process.argv.includes('--keep');
// ── Main ───────────────────────────────────────────────────────
async function main() {
const useCustomDb = !!DB_PATH;
const tmpDir = useCustomDb
@@ -48,7 +37,7 @@ async function main() {
console.log();
console.log(SEP);
log('🚀', 'Pulse Council v2 Demo');
log('🚀', 'Pulse Council v2 Demo (Message Chain)');
console.log(SEP);
console.log();
log('📋', `Mode: ${LIVE ? 'LIVE (LLM)' : 'Mock'}`);
@@ -61,7 +50,6 @@ async function main() {
});
try {
// Build topic type (mock or live)
let codingTask: ReturnType<typeof createCodingWorkflow>;
if (LIVE) {
const baseUrl =
@@ -84,7 +72,7 @@ async function main() {
}
const rule = createWorkflowRule(codingTask, store);
// Step 1: Create coding tasks
// Step 1: Create coding tasks via coding.user events
log('📝', 'Step 1: Creating coding tasks');
const tasks = [
{
@@ -102,18 +90,22 @@ async function main() {
];
for (const t of tasks) {
const hash = store.putObject({
content: t.description,
artifacts: { title: t.title, repoDir: t.repoDir },
});
store.appendEvent({
occurredAt: Date.now(),
kind: 'coding.created',
kind: 'coding.user',
key: t.topicId,
meta: JSON.stringify(t),
hash,
});
logItem(`Created: ${t.title} (${t.topicId})`);
}
console.log();
// Step 2: Run tick loop
log('⚡', 'Step 2: Running topic rule ticks');
log('⚡', 'Step 2: Running workflow ticks');
console.log();
for (let tick = 1; tick <= 20; tick++) {
@@ -137,20 +129,21 @@ async function main() {
.getAfter(0)
.filter((e) => e.kind.startsWith('coding.'));
for (const e of allEvents) {
const meta = e.meta ? JSON.parse(e.meta) : {};
const ts = new Date(e.occurredAt).toISOString().slice(11, 23);
const _role = e.kind.split('.')[1];
const hasContent = e.hash ? '📦' : ' ';
const metaStr = e.meta ? ` meta=${e.meta}` : '';
console.log(
` [${ts}] ${e.kind.padEnd(18)} topic=${meta.topicId ?? '?'}`,
` [${ts}] ${e.kind.padEnd(20)} key=${e.key ?? '?'} ${hasContent}${metaStr}`,
);
}
console.log();
log('📈', `Total events: ${allEvents.length}`);
const closedCount = allEvents.filter(
(e) => e.kind === 'coding.closed',
(e) => e.kind === 'coding.closer',
).length;
log('🏁', `Closed topics: ${closedCount}/${tasks.length}`);
console.log();
console.log(SEP);
log('✅', 'Council v2 demo completed!');
+85 -61
View File
@@ -1,6 +1,7 @@
#!/usr/bin/env bun
/**
* Council v2 — Full LIVE: architect(LLM) + coder(Cursor) + reviewer(Cursor)
* Message chain + CAS model.
*
* Usage:
* PULSE_LLM_BASE_URL=... PULSE_LLM_API_KEY=... \
@@ -11,11 +12,9 @@ import { tmpdir } from 'node:os';
import { dirname, join } from 'node:path';
import { createStore, type PulseStore } from '../index.js';
import { createOpenAiLlmClient } from '../llm-client.js';
import {
type CodingWorkflowContext,
createCodingWorkflow,
} from '../workflows/coding-workflow.js';
import { createCodingWorkflow } from '../workflows/coding-workflow.js';
import { createWorkflowRule } from '../workflows/workflow-rule-adapter.js';
import type { WorkflowMessage } from '../workflows/workflow-type.js';
const DB_PATH = process.argv.find((a) => a.startsWith('--db='))?.slice(5);
const tmpDir = DB_PATH
@@ -93,13 +92,20 @@ async function runCursorAgent(
};
}
// ── Role implementations ───────────────────────────────────────
// ── Role implementations (message chain model) ─────────────────
async function architectFn(
ctx: CodingWorkflowContext,
chain: WorkflowMessage[],
topicId: string,
store: PulseStore,
): Promise<void> {
console.log(` 🏗️ architect analyzing "${ctx.title}"...`);
const userMsg = chain.find((m) => m.role === 'user');
const userContent = (userMsg?.content as any) ?? {};
const title = userContent.artifacts?.title ?? topicId;
const description = userContent.content ?? '';
const repoDir = userContent.artifacts?.repoDir ?? '/tmp';
console.log(` 🏗️ architect analyzing "${title}"...`);
const resp = await llm.chat({
messages: [
{
@@ -109,7 +115,7 @@ async function architectFn(
},
{
role: 'user',
content: `Task: ${ctx.title}\nDescription: ${ctx.description}\nRepo: ${ctx.repoDir}`,
content: `Task: ${title}\nDescription: ${description}\nRepo: ${repoDir}`,
},
],
});
@@ -120,94 +126,113 @@ async function architectFn(
parsed = { analysis: resp.content ?? 'No analysis', targetFiles: [] };
}
console.log(` 🏗️ analysis: ${(parsed.analysis ?? '').slice(0, 120)}...`);
const hash = store.putObject({
content: parsed.analysis ?? 'No analysis',
artifacts: { targetFiles: parsed.targetFiles ?? [] },
});
store.appendEvent({
occurredAt: Date.now(),
kind: 'coding.analyzed',
key: ctx.topicId,
meta: JSON.stringify({
topicId: ctx.topicId,
analysis: parsed.analysis ?? 'No analysis',
targetFiles: parsed.targetFiles ?? [],
}),
kind: 'coding.architect',
key: topicId,
hash,
});
}
async function coderFn(
ctx: CodingWorkflowContext,
chain: WorkflowMessage[],
topicId: string,
store: PulseStore,
): Promise<void> {
console.log(` 💻 coder working on "${ctx.title}"...`);
const prompt = `## Task: ${ctx.title}
const userMsg = chain.find((m) => m.role === 'user');
const userContent = (userMsg?.content as any) ?? {};
const title = userContent.artifacts?.title ?? topicId;
const description = userContent.content ?? '';
const repoDir = userContent.artifacts?.repoDir ?? '/tmp';
${ctx.description}
const architectMsg = chain.find((m) => m.role === 'architect');
const architectContent = (architectMsg?.content as any) ?? {};
console.log(` 💻 coder working on "${title}"...`);
const prompt = `## Task: ${title}
${description}
## Architect Analysis
${ctx.analysis?.analysis ?? 'None'}
${architectContent.content ?? 'None'}
## Target Files
${(ctx.analysis?.targetFiles ?? []).join(', ') || 'Not specified'}
${(architectContent.artifacts?.targetFiles ?? []).join(', ') || 'Not specified'}
## Instructions
Implement the changes. Do NOT modify any existing test files. Only create or modify source files as needed.
If the task asks to create a new file, create it. If it asks to modify existing files, modify them.
Run tests if applicable. Commit your changes.`;
const result = await runCursorAgent(prompt, ctx.repoDir);
const result = await runCursorAgent(prompt, repoDir);
console.log(
` 💻 coder ${result.success ? '✅' : '❌'} (${(result.durationMs / 1000).toFixed(1)}s)`,
);
const hash = store.putObject({
content: result.output,
artifacts: {
filesChanged: architectContent.artifacts?.targetFiles ?? [],
testsPassed: result.success,
},
});
store.appendEvent({
occurredAt: Date.now(),
kind: 'coding.coded',
key: ctx.topicId,
meta: JSON.stringify({
topicId: ctx.topicId,
filesChanged: ctx.analysis?.targetFiles ?? [],
testsPassed: result.success,
summary: result.output.slice(0, 500),
}),
kind: 'coding.coder',
key: topicId,
hash,
});
}
async function reviewerFn(
ctx: CodingWorkflowContext,
chain: WorkflowMessage[],
topicId: string,
store: PulseStore,
): Promise<void> {
console.log(` 🔍 reviewer reviewing "${ctx.title}"...`);
const prompt = `## Code Review: ${ctx.title}
const userMsg = chain.find((m) => m.role === 'user');
const userContent = (userMsg?.content as any) ?? {};
const title = userContent.artifacts?.title ?? topicId;
const repoDir = userContent.artifacts?.repoDir ?? '/tmp';
const coderMsg = [...chain].reverse().find((m) => m.role === 'coder');
const coderContent = (coderMsg?.content as any) ?? {};
console.log(` 🔍 reviewer reviewing "${title}"...`);
const prompt = `## Code Review: ${title}
## What was done
${ctx.codeResult?.summary ?? 'Unknown'}
${coderContent.content ?? 'Unknown'}
## Files changed
${(ctx.codeResult?.filesChanged ?? []).join(', ')}
${(coderContent.artifacts?.filesChanged ?? []).join(', ')}
## Instructions
Review the recent changes for correctness, security, and code quality.
Do NOT modify any files. Only output your review.
End with a clear verdict: APPROVED or REJECTED with reasons.`;
const result = await runCursorAgent(prompt, ctx.repoDir);
const result = await runCursorAgent(prompt, repoDir);
console.log(
` 🔍 reviewer ${result.success ? '✅' : '❌'} (${(result.durationMs / 1000).toFixed(1)}s)`,
);
// Parse verdict from output
const output = result.output.toLowerCase();
const verdict: 'approved' | 'rejected' = output.includes('rejected')
? 'rejected'
: 'approved';
const hash = store.putObject({ content: result.output });
store.appendEvent({
occurredAt: Date.now(),
kind: 'coding.reviewed',
key: ctx.topicId,
meta: JSON.stringify({
topicId: ctx.topicId,
verdict,
comments: result.output.slice(0, 500),
}),
kind: 'coding.reviewer',
key: topicId,
hash,
meta: JSON.stringify({ verdict }),
});
}
@@ -216,7 +241,7 @@ End with a clear verdict: APPROVED or REJECTED with reasons.`;
async function main() {
const store = createStore({ eventsDbPath: dbPath, objectsDir: objDir });
console.log(`\n🍊 === Council v2 — Full LIVE Pipeline ===\n`);
console.log(`\n🍊 === Council v2 — Full LIVE Pipeline (Message Chain) ===\n`);
console.log(` LLM: ${model} @ ${baseUrl}`);
console.log(` Cursor: ${AGENT_BIN}`);
console.log(` DB: ${dbPath}\n`);
@@ -228,18 +253,20 @@ async function main() {
});
const rule = createWorkflowRule(codingTask, store);
// Create a real task on pulse repo itself
// Create a real task via coding.user event with CAS
const hash = store.putObject({
content:
'Create packages/pulse/COUNCIL-V2.md — a concise (30-50 lines) overview of the Council v2 model. Cover: WorkflowType (events, projection, roles, moderator), Topic as Rule pattern, one-rule-per-type managing all instances, Moore machine diff-driven ticks. Reference source files in topics/ directory. Do NOT modify any existing files.',
artifacts: {
title: 'Add COUNCIL-V2.md overview',
repoDir: '/home/azureuser/repos/pulse',
},
});
store.appendEvent({
occurredAt: Date.now(),
kind: 'coding.created',
kind: 'coding.user',
key: 'add-v2-readme',
meta: JSON.stringify({
topicId: 'add-v2-readme',
title: 'Add COUNCIL-V2.md overview',
description:
'Create packages/pulse/COUNCIL-V2.md — a concise (30-50 lines) overview of the Council v2 model. Cover: WorkflowType (events, projection, roles, moderator), Topic as Rule pattern, one-rule-per-type managing all instances, Moore machine diff-driven ticks. Reference source files in topics/ directory. Do NOT modify any existing files.',
repoDir: '/home/azureuser/repos/pulse',
}),
hash,
});
console.log(ts(), 'Task created: add-v2-readme\n');
@@ -265,14 +292,11 @@ async function main() {
const t0ev = events[0]?.occurredAt ?? 0;
for (const ev of events) {
const delta = ((ev.occurredAt - t0ev) / 1000).toFixed(1);
const meta = ev.meta ? JSON.parse(ev.meta) : {};
const detail =
meta.analysis?.slice(0, 60) ??
meta.summary?.slice(0, 60) ??
meta.verdict ??
'';
const _role = ev.kind.split('.')[1] ?? '?';
const hasContent = ev.hash ? '📦' : '';
const metaStr = ev.meta ? ` meta=${ev.meta}` : '';
console.log(
` [+${delta}s] #${String(ev.id).padStart(2)} ${(ev.kind ?? '').padEnd(18)} topic=${meta.topicId ?? ev.key ?? ''} ${detail}`,
` [+${delta}s] #${String(ev.id).padStart(2)} ${(ev.kind ?? '').padEnd(20)} key=${ev.key ?? '?'} ${hasContent}${metaStr}`,
);
}
+39 -41
View File
@@ -11,6 +11,7 @@ import { join } from 'node:path';
import { createStore, type PulseStore } from '../store.js';
import { createCodingWorkflow } from '../workflows/coding-workflow.js';
import { createWorkflowRule } from '../workflows/workflow-rule-adapter.js';
import type { WorkflowMessage } from '../workflows/workflow-type.js';
describe('Council v2 E2E', () => {
let store: PulseStore;
@@ -31,24 +32,47 @@ describe('Council v2 E2E', () => {
if (tmpDir) rmSync(tmpDir, { recursive: true, force: true });
});
function createUserEvent(
topicId: string,
title: string,
description: string,
repoDir: string,
) {
const hash = store.putObject({
content: description,
artifacts: { title, repoDir },
});
store.appendEvent({
occurredAt: Date.now(),
kind: 'coding.user',
key: topicId,
hash,
});
}
it('parallel topics: one approved, one rejected then re-coded', async () => {
setup();
// Custom reviewer: rejects task-b on first review, approves everything else
const reviewCounts: Record<string, number> = {};
const codingTask = createCodingWorkflow({
reviewerFn: async (ctx, s) => {
reviewCounts[ctx.topicId] = (reviewCounts[ctx.topicId] ?? 0) + 1;
reviewerFn: async (
_chain: WorkflowMessage[],
topicId: string,
s: PulseStore,
) => {
reviewCounts[topicId] = (reviewCounts[topicId] ?? 0) + 1;
const shouldReject =
ctx.topicId === 'task-b' && reviewCounts[ctx.topicId] === 1;
topicId === 'task-b' && reviewCounts[topicId] === 1;
const hash = s.putObject({
content: shouldReject ? 'Needs work' : 'LGTM',
});
s.appendEvent({
occurredAt: Date.now(),
kind: 'coding.reviewed',
key: ctx.topicId,
kind: 'coding.reviewer',
key: topicId,
hash,
meta: JSON.stringify({
topicId: ctx.topicId,
verdict: shouldReject ? 'rejected' : 'approved',
comments: shouldReject ? 'Needs work' : 'LGTM',
}),
});
},
@@ -56,29 +80,8 @@ describe('Council v2 E2E', () => {
const rule = createWorkflowRule(codingTask, store);
// Create two topics
store.appendEvent({
occurredAt: Date.now(),
kind: 'coding.created',
key: 'task-a',
meta: JSON.stringify({
topicId: 'task-a',
title: 'Feature A',
description: 'Build feature A',
repoDir: '/tmp/a',
}),
});
store.appendEvent({
occurredAt: Date.now(),
kind: 'coding.created',
key: 'task-b',
meta: JSON.stringify({
topicId: 'task-b',
title: 'Feature B',
description: 'Build feature B',
repoDir: '/tmp/b',
}),
});
createUserEvent('task-a', 'Feature A', 'Build feature A', '/tmp/a');
createUserEvent('task-b', 'Feature B', 'Build feature B', '/tmp/b');
// Run ticks until both are closed
const allExecuted: { topicId: string; role: string }[] = [];
@@ -89,22 +92,18 @@ describe('Council v2 E2E', () => {
}
// Verify both closed
const closedEvents = store.queryByKind('coding.closed');
const closedEvents = store.queryByKind('coding.closer');
expect(closedEvents.length).toBe(2);
const closedIds = closedEvents.map((e) => JSON.parse(e.meta!).topicId);
const closedIds = closedEvents.map((e) => e.key);
expect(closedIds.sort()).toEqual(['task-a', 'task-b']);
// task-b should have been coded twice (rejected then re-coded)
const codedEvents = store.queryByKind('coding.coded');
const taskBCoded = codedEvents.filter(
(e) => JSON.parse(e.meta!).topicId === 'task-b',
);
const codedEvents = store.queryByKind('coding.coder');
const taskBCoded = codedEvents.filter((e) => e.key === 'task-b');
expect(taskBCoded.length).toBe(2);
// task-a should have been coded once
const taskACoded = codedEvents.filter(
(e) => JSON.parse(e.meta!).topicId === 'task-a',
);
const taskACoded = codedEvents.filter((e) => e.key === 'task-a');
expect(taskACoded.length).toBe(1);
// Verify complete event chain
@@ -112,6 +111,5 @@ describe('Council v2 E2E', () => {
.getAfter(0)
.filter((e) => e.kind.startsWith('coding.'));
expect(allEvents.length).toBeGreaterThanOrEqual(10);
// 2 created + 2 analyzed + 3 coded (a:1, b:2) + 3 reviewed (a:1, b:2) + 2 closed = 12
});
});
+2 -1
View File
@@ -938,7 +938,6 @@ export { buildPersonasFromEvents } from './persona.js';
// ── Council v2: WorkflowType ────────────────────────────────────────
export type { CodingWorkflowContext } from './workflows/coding-workflow.js';
export { createCodingWorkflow } from './workflows/coding-workflow.js';
export { createWorkflowTicker } from './workflows/index.js';
export { createArchitectRole } from './workflows/roles/architect-llm.js';
@@ -950,7 +949,9 @@ export type {
} from './workflows/workflow-rule-adapter.js';
export { createWorkflowRule } from './workflows/workflow-rule-adapter.js';
export type {
TopicSummary,
WorkflowAction,
WorkflowMessage,
WorkflowType,
} from './workflows/workflow-type.js';
@@ -1,5 +1,5 @@
/**
* CodingTask WorkflowType tests — full lifecycle with real SQLite store.
* CodingTask WorkflowType tests — message chain + CAS model.
*
* 小橘 🍊 (NEKO Team)
*/
@@ -11,6 +11,7 @@ import { join } from 'node:path';
import { createStore, type PulseStore } from '../store.js';
import { createCodingWorkflow } from './coding-workflow.js';
import { createWorkflowRule } from './workflow-rule-adapter.js';
import type { WorkflowMessage } from './workflow-type.js';
describe('CodingTask WorkflowType', () => {
let store: PulseStore;
@@ -31,52 +32,55 @@ describe('CodingTask WorkflowType', () => {
if (tmpDir) rmSync(tmpDir, { recursive: true, force: true });
});
it('full lifecycle: created → analyzed → coded → reviewed → closed', async () => {
function createUserEvent(
topicId: string,
title: string,
description: string,
repoDir: string,
) {
const hash = store.putObject({
content: description,
artifacts: { title, repoDir },
});
store.appendEvent({
occurredAt: Date.now(),
kind: 'coding.user',
key: topicId,
hash,
});
}
it('full lifecycle: user → architect → coder → reviewer → closer', async () => {
setup();
const codingTask = createCodingWorkflow();
const rule = createWorkflowRule(codingTask, store);
// Write coding.created event
store.appendEvent({
occurredAt: Date.now(),
kind: 'coding.created',
key: 'task-1',
meta: JSON.stringify({
topicId: 'task-1',
title: 'Fix login bug',
description: 'Users cannot log in with SSO',
repoDir: '/tmp/repo',
}),
});
createUserEvent(
'task-1',
'Fix login bug',
'Users cannot log in with SSO',
'/tmp/repo',
);
// Tick 1: architect
const r1 = await rule.tick();
expect(r1.executed).toEqual([{ topicId: 'task-1', role: 'architect' }]);
// Verify coding.analyzed was written
const events1 = store.queryByKind('coding.analyzed');
expect(events1.length).toBe(1);
expect(store.queryByKind('coding.architect').length).toBe(1);
// Tick 2: coder
const r2 = await rule.tick();
expect(r2.executed).toEqual([{ topicId: 'task-1', role: 'coder' }]);
const events2 = store.queryByKind('coding.coded');
expect(events2.length).toBe(1);
expect(store.queryByKind('coding.coder').length).toBe(1);
// Tick 3: reviewer
const r3 = await rule.tick();
expect(r3.executed).toEqual([{ topicId: 'task-1', role: 'reviewer' }]);
const events3 = store.queryByKind('coding.reviewed');
expect(events3.length).toBe(1);
expect(store.queryByKind('coding.reviewer').length).toBe(1);
// Tick 4: closer (approved → close)
const r4 = await rule.tick();
expect(r4.executed).toEqual([{ topicId: 'task-1', role: 'closer' }]);
const events4 = store.queryByKind('coding.closed');
expect(events4.length).toBe(1);
expect(store.queryByKind('coding.closer').length).toBe(1);
// Tick 5: nothing to do
const r5 = await rule.tick();
@@ -88,34 +92,29 @@ describe('CodingTask WorkflowType', () => {
let reviewCount = 0;
const codingTask = createCodingWorkflow({
reviewerFn: async (ctx, s) => {
reviewerFn: async (
_chain: WorkflowMessage[],
topicId: string,
s: PulseStore,
) => {
reviewCount++;
const hash = s.putObject({
content: reviewCount === 1 ? 'Needs fixes' : 'Looks good after fixes',
});
s.appendEvent({
occurredAt: Date.now(),
kind: 'coding.reviewed',
key: ctx.topicId,
kind: 'coding.reviewer',
key: topicId,
hash,
meta: JSON.stringify({
topicId: ctx.topicId,
verdict: reviewCount === 1 ? 'rejected' : 'approved',
comments:
reviewCount === 1 ? 'Needs fixes' : 'Looks good after fixes',
}),
});
},
});
const rule = createWorkflowRule(codingTask, store);
store.appendEvent({
occurredAt: Date.now(),
kind: 'coding.created',
key: 'task-2',
meta: JSON.stringify({
topicId: 'task-2',
title: 'Add feature X',
description: 'New feature',
repoDir: '/tmp/repo',
}),
});
createUserEvent('task-2', 'Add feature X', 'New feature', '/tmp/repo');
// architect → coder → reviewer (rejects)
await rule.tick(); // architect
@@ -151,34 +150,47 @@ describe('CodingTask WorkflowType', () => {
const codingTask = createCodingWorkflow();
const rule = createWorkflowRule(codingTask, store, logStore);
store.appendEvent({
occurredAt: Date.now(),
kind: 'coding.created',
key: 'task-3',
meta: JSON.stringify({
topicId: 'task-3',
title: 'Test log isolation',
description: 'Role logs go to logStore',
repoDir: '/tmp/repo',
}),
});
createUserEvent(
'task-3',
'Test log isolation',
'Role logs go to logStore',
'/tmp/repo',
);
// Tick: architect runs
await rule.tick();
// Role logs in logStore
expect(logStore.queryByKind('coding.role-started').length).toBe(1);
expect(logStore.queryByKind('coding.role-completed').length).toBe(1);
// No role logs in business store
expect(store.queryByKind('coding.role-started').length).toBe(0);
expect(store.queryByKind('coding.role-completed').length).toBe(0);
// Business event still written to store
expect(store.queryByKind('coding.analyzed').length).toBe(1);
expect(store.queryByKind('coding.architect').length).toBe(1);
} finally {
logStore.close();
rmSync(logTmpDir, { recursive: true, force: true });
}
});
it('CAS stores full content, events only have hash', async () => {
setup();
const codingTask = createCodingWorkflow();
const rule = createWorkflowRule(codingTask, store);
createUserEvent('task-4', 'CAS test', 'Verify CAS storage', '/tmp/repo');
await rule.tick(); // architect
const architectEvents = store.queryByKind('coding.architect');
expect(architectEvents.length).toBe(1);
expect(architectEvents[0].hash).toBeTruthy();
// meta should be empty/undefined (no verdict for architect)
const meta = architectEvents[0].meta;
expect(
!meta || meta === '{}' || meta === 'null' || meta === undefined,
).toBe(true);
// Content retrievable from CAS
const content = store.getObject(architectEvents[0].hash!);
expect(content).toBeTruthy();
expect((content as any).content).toBeTruthy();
expect((content as any).artifacts?.targetFiles).toBeInstanceOf(Array);
});
});
+125 -123
View File
@@ -1,152 +1,146 @@
/**
* CodingTask WorkflowType — Council v2 implementation.
* CodingTask WorkflowType — message chain + CAS model.
*
* Events:
* coding.created { topicId, title, description, repoDir }
* coding.analyzed { topicId, analysis, targetFiles }
* coding.coded { topicId, filesChanged, testsPassed, summary }
* coding.reviewed { topicId, verdict: 'approved'|'rejected', comments }
* coding.closed { topicId, summary }
* Events use `coding.{role}` naming:
* coding.user — user creates a task
* coding.architect — architect analysis (CAS)
* coding.coder — coder output (CAS)
* coding.reviewer — review verdict (CAS + meta.verdict)
* coding.closer task closed (CAS)
*
* 小橘 🍊 (NEKO Team)
*/
import type { PulseStore } from '../store.js';
import type { WorkflowAction, WorkflowType } from './workflow-type.js';
// ── Context ────────────────────────────────────────────────────
export interface CodingWorkflowContext {
topicId: string;
title: string;
description: string;
repoDir: string;
analysis?: { analysis: string; targetFiles: string[] };
codeResult?: {
filesChanged: string[];
testsPassed: boolean;
summary: string;
};
reviewResult?: { verdict: 'approved' | 'rejected'; comments: string };
codeCount: number;
reviewCount: number;
closed?: boolean;
}
import type {
TopicSummary,
WorkflowAction,
WorkflowMessage,
WorkflowType,
} from './workflow-type.js';
// ── Roles type ─────────────────────────────────────────────────
type RoleFn = (
chain: WorkflowMessage[],
topicId: string,
store: PulseStore,
) => Promise<void>;
type CodingWorkflowRoles = {
architect: (ctx: CodingWorkflowContext, store: PulseStore) => Promise<void>;
coder: (ctx: CodingWorkflowContext, store: PulseStore) => Promise<void>;
reviewer: (ctx: CodingWorkflowContext, store: PulseStore) => Promise<void>;
closer: (ctx: CodingWorkflowContext, store: PulseStore) => Promise<void>;
architect: RoleFn;
coder: RoleFn;
reviewer: RoleFn;
closer: RoleFn;
};
// ── JSONata projection ────────────────────────────────────────
// ── Projection function ────────────────────────────────────────
/**
* JSONata expression that projects coding.* events into { [topicId]: CodingWorkflowContext }.
*
* Strategy: filter coding.* events, group by topicId (from meta), build context.
*/
const CODING_TASK_PROJECTION = `
(
$coding := $[kind ~> /^coding\\./];
$ids := $distinct($coding.($parse(meta).topicId));
$merge($ids.(
$id := $;
$evts := $coding[$parse(meta).topicId = $id];
$created := $evts[kind = 'coding.created'][0];
$analyzed := $evts[kind = 'coding.analyzed'][0];
$coded := $evts[kind = 'coding.coded'];
$lastCoded := $coded[$count($coded) - 1];
$reviewed := $evts[kind = 'coding.reviewed'];
$lastReviewed := $reviewed[$count($reviewed) - 1];
$closedEvt := $evts[kind = 'coding.closed'][0];
$cm := $parse($created.meta);
$ctx := {
"topicId": $id,
"title": $cm.title,
"description": $cm.description,
"repoDir": $cm.repoDir,
"codeCount": $count($coded),
"reviewCount": $count($reviewed)
};
$ctx := $analyzed ? $merge([$ctx, {"analysis": {"analysis": $parse($analyzed.meta).analysis, "targetFiles": $parse($analyzed.meta).targetFiles}}]) : $ctx;
$ctx := $lastCoded ? $merge([$ctx, {"codeResult": {"filesChanged": $parse($lastCoded.meta).filesChanged, "testsPassed": $parse($lastCoded.meta).testsPassed, "summary": $parse($lastCoded.meta).summary}}]) : $ctx;
$ctx := $lastReviewed ? $merge([$ctx, {"reviewResult": {"verdict": $parse($lastReviewed.meta).verdict, "comments": $parse($lastReviewed.meta).comments}}]) : $ctx;
$ctx := $closedEvt ? $merge([$ctx, {"closed": true}]) : $ctx;
{ $id: $ctx }
))
)
`;
function codingProjection(
events: Array<{
kind: string;
key?: string | null;
meta?: string | null;
hash?: string | null;
occurredAt: number;
}>,
): Map<string, TopicSummary> {
const topics = new Map<string, TopicSummary>();
// events are ordered by id ASC
for (const e of events) {
if (!e.kind.startsWith('coding.')) continue;
const topicId = e.key;
if (!topicId) continue;
const role = e.kind.split('.')[1];
let meta: Record<string, unknown> | undefined;
if (e.meta) {
try {
meta = JSON.parse(e.meta);
} catch {}
}
topics.set(topicId, { lastRole: role, meta });
}
return topics;
}
// ── Default mock role implementations ──────────────────────────
function defaultArchitect(
ctx: CodingWorkflowContext,
chain: WorkflowMessage[],
topicId: string,
store: PulseStore,
): Promise<void> {
const userMsg = chain.find((m) => m.role === 'user');
const userContent = (userMsg?.content as any) ?? {};
const hash = store.putObject({
content: `[mock] Analysis for "${userContent.artifacts?.title ?? topicId}": ${userContent.content ?? ''}`,
artifacts: { targetFiles: ['src/main.ts', 'src/utils.ts'] },
});
store.appendEvent({
occurredAt: Date.now(),
kind: 'coding.analyzed',
key: ctx.topicId,
meta: JSON.stringify({
topicId: ctx.topicId,
analysis: `[mock] Analysis for "${ctx.title}": ${ctx.description}`,
targetFiles: ['src/main.ts', 'src/utils.ts'],
}),
kind: 'coding.architect',
key: topicId,
hash,
});
return Promise.resolve();
}
function defaultCoder(
ctx: CodingWorkflowContext,
chain: WorkflowMessage[],
topicId: string,
store: PulseStore,
): Promise<void> {
const architectMsg = chain.find((m) => m.role === 'architect');
const architectContent = (architectMsg?.content as any) ?? {};
const hash = store.putObject({
content: `[mock] Implemented changes`,
artifacts: {
filesChanged: architectContent.artifacts?.targetFiles ?? ['src/main.ts'],
testsPassed: true,
},
});
store.appendEvent({
occurredAt: Date.now(),
kind: 'coding.coded',
key: ctx.topicId,
meta: JSON.stringify({
topicId: ctx.topicId,
filesChanged: ctx.analysis?.targetFiles ?? ['src/main.ts'],
testsPassed: true,
summary: `[mock] Implemented "${ctx.title}"`,
}),
kind: 'coding.coder',
key: topicId,
hash,
});
return Promise.resolve();
}
function defaultReviewer(
ctx: CodingWorkflowContext,
_chain: WorkflowMessage[],
topicId: string,
store: PulseStore,
): Promise<void> {
const hash = store.putObject({
content: `[mock] Code looks good`,
});
store.appendEvent({
occurredAt: Date.now(),
kind: 'coding.reviewed',
key: ctx.topicId,
meta: JSON.stringify({
topicId: ctx.topicId,
verdict: 'approved' as const,
comments: `[mock] Code looks good for "${ctx.title}"`,
}),
kind: 'coding.reviewer',
key: topicId,
hash,
meta: JSON.stringify({ verdict: 'approved' }),
});
return Promise.resolve();
}
function defaultCloser(
ctx: CodingWorkflowContext,
chain: WorkflowMessage[],
topicId: string,
store: PulseStore,
): Promise<void> {
const userMsg = chain.find((m) => m.role === 'user');
const userContent = (userMsg?.content as any) ?? {};
const hash = store.putObject({
content: `Completed: ${userContent.artifacts?.title ?? topicId}`,
});
store.appendEvent({
occurredAt: Date.now(),
kind: 'coding.closed',
key: ctx.topicId,
meta: JSON.stringify({
topicId: ctx.topicId,
summary: `Completed: ${ctx.title}`,
}),
kind: 'coding.closer',
key: topicId,
hash,
});
return Promise.resolve();
}
@@ -154,13 +148,10 @@ function defaultCloser(
// ── Factory ────────────────────────────────────────────────────
export function createCodingWorkflow(opts?: {
architectFn?: (
ctx: CodingWorkflowContext,
store: PulseStore,
) => Promise<void>;
coderFn?: (ctx: CodingWorkflowContext, store: PulseStore) => Promise<void>;
reviewerFn?: (ctx: CodingWorkflowContext, store: PulseStore) => Promise<void>;
}): WorkflowType<CodingWorkflowContext, CodingWorkflowRoles> {
architectFn?: RoleFn;
coderFn?: RoleFn;
reviewerFn?: RoleFn;
}): WorkflowType<CodingWorkflowRoles> {
const roles: CodingWorkflowRoles = {
architect: opts?.architectFn ?? defaultArchitect,
coder: opts?.coderFn ?? defaultCoder,
@@ -169,23 +160,34 @@ export function createCodingWorkflow(opts?: {
};
const moderator = (
_roles: CodingWorkflowRoles,
topics: Record<string, CodingWorkflowContext>,
topics: Map<string, TopicSummary>,
): WorkflowAction<keyof CodingWorkflowRoles & string>[] => {
const actions: WorkflowAction<keyof CodingWorkflowRoles & string>[] = [];
for (const [topicId, ctx] of Object.entries(topics)) {
if (ctx.closed) continue;
if (!ctx.analysis) {
actions.push({ topicId, role: 'architect' });
} else if (!ctx.codeResult) {
actions.push({ topicId, role: 'coder' });
} else if (!ctx.reviewResult || ctx.codeCount > ctx.reviewCount) {
// Need review: either never reviewed, or re-coded after last review
actions.push({ topicId, role: 'reviewer' });
} else if (ctx.reviewResult.verdict === 'rejected') {
actions.push({ topicId, role: 'coder' });
} else if (ctx.reviewResult.verdict === 'approved') {
actions.push({ topicId, role: 'closer' });
for (const [topicId, summary] of topics) {
const { lastRole, meta } = summary;
let nextRole: string | null = null;
switch (lastRole) {
case 'user':
nextRole = 'architect';
break;
case 'architect':
nextRole = 'coder';
break;
case 'coder':
nextRole = 'reviewer';
break;
case 'reviewer':
nextRole = meta?.verdict === 'rejected' ? 'coder' : 'closer';
break;
case 'closer':
nextRole = null;
break;
}
if (nextRole) {
actions.push({
topicId,
role: nextRole as keyof CodingWorkflowRoles & string,
});
}
}
return actions;
@@ -193,7 +195,7 @@ export function createCodingWorkflow(opts?: {
return {
name: 'coding',
projection: CODING_TASK_PROJECTION,
projection: codingProjection,
roles,
moderator,
};
+7 -8
View File
@@ -4,10 +4,7 @@
* 小橘 🍊 (NEKO Team)
*/
export {
type CodingWorkflowContext,
createCodingWorkflow,
} from './coding-workflow.js';
export { createCodingWorkflow } from './coding-workflow.js';
export { createArchitectRole } from './roles/architect-llm.js';
export { createCoderRole } from './roles/coder-cursor.js';
export { createReviewerRole } from './roles/reviewer-cursor.js';
@@ -16,7 +13,12 @@ export {
type WorkflowRule,
type WorkflowTickResult,
} from './workflow-rule-adapter.js';
export type { WorkflowAction, WorkflowType } from './workflow-type.js';
export type {
TopicSummary,
WorkflowAction,
WorkflowMessage,
WorkflowType,
} from './workflow-type.js';
import type { WorkflowRule } from './workflow-rule-adapter.js';
@@ -29,9 +31,6 @@ export function createWorkflowTicker(
): () => Promise<void> {
let pending: Promise<void> | null = null;
return () => {
// Serialize tick calls: if a tick is in progress, chain after it.
// This ensures the Moore machine's critical section (read → diff → update baseline)
// is never concurrent, even when called from fire-and-forget executors.
const run = async () => {
for (const rule of rules) {
await rule.tick();
@@ -1,5 +1,5 @@
/**
* Architect role tests — mock LLM, real SQLite store.
* Architect role tests — mock LLM, real SQLite store (message chain model).
*
* 小橘 🍊 (NEKO Team)
*/
@@ -10,7 +10,7 @@ import { tmpdir } from 'node:os';
import { join } from 'node:path';
import type { LlmClient } from '../../llm-client.js';
import { createStore, type PulseStore } from '../../store.js';
import type { CodingWorkflowContext } from '../coding-workflow.js';
import type { WorkflowMessage } from '../workflow-type.js';
import { createArchitectRole } from './architect-llm.js';
describe('createArchitectRole', () => {
@@ -32,7 +32,7 @@ describe('createArchitectRole', () => {
if (tmpDir) rmSync(tmpDir, { recursive: true, force: true });
});
it('calls LLM and writes coding.analyzed event', async () => {
it('calls LLM and writes coding.architect event with CAS', async () => {
setup();
const mockLlm: LlmClient = {
@@ -46,37 +46,27 @@ describe('createArchitectRole', () => {
const role = createArchitectRole(mockLlm);
const ctx: CodingWorkflowContext = {
topicId: 'test-task-1',
title: 'Fix the bug',
description: 'There is a bug in main.ts',
repoDir: '/tmp/test-repo',
codeCount: 0,
reviewCount: 0,
};
const chain: WorkflowMessage[] = [
{
role: 'user',
content: {
content: 'There is a bug in main.ts',
artifacts: { title: 'Fix the bug', repoDir: '/tmp/test-repo' },
},
timestamp: Date.now(),
},
];
// Write coding.created first
store.appendEvent({
occurredAt: Date.now(),
kind: 'coding.created',
key: ctx.topicId,
meta: JSON.stringify({
topicId: ctx.topicId,
title: ctx.title,
description: ctx.description,
repoDir: ctx.repoDir,
}),
});
await role(ctx, store);
await role(chain, 'test-task-1', store);
const events = store.getAfter(0);
const analyzed = events.find((e) => e.kind === 'coding.analyzed');
expect(analyzed).toBeDefined();
const meta = JSON.parse(analyzed!.meta!);
expect(meta.topicId).toBe('test-task-1');
expect(meta.analysis).toBe('Need to update main.ts');
expect(meta.targetFiles).toEqual(['src/main.ts']);
const architectEvt = events.find((e) => e.kind === 'coding.architect');
expect(architectEvt).toBeDefined();
expect(architectEvt!.hash).toBeTruthy();
const content = store.getObject(architectEvt!.hash!) as any;
expect(content.content).toBe('Need to update main.ts');
expect(content.artifacts.targetFiles).toEqual(['src/main.ts']);
});
it('handles non-JSON LLM response gracefully', async () => {
@@ -87,22 +77,25 @@ describe('createArchitectRole', () => {
};
const role = createArchitectRole(mockLlm);
const ctx: CodingWorkflowContext = {
topicId: 'test-2',
title: 'Task',
description: 'Desc',
repoDir: '/tmp',
codeCount: 0,
reviewCount: 0,
};
const chain: WorkflowMessage[] = [
{
role: 'user',
content: {
content: 'Desc',
artifacts: { title: 'Task', repoDir: '/tmp' },
},
timestamp: Date.now(),
},
];
await role(ctx, store);
await role(chain, 'test-2', store);
const events = store.getAfter(0);
const analyzed = events.find((e) => e.kind === 'coding.analyzed');
expect(analyzed).toBeDefined();
const meta = JSON.parse(analyzed!.meta!);
expect(meta.analysis).toBe('Just do it');
expect(meta.targetFiles).toEqual([]);
const architectEvt = events.find((e) => e.kind === 'coding.architect');
expect(architectEvt).toBeDefined();
const content = store.getObject(architectEvt!.hash!) as any;
expect(content.content).toBe('Just do it');
expect(content.artifacts.targetFiles).toEqual([]);
});
});
@@ -6,12 +6,22 @@
import type { LlmClient } from '../../llm-client.js';
import type { PulseStore } from '../../store.js';
import type { CodingWorkflowContext } from '../coding-workflow.js';
import type { WorkflowMessage } from '../workflow-type.js';
export function createArchitectRole(
llmClient: LlmClient,
): (ctx: CodingWorkflowContext, store: PulseStore) => Promise<void> {
return async (ctx, store) => {
): (
chain: WorkflowMessage[],
topicId: string,
store: PulseStore,
) => Promise<void> {
return async (chain, topicId, store) => {
const userMsg = chain.find((m) => m.role === 'user');
const userContent = (userMsg?.content as any) ?? {};
const title = userContent.artifacts?.title ?? topicId;
const description = userContent.content ?? '';
const repoDir = userContent.artifacts?.repoDir ?? '';
const resp = await llmClient.chat({
messages: [
{
@@ -21,7 +31,7 @@ export function createArchitectRole(
},
{
role: 'user',
content: `Task: ${ctx.title}\nDescription: ${ctx.description}\nRepo: ${ctx.repoDir}`,
content: `Task: ${title}\nDescription: ${description}\nRepo: ${repoDir}`,
},
],
});
@@ -33,15 +43,15 @@ export function createArchitectRole(
parsed = { analysis: resp.content ?? 'No analysis', targetFiles: [] };
}
const hash = store.putObject({
content: parsed.analysis ?? 'No analysis',
artifacts: { targetFiles: parsed.targetFiles ?? [] },
});
store.appendEvent({
occurredAt: Date.now(),
kind: 'coding.analyzed',
key: ctx.topicId,
meta: JSON.stringify({
topicId: ctx.topicId,
analysis: parsed.analysis ?? 'No analysis',
targetFiles: parsed.targetFiles ?? [],
}),
kind: 'coding.architect',
key: topicId,
hash,
});
};
}
@@ -1,6 +1,5 @@
/**
* Coder role tests — mock Cursor agent (cannot spawn real agent in test).
* Tests the event writing logic using the default mock from CodingTaskType.
* Coder role tests — mock Cursor agent (message chain model).
*
* 小橘 🍊 (NEKO Team)
*/
@@ -10,7 +9,7 @@ import { mkdtempSync, rmSync } from 'node:fs';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { createStore, type PulseStore } from '../../store.js';
import type { CodingWorkflowContext } from '../coding-workflow.js';
import type { WorkflowMessage } from '../workflow-type.js';
describe('coder-cursor role', () => {
let store: PulseStore;
@@ -31,54 +30,61 @@ describe('coder-cursor role', () => {
if (tmpDir) rmSync(tmpDir, { recursive: true, force: true });
});
it('writes coding.coded event with correct structure', async () => {
it('writes coding.coder event with CAS', async () => {
setup();
// Use a mock coder that simulates what createCoderRole does (without spawning Cursor)
const mockCoderFn = async (ctx: CodingWorkflowContext, s: PulseStore) => {
// Mock coder that simulates what createCoderRole does
const mockCoderFn = async (
chain: WorkflowMessage[],
topicId: string,
s: PulseStore,
) => {
const architectMsg = chain.find((m) => m.role === 'architect');
const architectContent = (architectMsg?.content as any) ?? {};
const hash = s.putObject({
content: 'Mock implementation done',
artifacts: {
filesChanged: architectContent.artifacts?.targetFiles ?? [],
testsPassed: true,
},
});
s.appendEvent({
occurredAt: Date.now(),
kind: 'coding.coded',
key: ctx.topicId,
meta: JSON.stringify({
topicId: ctx.topicId,
filesChanged: ctx.analysis?.targetFiles ?? [],
testsPassed: true,
summary: 'Mock implementation done',
}),
kind: 'coding.coder',
key: topicId,
hash,
});
};
const ctx: CodingWorkflowContext = {
topicId: 'coder-test-1',
title: 'Add feature',
description: 'Add a new feature',
repoDir: '/tmp/test-repo',
analysis: { analysis: 'Modify utils', targetFiles: ['src/utils.ts'] },
codeCount: 0,
reviewCount: 0,
};
const chain: WorkflowMessage[] = [
{
role: 'user',
content: {
content: 'Add a new feature',
artifacts: { title: 'Add feature', repoDir: '/tmp/test-repo' },
},
timestamp: Date.now(),
},
{
role: 'architect',
content: {
content: 'Modify utils',
artifacts: { targetFiles: ['src/utils.ts'] },
},
timestamp: Date.now(),
},
];
store.appendEvent({
occurredAt: Date.now(),
kind: 'coding.created',
key: ctx.topicId,
meta: JSON.stringify({
topicId: ctx.topicId,
title: ctx.title,
description: ctx.description,
repoDir: ctx.repoDir,
}),
});
await mockCoderFn(ctx, store);
await mockCoderFn(chain, 'coder-test-1', store);
const events = store.getAfter(0);
const coded = events.find((e) => e.kind === 'coding.coded');
const coded = events.find((e) => e.kind === 'coding.coder');
expect(coded).toBeDefined();
const meta = JSON.parse(coded!.meta!);
expect(meta.topicId).toBe('coder-test-1');
expect(meta.filesChanged).toEqual(['src/utils.ts']);
expect(meta.testsPassed).toBe(true);
expect(coded!.hash).toBeTruthy();
const content = store.getObject(coded!.hash!) as any;
expect(content.content).toBe('Mock implementation done');
expect(content.artifacts.filesChanged).toEqual(['src/utils.ts']);
expect(content.artifacts.testsPassed).toBe(true);
});
});
@@ -8,39 +8,54 @@ import { mkdirSync, unlinkSync, writeFileSync } from 'node:fs';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import type { PulseStore } from '../../store.js';
import type { CodingWorkflowContext } from '../coding-workflow.js';
import type { WorkflowMessage } from '../workflow-type.js';
export function createCoderRole(opts: {
agentBin: string;
}): (ctx: CodingWorkflowContext, store: PulseStore) => Promise<void> {
return async (ctx, store) => {
const prompt = `## Task: ${ctx.title}
}): (
chain: WorkflowMessage[],
topicId: string,
store: PulseStore,
) => Promise<void> {
return async (chain, topicId, store) => {
const userMsg = chain.find((m) => m.role === 'user');
const userContent = (userMsg?.content as any) ?? {};
const title = userContent.artifacts?.title ?? topicId;
const description = userContent.content ?? '';
const repoDir = userContent.artifacts?.repoDir ?? '/tmp';
${ctx.description}
const architectMsg = chain.find((m) => m.role === 'architect');
const architectContent = (architectMsg?.content as any) ?? {};
const prompt = `## Task: ${title}
${description}
## Architect Analysis
${ctx.analysis?.analysis ?? 'None'}
${architectContent.content ?? 'None'}
## Target Files
${(ctx.analysis?.targetFiles ?? []).join(', ') || 'Not specified'}
${(architectContent.artifacts?.targetFiles ?? []).join(', ') || 'Not specified'}
## Instructions
Implement the changes. Do NOT modify any existing test files. Only create or modify source files as needed.
If the task asks to create a new file, create it. If it asks to modify existing files, modify them.
Run tests if applicable. Commit your changes.`;
const result = await runCursorAgent(opts.agentBin, prompt, ctx.repoDir);
const result = await runCursorAgent(opts.agentBin, prompt, repoDir);
const hash = store.putObject({
content: result.output,
artifacts: {
filesChanged: architectContent.artifacts?.targetFiles ?? [],
testsPassed: result.success,
},
});
store.appendEvent({
occurredAt: Date.now(),
kind: 'coding.coded',
key: ctx.topicId,
meta: JSON.stringify({
topicId: ctx.topicId,
filesChanged: ctx.analysis?.targetFiles ?? [],
testsPassed: result.success,
summary: result.output.slice(0, 500),
}),
kind: 'coding.coder',
key: topicId,
hash,
});
};
}
@@ -77,7 +92,7 @@ async function runCursorAgent(
},
);
const timer = setTimeout(() => proc.kill(), 300_000); // 5 min timeout
const timer = setTimeout(() => proc.kill(), 300_000);
const exitCode = await proc.exited;
clearTimeout(timer);
@@ -1,5 +1,5 @@
/**
* Reviewer role tests — mock Cursor agent.
* Reviewer role tests — mock Cursor agent (message chain model).
*
* 小橘 🍊 (NEKO Team)
*/
@@ -9,7 +9,7 @@ import { mkdtempSync, rmSync } from 'node:fs';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { createStore, type PulseStore } from '../../store.js';
import type { CodingWorkflowContext } from '../coding-workflow.js';
import type { WorkflowMessage } from '../workflow-type.js';
describe('reviewer-cursor role', () => {
let store: PulseStore;
@@ -30,68 +30,70 @@ describe('reviewer-cursor role', () => {
if (tmpDir) rmSync(tmpDir, { recursive: true, force: true });
});
it('writes coding.reviewed event with approved verdict', async () => {
it('writes coding.reviewer event with approved verdict and CAS', async () => {
setup();
const mockReviewerFn = async (
ctx: CodingWorkflowContext,
_chain: WorkflowMessage[],
topicId: string,
s: PulseStore,
) => {
const verdict: 'approved' | 'rejected' = 'approved';
const hash = s.putObject({ content: 'LGTM, code looks good. APPROVED.' });
s.appendEvent({
occurredAt: Date.now(),
kind: 'coding.reviewed',
key: ctx.topicId,
meta: JSON.stringify({
topicId: ctx.topicId,
verdict,
comments: 'LGTM, code looks good. APPROVED.',
}),
kind: 'coding.reviewer',
key: topicId,
hash,
meta: JSON.stringify({ verdict: 'approved' }),
});
};
const ctx: CodingWorkflowContext = {
topicId: 'review-test-1',
title: 'Fix bug',
description: 'Fix the bug',
repoDir: '/tmp/test-repo',
analysis: { analysis: 'Bug in utils', targetFiles: ['src/utils.ts'] },
codeResult: {
filesChanged: ['src/utils.ts'],
testsPassed: true,
summary: 'Fixed',
const chain: WorkflowMessage[] = [
{
role: 'user',
content: {
content: 'Fix the bug',
artifacts: { title: 'Fix bug', repoDir: '/tmp/test-repo' },
},
timestamp: Date.now(),
},
codeCount: 1,
reviewCount: 0,
};
{
role: 'architect',
content: {
content: 'Bug in utils',
artifacts: { targetFiles: ['src/utils.ts'] },
},
timestamp: Date.now(),
},
{
role: 'coder',
content: {
content: 'Fixed',
artifacts: { filesChanged: ['src/utils.ts'], testsPassed: true },
},
timestamp: Date.now(),
},
];
store.appendEvent({
occurredAt: Date.now(),
kind: 'coding.created',
key: ctx.topicId,
meta: JSON.stringify({
topicId: ctx.topicId,
title: ctx.title,
description: ctx.description,
repoDir: ctx.repoDir,
}),
});
await mockReviewerFn(ctx, store);
await mockReviewerFn(chain, 'review-test-1', store);
const events = store.getAfter(0);
const reviewed = events.find((e) => e.kind === 'coding.reviewed');
const reviewed = events.find((e) => e.kind === 'coding.reviewer');
expect(reviewed).toBeDefined();
expect(reviewed!.hash).toBeTruthy();
const meta = JSON.parse(reviewed!.meta!);
expect(meta.topicId).toBe('review-test-1');
expect(meta.verdict).toBe('approved');
const content = store.getObject(reviewed!.hash!) as any;
expect(content.content).toContain('APPROVED');
});
it('writes rejected verdict when output contains "rejected"', async () => {
it('writes rejected verdict', async () => {
setup();
const mockReviewerFn = async (
ctx: CodingWorkflowContext,
_chain: WorkflowMessage[],
topicId: string,
s: PulseStore,
) => {
const output = 'Code has issues. REJECTED: missing error handling.';
@@ -100,32 +102,39 @@ describe('reviewer-cursor role', () => {
.includes('rejected')
? 'rejected'
: 'approved';
const hash = s.putObject({ content: output });
s.appendEvent({
occurredAt: Date.now(),
kind: 'coding.reviewed',
key: ctx.topicId,
meta: JSON.stringify({
topicId: ctx.topicId,
verdict,
comments: output.slice(0, 500),
}),
kind: 'coding.reviewer',
key: topicId,
hash,
meta: JSON.stringify({ verdict }),
});
};
const ctx: CodingWorkflowContext = {
topicId: 'review-test-2',
title: 'Bad code',
description: 'Bad',
repoDir: '/tmp',
codeResult: { filesChanged: [], testsPassed: false, summary: 'Bad' },
codeCount: 1,
reviewCount: 0,
};
const chain: WorkflowMessage[] = [
{
role: 'user',
content: {
content: 'Bad',
artifacts: { title: 'Bad code', repoDir: '/tmp' },
},
timestamp: Date.now(),
},
{
role: 'coder',
content: {
content: 'Bad impl',
artifacts: { filesChanged: [], testsPassed: false },
},
timestamp: Date.now(),
},
];
await mockReviewerFn(ctx, store);
await mockReviewerFn(chain, 'review-test-2', store);
const events = store.getAfter(0);
const reviewed = events.find((e) => e.kind === 'coding.reviewed');
const reviewed = events.find((e) => e.kind === 'coding.reviewer');
const meta = JSON.parse(reviewed!.meta!);
expect(meta.verdict).toBe('rejected');
});
@@ -8,41 +8,53 @@ import { mkdirSync, unlinkSync, writeFileSync } from 'node:fs';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import type { PulseStore } from '../../store.js';
import type { CodingWorkflowContext } from '../coding-workflow.js';
import type { WorkflowMessage } from '../workflow-type.js';
export function createReviewerRole(opts: {
agentBin: string;
}): (ctx: CodingWorkflowContext, store: PulseStore) => Promise<void> {
return async (ctx, store) => {
const prompt = `## Code Review: ${ctx.title}
}): (
chain: WorkflowMessage[],
topicId: string,
store: PulseStore,
) => Promise<void> {
return async (chain, topicId, store) => {
const userMsg = chain.find((m) => m.role === 'user');
const userContent = (userMsg?.content as any) ?? {};
const title = userContent.artifacts?.title ?? topicId;
const repoDir = userContent.artifacts?.repoDir ?? '/tmp';
const coderMsg = [...chain].reverse().find((m) => m.role === 'coder');
const coderContent = (coderMsg?.content as any) ?? {};
const prompt = `## Code Review: ${title}
## What was done
${ctx.codeResult?.summary ?? 'Unknown'}
${coderContent.content ?? 'Unknown'}
## Files changed
${(ctx.codeResult?.filesChanged ?? []).join(', ')}
${(coderContent.artifacts?.filesChanged ?? []).join(', ')}
## Instructions
Review the recent changes for correctness, security, and code quality.
Do NOT modify any files. Only output your review.
End with a clear verdict: APPROVED or REJECTED with reasons.`;
const result = await runCursorAgent(opts.agentBin, prompt, ctx.repoDir);
const result = await runCursorAgent(opts.agentBin, prompt, repoDir);
const output = result.output.toLowerCase();
const verdict: 'approved' | 'rejected' = output.includes('rejected')
? 'rejected'
: 'approved';
const hash = store.putObject({
content: result.output,
});
store.appendEvent({
occurredAt: Date.now(),
kind: 'coding.reviewed',
key: ctx.topicId,
meta: JSON.stringify({
topicId: ctx.topicId,
verdict,
comments: result.output.slice(0, 500),
}),
kind: 'coding.reviewer',
key: topicId,
hash,
meta: JSON.stringify({ verdict }),
});
};
}
@@ -79,7 +91,7 @@ async function runCursorAgent(
},
);
const timer = setTimeout(() => proc.kill(), 300_000); // 5 min timeout
const timer = setTimeout(() => proc.kill(), 300_000);
const exitCode = await proc.exited;
clearTimeout(timer);
@@ -1,5 +1,5 @@
/**
* WorkflowRuleAdapter tests.
* WorkflowRuleAdapter tests — message chain model.
*
* 小橘 🍊 (NEKO Team)
*/
@@ -10,7 +10,12 @@ import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { createStore, type PulseStore } from '../store.js';
import { createWorkflowRule } from './workflow-rule-adapter.js';
import type { WorkflowAction, WorkflowType } from './workflow-type.js';
import type {
TopicSummary,
WorkflowAction,
WorkflowMessage,
WorkflowType,
} from './workflow-type.js';
describe('createWorkflowRule', () => {
let store: PulseStore;
@@ -39,52 +44,59 @@ describe('createWorkflowRule', () => {
if (tmpDir) rmSync(tmpDir, { recursive: true, force: true });
});
it('executes a simple echo topic through tick', async () => {
it('executes a simple echo workflow through tick', async () => {
setup();
interface EchoCtx {
topicId: string;
message: string;
echoed?: boolean;
}
type EchoRoles = {
echo: (
chain: WorkflowMessage[],
topicId: string,
store: PulseStore,
) => Promise<void>;
};
const echoType: WorkflowType<
EchoCtx,
{ echo: (ctx: EchoCtx, store: PulseStore) => Promise<void> }
> = {
const echoType: WorkflowType<EchoRoles> = {
name: 'echo',
projection: `
(
$created := $[kind = 'echo.created'];
$echoed := $[kind = 'echo.echoed'];
$echoedIds := $distinct($echoed.($parse(meta).topicId));
$merge($created.(
$m := $parse(meta);
{ $m.topicId: {
"topicId": $m.topicId,
"message": $m.message,
"echoed": $m.topicId in $echoedIds
}}
))
)
`,
projection: (events) => {
const topics = new Map<string, TopicSummary>();
for (const e of events) {
if (!e.kind.startsWith('echo.')) continue;
if (!e.key) continue;
topics.set(e.key, {
lastRole: e.kind.split('.')[1],
meta: e.meta
? (() => {
try {
return JSON.parse(e.meta!);
} catch {
return undefined;
}
})()
: undefined,
});
}
return topics;
},
roles: {
echo: async (ctx, s) => {
echo: async (chain, topicId, s) => {
const userMsg = chain.find((m) => m.role === 'created');
const userContent = (userMsg?.content as any) ?? {};
const hash = s.putObject({
content: `Echo: ${userContent.message ?? ''}`,
});
s.appendEvent({
occurredAt: Date.now(),
kind: 'echo.echoed',
key: ctx.topicId,
meta: JSON.stringify({
topicId: ctx.topicId,
reply: `Echo: ${ctx.message}`,
}),
key: topicId,
hash,
});
},
},
moderator: (_roles, topics) => {
moderator: (topics) => {
const actions: WorkflowAction<'echo'>[] = [];
for (const [topicId, ctx] of Object.entries(topics)) {
if (!ctx.echoed) actions.push({ topicId, role: 'echo' });
for (const [topicId, summary] of topics) {
if (summary.lastRole === 'created')
actions.push({ topicId, role: 'echo' });
}
return actions;
},
@@ -92,12 +104,13 @@ describe('createWorkflowRule', () => {
const rule = createWorkflowRule(echoType, store, logStore);
// Create a topic
// Create a topic with CAS
const hash = store.putObject({ message: 'hello' });
store.appendEvent({
occurredAt: Date.now(),
kind: 'echo.created',
key: 't1',
meta: JSON.stringify({ topicId: 't1', message: 'hello' }),
hash,
});
// First tick: should execute echo
@@ -108,7 +121,7 @@ describe('createWorkflowRule', () => {
const r2 = await rule.tick();
expect(r2.executed).toEqual([]);
// Verify role execution log events were written to logStore (not store)
// Verify role execution logs in logStore
const started = logStore.queryByKind('echo.role-started');
expect(started.length).toBe(1);
const startedMeta = JSON.parse(started[0].meta!);
@@ -116,58 +129,53 @@ describe('createWorkflowRule', () => {
const completed = logStore.queryByKind('echo.role-completed');
expect(completed.length).toBe(1);
const completedMeta = JSON.parse(completed[0].meta!);
expect(completedMeta.topicId).toBe('t1');
expect(completedMeta.role).toBe('echo');
expect(completedMeta.scope).toBe('echo');
expect(typeof completedMeta.durationMs).toBe('number');
// Verify no role log events in business store
// No role logs in business store
expect(store.queryByKind('echo.role-started').length).toBe(0);
expect(store.queryByKind('echo.role-completed').length).toBe(0);
});
it('writes role-failed event when role throws, continues other actions', async () => {
setup();
interface Ctx {
topicId: string;
done: boolean;
}
type Roles = {
bomb: (
chain: WorkflowMessage[],
topicId: string,
store: PulseStore,
) => Promise<void>;
safe: (
chain: WorkflowMessage[],
topicId: string,
store: PulseStore,
) => Promise<void>;
};
const failType: WorkflowType<
Ctx,
{
bomb: (ctx: Ctx, store: PulseStore) => Promise<void>;
safe: (ctx: Ctx, store: PulseStore) => Promise<void>;
}
> = {
const failType: WorkflowType<Roles> = {
name: 'failtest',
projection: `
(
$created := $[kind = 'failtest.created'];
$merge($created.(
$m := $parse(meta);
{ $m.topicId: { "topicId": $m.topicId, "done": false } }
))
)
`,
projection: (events) => {
const topics = new Map<string, TopicSummary>();
for (const e of events) {
if (e.kind === 'failtest.created' && e.key) {
topics.set(e.key, { lastRole: 'created' });
}
}
return topics;
},
roles: {
bomb: async () => {
throw new Error('kaboom');
},
safe: async (ctx, s) => {
safe: async (_chain, topicId, s) => {
s.appendEvent({
occurredAt: Date.now(),
kind: 'failtest.done',
key: ctx.topicId,
meta: JSON.stringify({ topicId: ctx.topicId }),
key: topicId,
});
},
},
moderator: (_roles, topics) => {
moderator: (topics) => {
const actions: WorkflowAction[] = [];
for (const [topicId] of Object.entries(topics)) {
for (const [topicId] of topics) {
actions.push({ topicId, role: 'bomb' });
actions.push({ topicId, role: 'safe' });
}
@@ -181,93 +189,66 @@ describe('createWorkflowRule', () => {
occurredAt: Date.now(),
kind: 'failtest.created',
key: 't1',
meta: JSON.stringify({ topicId: 't1' }),
});
const r1 = await rule.tick();
// bomb fails so not in executed; safe succeeds
expect(r1.executed).toEqual([{ topicId: 't1', role: 'safe' }]);
// Verify role-failed event in logStore
const failed = logStore.queryByKind('failtest.role-failed');
expect(failed.length).toBe(1);
const failedMeta = JSON.parse(failed[0].meta!);
expect(failedMeta.role).toBe('bomb');
expect(failedMeta.error).toBe('kaboom');
expect(failedMeta.scope).toBe('failtest');
// Verify safe role completed in logStore
const safeCompleted = logStore.queryByKind('failtest.role-completed');
expect(safeCompleted.length).toBe(1);
expect(JSON.parse(failed[0].meta!).error).toBe('kaboom');
});
it('skips logging when logStore is not provided', async () => {
setup();
interface EchoCtx {
topicId: string;
message: string;
echoed?: boolean;
}
type EchoRoles = {
echo: (
chain: WorkflowMessage[],
topicId: string,
store: PulseStore,
) => Promise<void>;
};
const echoType: WorkflowType<
EchoCtx,
{ echo: (ctx: EchoCtx, store: PulseStore) => Promise<void> }
> = {
const echoType: WorkflowType<EchoRoles> = {
name: 'echo',
projection: `
(
$created := $[kind = 'echo.created'];
$echoed := $[kind = 'echo.echoed'];
$echoedIds := $distinct($echoed.($parse(meta).topicId));
$merge($created.(
$m := $parse(meta);
{ $m.topicId: {
"topicId": $m.topicId,
"message": $m.message,
"echoed": $m.topicId in $echoedIds
}}
))
)
`,
projection: (events) => {
const topics = new Map<string, TopicSummary>();
for (const e of events) {
if (!e.kind.startsWith('echo.') || !e.key) continue;
topics.set(e.key, { lastRole: e.kind.split('.')[1] });
}
return topics;
},
roles: {
echo: async (ctx, s) => {
echo: async (_chain, topicId, s) => {
s.appendEvent({
occurredAt: Date.now(),
kind: 'echo.echoed',
key: ctx.topicId,
meta: JSON.stringify({
topicId: ctx.topicId,
reply: `Echo: ${ctx.message}`,
}),
key: topicId,
});
},
},
moderator: (_roles, topics) => {
moderator: (topics) => {
const actions: WorkflowAction<'echo'>[] = [];
for (const [topicId, ctx] of Object.entries(topics)) {
if (!ctx.echoed) actions.push({ topicId, role: 'echo' });
for (const [topicId, s] of topics) {
if (s.lastRole === 'created') actions.push({ topicId, role: 'echo' });
}
return actions;
},
};
// No logStore — should work without logging
const rule = createWorkflowRule(echoType, store);
store.appendEvent({
occurredAt: Date.now(),
kind: 'echo.created',
key: 't1',
meta: JSON.stringify({ topicId: 't1', message: 'hello' }),
});
const r1 = await rule.tick();
expect(r1.executed).toEqual([{ topicId: 't1', role: 'echo' }]);
// No role log events in store
expect(store.queryByKind('echo.role-started').length).toBe(0);
expect(store.queryByKind('echo.role-completed').length).toBe(0);
});
it('Moore diff prevents re-execution of same action', async () => {
@@ -275,36 +256,34 @@ describe('createWorkflowRule', () => {
let execCount = 0;
interface Ctx {
topicId: string;
done: boolean;
}
type Roles = {
work: (
chain: WorkflowMessage[],
topicId: string,
store: PulseStore,
) => Promise<void>;
};
// A topic type where moderator always returns action for non-done topics
// but the role does NOT write any event (so topic stays non-done)
const stickyType: WorkflowType<
Ctx,
{ work: (ctx: Ctx, store: PulseStore) => Promise<void> }
> = {
const stickyType: WorkflowType<Roles> = {
name: 'sticky',
projection: `
(
$created := $[kind = 'sticky.created'];
$merge($created.(
$m := $parse(meta);
{ $m.topicId: { "topicId": $m.topicId, "done": false } }
))
)
`,
projection: (events) => {
const topics = new Map<string, TopicSummary>();
for (const e of events) {
if (e.kind === 'sticky.created' && e.key) {
topics.set(e.key, { lastRole: 'created' });
}
}
return topics;
},
roles: {
work: async () => {
execCount++;
},
},
moderator: (_roles, topics) => {
moderator: (topics) => {
const actions: WorkflowAction<'work'>[] = [];
for (const [topicId, ctx] of Object.entries(topics)) {
if (!ctx.done) actions.push({ topicId, role: 'work' });
for (const [topicId] of topics) {
actions.push({ topicId, role: 'work' });
}
return actions;
},
@@ -316,68 +295,57 @@ describe('createWorkflowRule', () => {
occurredAt: Date.now(),
kind: 'sticky.created',
key: 't1',
meta: JSON.stringify({ topicId: 't1' }),
});
// First tick: executes
await rule.tick();
expect(execCount).toBe(1);
// Second tick: same action key, Moore diff skips
await rule.tick();
expect(execCount).toBe(1);
});
it('baseline updates before execution — prevents duplicate dispatch on concurrent ticks', async () => {
it('baseline updates before execution — prevents duplicate dispatch', async () => {
setup();
// Track execution order: when role starts and when baseline is checked
let roleStartCount = 0;
let roleResolve: (() => void) | null = null;
interface Ctx {
topicId: string;
processed: boolean;
}
type Roles = {
process: (
chain: WorkflowMessage[],
topicId: string,
store: PulseStore,
) => Promise<void>;
};
const slowType: WorkflowType<
Ctx,
{ process: (ctx: Ctx, store: PulseStore) => Promise<void> }
> = {
const slowType: WorkflowType<Roles> = {
name: 'slow',
projection: `
(
$created := $[kind = 'slow.created'];
$processed := $[kind = 'slow.processed'];
$processedIds := $distinct($processed.($parse(meta).topicId));
$merge($created.(
$m := $parse(meta);
{ $m.topicId: {
"topicId": $m.topicId,
"processed": $m.topicId in $processedIds
}}
))
)
`,
projection: (events) => {
const topics = new Map<string, TopicSummary>();
for (const e of events) {
if (!e.kind.startsWith('slow.') || !e.key) continue;
topics.set(e.key, { lastRole: e.kind.split('.')[1] });
}
return topics;
},
roles: {
process: async (ctx, s) => {
process: async (_chain, topicId, s) => {
roleStartCount++;
// Simulate slow execution — role blocks until externally resolved
await new Promise<void>((resolve) => {
roleResolve = resolve;
});
s.appendEvent({
occurredAt: Date.now(),
kind: 'slow.processed',
key: ctx.topicId,
meta: JSON.stringify({ topicId: ctx.topicId }),
key: topicId,
});
},
},
moderator: (_roles, topics) => {
moderator: (topics) => {
const actions: WorkflowAction<'process'>[] = [];
for (const [topicId, ctx] of Object.entries(topics)) {
if (!ctx.processed) actions.push({ topicId, role: 'process' });
for (const [topicId, s] of topics) {
if (s.lastRole === 'created')
actions.push({ topicId, role: 'process' });
}
return actions;
},
@@ -389,27 +357,82 @@ describe('createWorkflowRule', () => {
occurredAt: Date.now(),
kind: 'slow.created',
key: 't1',
meta: JSON.stringify({ topicId: 't1' }),
});
// Start first tick — it will block inside the role
const tick1 = rule.tick();
// Wait a bit for tick1 to reach role execution
await new Promise((r) => setTimeout(r, 50));
expect(roleStartCount).toBe(1);
// Start second tick while tick1's role is still running
// Because baseline was updated BEFORE execution,
// tick2 sees snapshot === baseline → skips (no-op)
const tick2Promise = rule.tick();
const r2 = await tick2Promise;
expect(r2.executed).toEqual([]);
expect(roleStartCount).toBe(1); // ← still 1, not 2!
expect(roleStartCount).toBe(1);
// Resolve tick1's role
roleResolve!();
const r1 = await tick1;
expect(r1.executed).toEqual([{ topicId: 't1', role: 'process' }]);
});
it('chain is passed to role functions with CAS content', async () => {
setup();
let receivedChain: WorkflowMessage[] = [];
type Roles = {
responder: (
chain: WorkflowMessage[],
topicId: string,
store: PulseStore,
) => Promise<void>;
};
const chainType: WorkflowType<Roles> = {
name: 'chain',
projection: (events) => {
const topics = new Map<string, TopicSummary>();
for (const e of events) {
if (!e.kind.startsWith('chain.') || !e.key) continue;
topics.set(e.key, { lastRole: e.kind.split('.')[1] });
}
return topics;
},
roles: {
responder: async (chain, topicId, s) => {
receivedChain = chain;
const hash = s.putObject({ content: 'responded' });
s.appendEvent({
occurredAt: Date.now(),
kind: 'chain.responded',
key: topicId,
hash,
});
},
},
moderator: (topics) => {
const actions: WorkflowAction<'responder'>[] = [];
for (const [topicId, s] of topics) {
if (s.lastRole === 'created')
actions.push({ topicId, role: 'responder' });
}
return actions;
},
};
const rule = createWorkflowRule(chainType, store);
const hash = store.putObject({ message: 'hello world', extra: 42 });
store.appendEvent({
occurredAt: Date.now(),
kind: 'chain.created',
key: 't1',
hash,
});
await rule.tick();
expect(receivedChain.length).toBe(1);
expect(receivedChain[0].role).toBe('created');
expect((receivedChain[0].content as any).message).toBe('hello world');
expect((receivedChain[0].content as any).extra).toBe(42);
});
});
@@ -1,33 +1,33 @@
/**
* WorkflowType → Pulse Rule adapter.
* WorkflowType → Pulse Rule adapter (message chain model).
*
* createWorkflowRule(workflowType, store) wraps a WorkflowType as a standalone
* tick function. Each tick has two phases:
*
* ── Critical section (must be serialized, never concurrent) ──
* 1. Read events → JSONata projection → snapshot
* 2. Moore diff: compare snapshot to baseline; skip if unchanged
* 3. Moderator decides actions from new snapshot
* 1. Read events → projection → per-topic summaries
* 2. Moore diff: compare summaries to baseline; skip if unchanged
* 3. Moderator decides actions from new summaries
* 4. Update baseline BEFORE execution (so concurrent ticks see new baseline)
*
* ── Execution (can be async, concurrent control is executor's job) ──
* 5. Execute role functions
* 5. Build message chain from CAS, execute role functions
*
* ⚠️ IMPORTANT: Baseline is updated in step 4, BEFORE role execution.
* This prevents duplicate dispatches when tick() is called again while
* a slow role (e.g. Cursor agent ~60s) is still running. The next tick
* sees the updated baseline, compares snapshot (unchanged because the
* role hasn't written its event yet), and correctly skips.
*
* If you move baseline update to AFTER execution, you reintroduce the
* duplicate dispatch bug. See test: "baseline updates before execution".
* a slow role (e.g. Cursor agent ~60s) is still running.
*
* 小橘 🍊 (NEKO Team)
*/
import jsonata from 'jsonata';
import type { PulseStore } from '../store.js';
import type { WorkflowAction, WorkflowType } from './workflow-type.js';
import type {
TopicSummary,
WorkflowAction,
WorkflowMessage,
WorkflowType,
} from './workflow-type.js';
/**
* Tick result from a topic rule.
@@ -35,8 +35,8 @@ import type { WorkflowAction, WorkflowType } from './workflow-type.js';
export interface WorkflowTickResult {
/** Actions that were executed this tick */
executed: WorkflowAction[];
/** Current projected topics after this tick */
topics: Record<string, unknown>;
/** Current per-topic summaries */
topics: Map<string, TopicSummary>;
}
/**
@@ -48,120 +48,153 @@ export interface WorkflowRule {
/**
* Create a topic rule from a WorkflowType and a PulseStore.
*
* The returned object has a tick() method. See module-level doc for
* the critical section / execution phase split and why baseline must
* update before execution.
*/
export function createWorkflowRule<TContext>(
workflowType: WorkflowType<TContext, any>,
export function createWorkflowRule(
workflowType: WorkflowType<any>,
store: PulseStore,
logStore?: PulseStore,
): WorkflowRule {
// Moore machine state: snapshot baseline from last tick
let prevSnapshotJson = '';
// Compile JSONata expression once; register $parse helper for JSON strings
const expr = jsonata(workflowType.projection);
expr.registerFunction('parse', (s: string) => {
try {
return JSON.parse(s);
} catch {
return {};
}
});
// For JSONata legacy mode
let compiledExpr: ReturnType<typeof jsonata> | null = null;
if (typeof workflowType.projection === 'string') {
compiledExpr = jsonata(workflowType.projection);
compiledExpr.registerFunction('parse', (s: string) => {
try {
return JSON.parse(s);
} catch {
return {};
}
});
}
return {
async tick(): Promise<WorkflowTickResult> {
// 1. Get all events from store
const events = store.getAfter(0);
// Preserve meta as raw JSON string — $parse() in JSONata handles it
const parsedEvents = events.map((e) => ({
id: e.id,
occurredAt: e.occurredAt,
kind: e.kind,
key: e.key ?? null,
hash: e.hash ?? null,
meta: e.meta ?? '{}',
}));
// 2. Build per-topic summaries
let topics: Map<string, TopicSummary>;
// 2. Evaluate JSONata projection
const topics: Record<string, TContext> =
(await expr.evaluate(parsedEvents)) ?? {};
if (typeof workflowType.projection === 'function') {
// Function mode: direct call
topics = workflowType.projection(
events.map((e) => ({
kind: e.kind,
key: e.key ?? null,
meta: e.meta ?? null,
hash: e.hash ?? null,
occurredAt: e.occurredAt,
})),
);
} else {
// JSONata legacy mode: evaluate then wrap into Map<string, TopicSummary>
const parsedEvents = events.map((e) => ({
id: e.id,
occurredAt: e.occurredAt,
kind: e.kind,
key: e.key ?? null,
hash: e.hash ?? null,
meta: e.meta ?? '{}',
}));
const raw: Record<string, any> =
(await compiledExpr!.evaluate(parsedEvents)) ?? {};
topics = new Map<string, TopicSummary>();
for (const [k, v] of Object.entries(raw)) {
topics.set(k, v as TopicSummary);
}
}
// 3. Moore diff: compare projected snapshot to baseline
// If snapshot hasn't changed since last tick, skip entirely.
// This is the core Moore machine property: output only on state change.
const currSnapshotJson = JSON.stringify(topics);
// 3. Moore diff
const summaryObj: Record<string, TopicSummary> = {};
for (const [k, v] of topics) summaryObj[k] = v;
const currSnapshotJson = JSON.stringify(summaryObj);
if (currSnapshotJson === prevSnapshotJson) {
return { executed: [], topics };
}
// 4. Get actions from moderator (state changed, so re-evaluate)
const actions = workflowType.moderator(workflowType.roles, topics);
// 4. Get actions from moderator
const actions = workflowType.moderator(topics);
// 5. Update baseline BEFORE execution.
// ⚠️ DO NOT move this after role execution — see module-level doc.
// This is the boundary between critical section and execution phase.
// 5. Update baseline BEFORE execution
prevSnapshotJson = currSnapshotJson;
// 6. Execute actions (outside critical section — may be slow)
// 6. Execute actions — build chain from CAS for each topic
const executed: WorkflowAction[] = [];
for (const action of actions) {
const roleFn = workflowType.roles[action.role];
if (roleFn) {
const ctx = topics[action.topicId];
if (ctx !== undefined) {
// Write role-started event to logStore if available
if (logStore) {
logStore.appendEvent({
occurredAt: Date.now(),
kind: `${workflowType.name}.role-started`,
key: action.topicId,
meta: JSON.stringify({
topicId: action.topicId,
role: action.role,
scope: workflowType.name,
}),
});
}
if (!roleFn) continue;
if (!topics.has(action.topicId)) continue;
const start = Date.now();
try {
await roleFn(ctx, store);
// Write role-completed event to logStore if available
if (logStore) {
logStore.appendEvent({
occurredAt: Date.now(),
kind: `${workflowType.name}.role-completed`,
key: action.topicId,
meta: JSON.stringify({
topicId: action.topicId,
role: action.role,
scope: workflowType.name,
durationMs: Date.now() - start,
}),
});
}
executed.push(action);
} catch (err) {
// Write role-failed event to logStore if available (don't rethrow — let other actions continue)
if (logStore) {
logStore.appendEvent({
occurredAt: Date.now(),
kind: `${workflowType.name}.role-failed`,
key: action.topicId,
meta: JSON.stringify({
topicId: action.topicId,
role: action.role,
scope: workflowType.name,
durationMs: Date.now() - start,
error: err instanceof Error ? err.message : String(err),
}),
});
}
}
// Build message chain for this topic
const chain: WorkflowMessage[] = events
.filter(
(e) =>
e.key === action.topicId &&
e.kind.startsWith(`${workflowType.name}.`),
)
.map((e) => ({
role: e.kind.split('.')[1],
content: e.hash ? store.getObject(e.hash) : null,
meta: e.meta
? (() => {
try {
return JSON.parse(e.meta!);
} catch {
return undefined;
}
})()
: undefined,
timestamp: e.occurredAt,
}));
// Write role-started event to logStore if available
if (logStore) {
logStore.appendEvent({
occurredAt: Date.now(),
kind: `${workflowType.name}.role-started`,
key: action.topicId,
meta: JSON.stringify({
topicId: action.topicId,
role: action.role,
scope: workflowType.name,
}),
});
}
const start = Date.now();
try {
await roleFn(chain, action.topicId, store);
if (logStore) {
logStore.appendEvent({
occurredAt: Date.now(),
kind: `${workflowType.name}.role-completed`,
key: action.topicId,
meta: JSON.stringify({
topicId: action.topicId,
role: action.role,
scope: workflowType.name,
durationMs: Date.now() - start,
}),
});
}
executed.push(action);
} catch (err) {
if (logStore) {
logStore.appendEvent({
occurredAt: Date.now(),
kind: `${workflowType.name}.role-failed`,
key: action.topicId,
meta: JSON.stringify({
topicId: action.topicId,
role: action.role,
scope: workflowType.name,
durationMs: Date.now() - start,
error: err instanceof Error ? err.message : String(err),
}),
});
}
}
}
@@ -6,29 +6,40 @@
import { describe, expect, it } from 'bun:test';
import type { PulseStore } from '../store.js';
import type { WorkflowAction, WorkflowType } from './workflow-type.js';
import type {
TopicSummary,
WorkflowAction,
WorkflowMessage,
WorkflowType,
} from './workflow-type.js';
describe('WorkflowType interface', () => {
it('allows defining a minimal WorkflowType', () => {
interface EchoCtx {
topicId: string;
message: string;
echoed?: boolean;
}
type EchoRoles = {
echo: (
chain: WorkflowMessage[],
topicId: string,
store: PulseStore,
) => Promise<void>;
};
const echoType: WorkflowType<
EchoCtx,
{ echo: (ctx: EchoCtx, store: PulseStore) => Promise<void> }
> = {
const echoType: WorkflowType<EchoRoles> = {
name: 'echo',
projection: '{}',
roles: {
echo: async (_ctx, _store) => {},
projection: (events) => {
const topics = new Map<string, TopicSummary>();
for (const e of events) {
if (!e.kind.startsWith('echo.') || !e.key) continue;
topics.set(e.key, { lastRole: e.kind.split('.')[1] });
}
return topics;
},
moderator: (_roles, topics) => {
roles: {
echo: async (_chain, _topicId, _store) => {},
},
moderator: (topics) => {
const actions: WorkflowAction<'echo'>[] = [];
for (const [topicId, ctx] of Object.entries(topics)) {
if (!ctx.echoed) actions.push({ topicId, role: 'echo' });
for (const [topicId, s] of topics) {
if (s.lastRole === 'created') actions.push({ topicId, role: 'echo' });
}
return actions;
},
@@ -36,33 +47,40 @@ describe('WorkflowType interface', () => {
expect(echoType.name).toBe('echo');
expect(typeof echoType.roles.echo).toBe('function');
expect(echoType.moderator(echoType.roles, {})).toEqual([]);
expect(echoType.moderator(new Map())).toEqual([]);
});
it('moderator returns actions for incomplete topics', () => {
interface Ctx {
done: boolean;
}
const tt: WorkflowType<Ctx> = {
type Roles = {
doIt: (
chain: WorkflowMessage[],
topicId: string,
store: PulseStore,
) => Promise<void>;
};
const tt: WorkflowType<Roles> = {
name: 'test',
projection: '{}',
projection: () => new Map(),
roles: {
doIt: async () => {},
},
moderator: (_roles, topics) => {
moderator: (topics) => {
const actions: WorkflowAction[] = [];
for (const [topicId, ctx] of Object.entries(topics)) {
if (!ctx.done) actions.push({ topicId, role: 'doIt' });
for (const [topicId, s] of topics) {
if (s.lastRole !== 'done') actions.push({ topicId, role: 'doIt' });
}
return actions;
},
};
const result = tt.moderator(tt.roles, {
a: { done: false },
b: { done: true },
c: { done: false },
});
const input = new Map<string, TopicSummary>([
['a', { lastRole: 'created' }],
['b', { lastRole: 'done' }],
['c', { lastRole: 'created' }],
]);
const result = tt.moderator(input);
expect(result).toEqual([
{ topicId: 'a', role: 'doIt' },
{ topicId: 'c', role: 'doIt' },
+50 -17
View File
@@ -1,5 +1,5 @@
/**
* WorkflowType — Council v2 core interface.
* WorkflowType — Council v2 core interface (message chain model).
*
* Each WorkflowType is a single .ts file that exports events schema,
* projection, roles, and moderator. One Rule manages all instances
@@ -16,39 +16,72 @@ export interface WorkflowAction<R extends string = string> {
role: R;
}
/** A message in a workflow chain — one per role execution */
export interface WorkflowMessage {
role: string;
content: unknown; // from CAS via store.getObject(hash)
meta?: Record<string, unknown>;
timestamp: number;
}
/** Per-topic summary used by moderator */
export interface TopicSummary {
lastRole: string;
meta?: Record<string, unknown>;
}
/**
* WorkflowType definition — one .ts file exports one object conforming to this interface.
* WorkflowType definition — message chain model.
*
* @typeParam TContext - The projected context type for each topic instance
* @typeParam TRoles - Record of role name → async role function
* Projection is now a function (or JSONata string for backward compat).
* Roles receive the full message chain instead of a projected context.
* Moderator receives per-topic summaries (lastRole + meta).
*/
export interface WorkflowType<
TContext = unknown,
TRoles extends Record<
string,
(ctx: TContext, store: PulseStore) => Promise<void>
> = Record<string, (ctx: TContext, store: PulseStore) => Promise<void>>,
(
chain: WorkflowMessage[],
topicId: string,
store: PulseStore,
) => Promise<void>
> = Record<
string,
(
chain: WorkflowMessage[],
topicId: string,
store: PulseStore,
) => Promise<void>
>,
> {
/** Topic type name, used as event prefix */
name: string;
/**
* JSONata expression: projects events array → { [topicId]: TContext }
*
* Input: array of event objects with { kind, key, meta }
* Output: object keyed by topicId with TContext values
* Projection: either a JSONata string (legacy) or a function.
* Function mode: receives raw events, returns Map<topicId, TopicSummary>.
* JSONata mode: projects events → { [topicId]: any } (adapter wraps to TopicSummary).
*/
projection: string;
projection:
| string
| ((
events: Array<{
kind: string;
key?: string | null;
meta?: string | null;
hash?: string | null;
occurredAt: number;
}>,
) => Map<string, TopicSummary>);
/** Topic-local roles */
/** Topic-local roles — each receives the full message chain */
roles: TRoles;
/**
* Moderator: inspects all active topic instances, returns actions to execute.
* Empty array = nothing to do (all topics complete or idle).
* Moderator: inspects per-topic summaries, returns actions to execute.
* Empty array = nothing to do.
*/
moderator: (
roles: TRoles,
topics: Record<string, TContext>,
topics: Map<string, TopicSummary>,
) => WorkflowAction<Extract<keyof TRoles, string>>[];
}
+62 -81
View File
@@ -1,5 +1,5 @@
/**
* topic.test.ts — Tests for upulse topic create/list logic.
* workflow.test.ts — Tests for upulse workflow create/list logic (message chain model).
*/
import { afterEach, beforeEach, describe, expect, it } from 'bun:test';
@@ -12,7 +12,7 @@ import {
type ScopedStore,
} from '@uncaged/pulse';
describe('topic', () => {
describe('workflow', () => {
let tmpDir: string;
let scopedStore: ScopedStore;
let store: PulseStore;
@@ -31,115 +31,104 @@ describe('topic', () => {
rmSync(tmpDir, { recursive: true, force: true });
});
it('create topic writes coding.created event', () => {
it('create topic writes coding.user event with CAS', () => {
const topicId = 'fix-login-bug-a3k';
const hash = store.putObject({
content: 'The login page crashes on submit',
artifacts: { title: 'Fix login bug', repoDir: '/tmp/repo' },
});
store.appendEvent({
occurredAt: Date.now(),
kind: 'coding.created',
kind: 'coding.user',
key: topicId,
meta: JSON.stringify({
topicId,
title: 'Fix login bug',
description: 'The login page crashes on submit',
repoDir: '/tmp/repo',
}),
hash,
});
const events = store.queryByKind('coding.created');
const events = store.queryByKind('coding.user');
expect(events.length).toBe(1);
expect(events[0].key).toBe(topicId);
expect(events[0].hash).toBeTruthy();
const meta = JSON.parse(events[0].meta!);
expect(meta.title).toBe('Fix login bug');
expect(meta.description).toBe('The login page crashes on submit');
expect(meta.repoDir).toBe('/tmp/repo');
const content = store.getObject(events[0].hash!) as any;
expect(content.artifacts.title).toBe('Fix login bug');
expect(content.content).toBe('The login page crashes on submit');
expect(content.artifacts.repoDir).toBe('/tmp/repo');
});
it('list topics shows created topic', () => {
const topicId = 'add-feature-x-b2c';
const hash = store.putObject({
content: 'Implement feature X',
artifacts: { title: 'Add feature X', repoDir: '/tmp/repo' },
});
store.appendEvent({
occurredAt: Date.now(),
kind: 'coding.created',
kind: 'coding.user',
key: topicId,
meta: JSON.stringify({
topicId,
title: 'Add feature X',
description: 'Implement feature X',
repoDir: '/tmp/repo',
}),
hash,
});
const allEvents = store
.getAfter(0)
.filter((e) => e.kind.startsWith('coding.'));
expect(allEvents.length).toBe(1);
const topics = new Map<
string,
{ title: string; state: string; lastUpdated: number }
{ title: string; role: string; lastUpdated: number }
>();
for (const event of allEvents) {
const tid = event.key;
if (!tid) continue;
const state = event.kind.replace('coding.', '');
let meta: { title?: string } = {};
if (event.meta) {
const role = event.kind.replace('coding.', '');
let title = tid;
if (role === 'user' && event.hash) {
try {
meta = JSON.parse(event.meta);
} catch {
/* ignore */
}
}
const existing = topics.get(tid);
if (!existing) {
topics.set(tid, {
title: meta.title ?? tid,
state,
lastUpdated: event.occurredAt,
});
} else {
existing.state = state;
if (event.occurredAt > existing.lastUpdated)
existing.lastUpdated = event.occurredAt;
const obj = store.getObject(event.hash) as any;
if (obj?.artifacts?.title) title = obj.artifacts.title;
} catch {}
}
topics.set(tid, { title, role, lastUpdated: event.occurredAt });
}
expect(topics.size).toBe(1);
const t = topics.get(topicId)!;
expect(t.title).toBe('Add feature X');
expect(t.state).toBe('created');
expect(t.role).toBe('user');
});
it('list without --all hides closed topics', () => {
const now = Date.now();
// Topic 1: active (created)
// Topic 1: active (user)
const h1 = store.putObject({
content: 'Active',
artifacts: { title: 'Active Topic' },
});
store.appendEvent({
occurredAt: now,
kind: 'coding.created',
kind: 'coding.user',
key: 'active-topic-1a2',
meta: JSON.stringify({
topicId: 'active-topic-1a2',
title: 'Active Topic',
}),
hash: h1,
});
// Topic 2: closed
store.appendEvent({
occurredAt: now,
kind: 'coding.created',
key: 'closed-topic-3b4',
meta: JSON.stringify({
topicId: 'closed-topic-3b4',
title: 'Closed Topic',
}),
const h2 = store.putObject({
content: 'Closed',
artifacts: { title: 'Closed Topic' },
});
store.appendEvent({
occurredAt: now + 1,
kind: 'coding.closed',
occurredAt: now,
kind: 'coding.user',
key: 'closed-topic-3b4',
meta: JSON.stringify({ topicId: 'closed-topic-3b4', summary: 'Done' }),
hash: h2,
});
const h3 = store.putObject({ content: 'Done' });
store.appendEvent({
occurredAt: now + 1,
kind: 'coding.closer',
key: 'closed-topic-3b4',
hash: h3,
});
const allEvents = store
@@ -148,43 +137,35 @@ describe('topic', () => {
const topics = new Map<
string,
{ title: string; state: string; lastUpdated: number }
{ title: string; role: string; lastUpdated: number }
>();
for (const event of allEvents) {
const tid = event.key;
if (!tid) continue;
const state = event.kind.replace('coding.', '');
let meta: { title?: string } = {};
if (event.meta) {
const role = event.kind.replace('coding.', '');
let title = tid;
if (role === 'user' && event.hash) {
try {
meta = JSON.parse(event.meta);
} catch {
/* ignore */
}
const obj = store.getObject(event.hash) as any;
if (obj?.artifacts?.title) title = obj.artifacts.title;
} catch {}
}
const existing = topics.get(tid);
if (!existing) {
topics.set(tid, {
title: meta.title ?? tid,
state,
lastUpdated: event.occurredAt,
});
topics.set(tid, { title, role, lastUpdated: event.occurredAt });
} else {
existing.state = state;
existing.role = role;
if (event.occurredAt > existing.lastUpdated)
existing.lastUpdated = event.occurredAt;
}
}
// Without --all: only non-closed
const active = [...topics.entries()].filter(
([, t]) => t.state !== 'closed',
);
// Without --all: only non-closer
const active = [...topics.entries()].filter(([, t]) => t.role !== 'closer');
expect(active.length).toBe(1);
expect(active[0][0]).toBe('active-topic-1a2');
// With --all: all topics
const all = [...topics.entries()];
expect(all.length).toBe(2);
expect(topics.size).toBe(2);
});
});
+30 -41
View File
@@ -1,8 +1,8 @@
/**
* commands/topic.ts — upulse topic create/list
* commands/workflow.ts — upulse workflow create/list
*
* Manage coding workflows via the routine scope store.
* Events follow the coding.* convention from @uncaged/pulse CodingTask.
* Events follow the coding.{role} convention (message chain model).
*/
import { randomBytes } from 'node:crypto';
@@ -23,18 +23,10 @@ function shortId(): string {
return randomBytes(2).toString('hex').slice(0, 3);
}
const _CODING_KINDS = [
'coding.created',
'coding.analyzed',
'coding.coded',
'coding.reviewed',
'coding.closed',
] as const;
type CodingRole = 'user' | 'architect' | 'coder' | 'reviewer' | 'closer';
type CodingState = 'created' | 'analyzed' | 'coded' | 'reviewed' | 'closed';
function kindToState(kind: string): CodingState {
return kind.replace('coding.', '') as CodingState;
function kindToRole(kind: string): CodingRole {
return kind.replace('coding.', '') as CodingRole;
}
export function registerWorkflowCommand(program: Command): void {
@@ -58,16 +50,17 @@ export function registerWorkflowCommand(program: Command): void {
const topicId = `${slugify(title)}-${shortId()}`;
// Store full content in CAS
const hash = store.putObject({
content: opts.desc,
artifacts: { title, repoDir: opts.repo },
});
store.appendEvent({
occurredAt: Date.now(),
kind: 'coding.created',
kind: 'coding.user',
key: topicId,
meta: JSON.stringify({
topicId,
title,
description: opts.desc,
repoDir: opts.repo,
}),
hash,
});
console.log(topicId);
@@ -103,48 +96,44 @@ export function registerWorkflowCommand(program: Command): void {
return;
}
// Group by topicId, track latest state and metadata
// Group by key (topicId), track latest role
const topics = new Map<
string,
{ title: string; state: CodingState; lastUpdated: number }
{ title: string; role: CodingRole; lastUpdated: number }
>();
for (const event of allEvents) {
const topicId = event.key;
if (!topicId) continue;
const state = kindToState(event.kind);
let meta: { title?: string } = {};
if (event.meta) {
const role = kindToRole(event.kind);
// Try to get title from CAS for user events
let title = topicId;
if (role === 'user' && event.hash) {
try {
meta = JSON.parse(event.meta);
} catch {
// ignore
}
const obj = store.getObject(event.hash) as any;
if (obj?.artifacts?.title) title = obj.artifacts.title;
} catch {}
}
const existing = topics.get(topicId);
if (!existing) {
topics.set(topicId, {
title: meta.title ?? topicId,
state,
lastUpdated: event.occurredAt,
});
topics.set(topicId, { title, role, lastUpdated: event.occurredAt });
} else {
// Events from getAfter are ordered by id ASC, so later events overwrite
existing.state = state;
existing.role = role;
if (event.occurredAt > existing.lastUpdated) {
existing.lastUpdated = event.occurredAt;
}
if (meta.title) {
existing.title = meta.title;
if (role === 'user' && title !== topicId) {
existing.title = title;
}
}
}
// Filter closed unless --all
const entries = [...topics.entries()].filter(
([, t]) => opts.all || t.state !== 'closed',
([, t]) => opts.all || t.role !== 'closer',
);
if (entries.length === 0) {
@@ -154,14 +143,14 @@ export function registerWorkflowCommand(program: Command): void {
}
console.log(
'topicId | title | state | last updated',
'topicId | title | role | last updated',
);
console.log('-'.repeat(120));
for (const [topicId, t] of entries) {
const ts = new Date(t.lastUpdated).toISOString();
console.log(
`${topicId.padEnd(49)}| ${t.title.slice(0, 30).padEnd(31)}| ${t.state.padEnd(9)}| ${ts}`,
`${topicId.padEnd(49)}| ${t.title.slice(0, 30).padEnd(31)}| ${t.role.padEnd(9)}| ${ts}`,
);
}