diff --git a/packages/pulse/src/e2e/council-demo.ts b/packages/pulse/src/e2e/council-demo.ts index 0ff2703..f0a3f38 100644 --- a/packages/pulse/src/e2e/council-demo.ts +++ b/packages/pulse/src/e2e/council-demo.ts @@ -1,9 +1,6 @@ #!/usr/bin/env bun /** - * Pulse Council Demo — v2 WorkflowType Pipeline - * - * Executable script (not a test) that simulates the complete - * Council v2 dispatch chain driven by WorkflowType rules. + * Pulse Council Demo — v2 WorkflowType Pipeline (message chain model) * * Run: cd ~/repos/pulse && bun packages/pulse/src/e2e/council-demo.ts * @@ -19,26 +16,18 @@ import { createCodingWorkflow } from '../workflows/coding-workflow.js'; import { createArchitectRole } from '../workflows/roles/architect-llm.js'; import { createWorkflowRule } from '../workflows/workflow-rule-adapter.js'; -// ── Helpers ──────────────────────────────────────────────────── - const SEP = '─'.repeat(50); - function log(emoji: string, msg: string) { console.log(`${emoji} ${msg}`); } - function logItem(msg: string) { console.log(` ✅ ${msg}`); } -// ── CLI Args ─────────────────────────────────────────────────── - const LIVE = process.argv.includes('--live'); const DB_PATH = process.argv.find((a) => a.startsWith('--db='))?.slice(5); const KEEP = process.argv.includes('--keep'); -// ── Main ─────────────────────────────────────────────────────── - async function main() { const useCustomDb = !!DB_PATH; const tmpDir = useCustomDb @@ -48,7 +37,7 @@ async function main() { console.log(); console.log(SEP); - log('🚀', 'Pulse Council v2 Demo'); + log('🚀', 'Pulse Council v2 Demo (Message Chain)'); console.log(SEP); console.log(); log('📋', `Mode: ${LIVE ? 'LIVE (LLM)' : 'Mock'}`); @@ -61,7 +50,6 @@ async function main() { }); try { - // Build topic type (mock or live) let codingTask: ReturnType; if (LIVE) { const baseUrl = @@ -84,7 +72,7 @@ async function main() { } const rule = createWorkflowRule(codingTask, store); - // Step 1: Create coding tasks + // Step 1: Create coding tasks via coding.user events log('📝', 'Step 1: Creating coding tasks'); const tasks = [ { @@ -102,18 +90,22 @@ async function main() { ]; for (const t of tasks) { + const hash = store.putObject({ + content: t.description, + artifacts: { title: t.title, repoDir: t.repoDir }, + }); store.appendEvent({ occurredAt: Date.now(), - kind: 'coding.created', + kind: 'coding.user', key: t.topicId, - meta: JSON.stringify(t), + hash, }); logItem(`Created: ${t.title} (${t.topicId})`); } console.log(); // Step 2: Run tick loop - log('⚡', 'Step 2: Running topic rule ticks'); + log('⚡', 'Step 2: Running workflow ticks'); console.log(); for (let tick = 1; tick <= 20; tick++) { @@ -137,20 +129,21 @@ async function main() { .getAfter(0) .filter((e) => e.kind.startsWith('coding.')); for (const e of allEvents) { - const meta = e.meta ? JSON.parse(e.meta) : {}; const ts = new Date(e.occurredAt).toISOString().slice(11, 23); + const _role = e.kind.split('.')[1]; + const hasContent = e.hash ? '📦' : ' '; + const metaStr = e.meta ? ` meta=${e.meta}` : ''; console.log( - ` [${ts}] ${e.kind.padEnd(18)} topic=${meta.topicId ?? '?'}`, + ` [${ts}] ${e.kind.padEnd(20)} key=${e.key ?? '?'} ${hasContent}${metaStr}`, ); } console.log(); log('📈', `Total events: ${allEvents.length}`); const closedCount = allEvents.filter( - (e) => e.kind === 'coding.closed', + (e) => e.kind === 'coding.closer', ).length; log('🏁', `Closed topics: ${closedCount}/${tasks.length}`); - console.log(); console.log(SEP); log('✅', 'Council v2 demo completed!'); diff --git a/packages/pulse/src/e2e/council-v2-live.ts b/packages/pulse/src/e2e/council-v2-live.ts index 81bdfc3..89c0031 100644 --- a/packages/pulse/src/e2e/council-v2-live.ts +++ b/packages/pulse/src/e2e/council-v2-live.ts @@ -1,6 +1,7 @@ #!/usr/bin/env bun /** * Council v2 — Full LIVE: architect(LLM) + coder(Cursor) + reviewer(Cursor) + * Message chain + CAS model. * * Usage: * PULSE_LLM_BASE_URL=... PULSE_LLM_API_KEY=... \ @@ -11,11 +12,9 @@ import { tmpdir } from 'node:os'; import { dirname, join } from 'node:path'; import { createStore, type PulseStore } from '../index.js'; import { createOpenAiLlmClient } from '../llm-client.js'; -import { - type CodingWorkflowContext, - createCodingWorkflow, -} from '../workflows/coding-workflow.js'; +import { createCodingWorkflow } from '../workflows/coding-workflow.js'; import { createWorkflowRule } from '../workflows/workflow-rule-adapter.js'; +import type { WorkflowMessage } from '../workflows/workflow-type.js'; const DB_PATH = process.argv.find((a) => a.startsWith('--db='))?.slice(5); const tmpDir = DB_PATH @@ -93,13 +92,20 @@ async function runCursorAgent( }; } -// ── Role implementations ─────────────────────────────────────── +// ── Role implementations (message chain model) ───────────────── async function architectFn( - ctx: CodingWorkflowContext, + chain: WorkflowMessage[], + topicId: string, store: PulseStore, ): Promise { - console.log(` 🏗️ architect analyzing "${ctx.title}"...`); + const userMsg = chain.find((m) => m.role === 'user'); + const userContent = (userMsg?.content as any) ?? {}; + const title = userContent.artifacts?.title ?? topicId; + const description = userContent.content ?? ''; + const repoDir = userContent.artifacts?.repoDir ?? '/tmp'; + + console.log(` 🏗️ architect analyzing "${title}"...`); const resp = await llm.chat({ messages: [ { @@ -109,7 +115,7 @@ async function architectFn( }, { role: 'user', - content: `Task: ${ctx.title}\nDescription: ${ctx.description}\nRepo: ${ctx.repoDir}`, + content: `Task: ${title}\nDescription: ${description}\nRepo: ${repoDir}`, }, ], }); @@ -120,94 +126,113 @@ async function architectFn( parsed = { analysis: resp.content ?? 'No analysis', targetFiles: [] }; } console.log(` 🏗️ analysis: ${(parsed.analysis ?? '').slice(0, 120)}...`); + + const hash = store.putObject({ + content: parsed.analysis ?? 'No analysis', + artifacts: { targetFiles: parsed.targetFiles ?? [] }, + }); store.appendEvent({ occurredAt: Date.now(), - kind: 'coding.analyzed', - key: ctx.topicId, - meta: JSON.stringify({ - topicId: ctx.topicId, - analysis: parsed.analysis ?? 'No analysis', - targetFiles: parsed.targetFiles ?? [], - }), + kind: 'coding.architect', + key: topicId, + hash, }); } async function coderFn( - ctx: CodingWorkflowContext, + chain: WorkflowMessage[], + topicId: string, store: PulseStore, ): Promise { - console.log(` 💻 coder working on "${ctx.title}"...`); - const prompt = `## Task: ${ctx.title} + const userMsg = chain.find((m) => m.role === 'user'); + const userContent = (userMsg?.content as any) ?? {}; + const title = userContent.artifacts?.title ?? topicId; + const description = userContent.content ?? ''; + const repoDir = userContent.artifacts?.repoDir ?? '/tmp'; -${ctx.description} + const architectMsg = chain.find((m) => m.role === 'architect'); + const architectContent = (architectMsg?.content as any) ?? {}; + + console.log(` 💻 coder working on "${title}"...`); + const prompt = `## Task: ${title} + +${description} ## Architect Analysis -${ctx.analysis?.analysis ?? 'None'} +${architectContent.content ?? 'None'} ## Target Files -${(ctx.analysis?.targetFiles ?? []).join(', ') || 'Not specified'} +${(architectContent.artifacts?.targetFiles ?? []).join(', ') || 'Not specified'} ## Instructions Implement the changes. Do NOT modify any existing test files. Only create or modify source files as needed. If the task asks to create a new file, create it. If it asks to modify existing files, modify them. Run tests if applicable. Commit your changes.`; - const result = await runCursorAgent(prompt, ctx.repoDir); + const result = await runCursorAgent(prompt, repoDir); console.log( ` 💻 coder ${result.success ? '✅' : '❌'} (${(result.durationMs / 1000).toFixed(1)}s)`, ); + const hash = store.putObject({ + content: result.output, + artifacts: { + filesChanged: architectContent.artifacts?.targetFiles ?? [], + testsPassed: result.success, + }, + }); store.appendEvent({ occurredAt: Date.now(), - kind: 'coding.coded', - key: ctx.topicId, - meta: JSON.stringify({ - topicId: ctx.topicId, - filesChanged: ctx.analysis?.targetFiles ?? [], - testsPassed: result.success, - summary: result.output.slice(0, 500), - }), + kind: 'coding.coder', + key: topicId, + hash, }); } async function reviewerFn( - ctx: CodingWorkflowContext, + chain: WorkflowMessage[], + topicId: string, store: PulseStore, ): Promise { - console.log(` 🔍 reviewer reviewing "${ctx.title}"...`); - const prompt = `## Code Review: ${ctx.title} + const userMsg = chain.find((m) => m.role === 'user'); + const userContent = (userMsg?.content as any) ?? {}; + const title = userContent.artifacts?.title ?? topicId; + const repoDir = userContent.artifacts?.repoDir ?? '/tmp'; + + const coderMsg = [...chain].reverse().find((m) => m.role === 'coder'); + const coderContent = (coderMsg?.content as any) ?? {}; + + console.log(` 🔍 reviewer reviewing "${title}"...`); + const prompt = `## Code Review: ${title} ## What was done -${ctx.codeResult?.summary ?? 'Unknown'} +${coderContent.content ?? 'Unknown'} ## Files changed -${(ctx.codeResult?.filesChanged ?? []).join(', ')} +${(coderContent.artifacts?.filesChanged ?? []).join(', ')} ## Instructions Review the recent changes for correctness, security, and code quality. Do NOT modify any files. Only output your review. End with a clear verdict: APPROVED or REJECTED with reasons.`; - const result = await runCursorAgent(prompt, ctx.repoDir); + const result = await runCursorAgent(prompt, repoDir); console.log( ` 🔍 reviewer ${result.success ? '✅' : '❌'} (${(result.durationMs / 1000).toFixed(1)}s)`, ); - // Parse verdict from output const output = result.output.toLowerCase(); const verdict: 'approved' | 'rejected' = output.includes('rejected') ? 'rejected' : 'approved'; + const hash = store.putObject({ content: result.output }); store.appendEvent({ occurredAt: Date.now(), - kind: 'coding.reviewed', - key: ctx.topicId, - meta: JSON.stringify({ - topicId: ctx.topicId, - verdict, - comments: result.output.slice(0, 500), - }), + kind: 'coding.reviewer', + key: topicId, + hash, + meta: JSON.stringify({ verdict }), }); } @@ -216,7 +241,7 @@ End with a clear verdict: APPROVED or REJECTED with reasons.`; async function main() { const store = createStore({ eventsDbPath: dbPath, objectsDir: objDir }); - console.log(`\n🍊 === Council v2 — Full LIVE Pipeline ===\n`); + console.log(`\n🍊 === Council v2 — Full LIVE Pipeline (Message Chain) ===\n`); console.log(` LLM: ${model} @ ${baseUrl}`); console.log(` Cursor: ${AGENT_BIN}`); console.log(` DB: ${dbPath}\n`); @@ -228,18 +253,20 @@ async function main() { }); const rule = createWorkflowRule(codingTask, store); - // Create a real task on pulse repo itself + // Create a real task via coding.user event with CAS + const hash = store.putObject({ + content: + 'Create packages/pulse/COUNCIL-V2.md — a concise (30-50 lines) overview of the Council v2 model. Cover: WorkflowType (events, projection, roles, moderator), Topic as Rule pattern, one-rule-per-type managing all instances, Moore machine diff-driven ticks. Reference source files in topics/ directory. Do NOT modify any existing files.', + artifacts: { + title: 'Add COUNCIL-V2.md overview', + repoDir: '/home/azureuser/repos/pulse', + }, + }); store.appendEvent({ occurredAt: Date.now(), - kind: 'coding.created', + kind: 'coding.user', key: 'add-v2-readme', - meta: JSON.stringify({ - topicId: 'add-v2-readme', - title: 'Add COUNCIL-V2.md overview', - description: - 'Create packages/pulse/COUNCIL-V2.md — a concise (30-50 lines) overview of the Council v2 model. Cover: WorkflowType (events, projection, roles, moderator), Topic as Rule pattern, one-rule-per-type managing all instances, Moore machine diff-driven ticks. Reference source files in topics/ directory. Do NOT modify any existing files.', - repoDir: '/home/azureuser/repos/pulse', - }), + hash, }); console.log(ts(), 'Task created: add-v2-readme\n'); @@ -265,14 +292,11 @@ async function main() { const t0ev = events[0]?.occurredAt ?? 0; for (const ev of events) { const delta = ((ev.occurredAt - t0ev) / 1000).toFixed(1); - const meta = ev.meta ? JSON.parse(ev.meta) : {}; - const detail = - meta.analysis?.slice(0, 60) ?? - meta.summary?.slice(0, 60) ?? - meta.verdict ?? - ''; + const _role = ev.kind.split('.')[1] ?? '?'; + const hasContent = ev.hash ? '📦' : ''; + const metaStr = ev.meta ? ` meta=${ev.meta}` : ''; console.log( - ` [+${delta}s] #${String(ev.id).padStart(2)} ${(ev.kind ?? '').padEnd(18)} topic=${meta.topicId ?? ev.key ?? ''} ${detail}`, + ` [+${delta}s] #${String(ev.id).padStart(2)} ${(ev.kind ?? '').padEnd(20)} key=${ev.key ?? '?'} ${hasContent}${metaStr}`, ); } diff --git a/packages/pulse/src/e2e/t11-council-v2.test.ts b/packages/pulse/src/e2e/t11-council-v2.test.ts index c0ce63a..0ea005c 100644 --- a/packages/pulse/src/e2e/t11-council-v2.test.ts +++ b/packages/pulse/src/e2e/t11-council-v2.test.ts @@ -11,6 +11,7 @@ import { join } from 'node:path'; import { createStore, type PulseStore } from '../store.js'; import { createCodingWorkflow } from '../workflows/coding-workflow.js'; import { createWorkflowRule } from '../workflows/workflow-rule-adapter.js'; +import type { WorkflowMessage } from '../workflows/workflow-type.js'; describe('Council v2 E2E', () => { let store: PulseStore; @@ -31,24 +32,47 @@ describe('Council v2 E2E', () => { if (tmpDir) rmSync(tmpDir, { recursive: true, force: true }); }); + function createUserEvent( + topicId: string, + title: string, + description: string, + repoDir: string, + ) { + const hash = store.putObject({ + content: description, + artifacts: { title, repoDir }, + }); + store.appendEvent({ + occurredAt: Date.now(), + kind: 'coding.user', + key: topicId, + hash, + }); + } + it('parallel topics: one approved, one rejected then re-coded', async () => { setup(); - // Custom reviewer: rejects task-b on first review, approves everything else const reviewCounts: Record = {}; const codingTask = createCodingWorkflow({ - reviewerFn: async (ctx, s) => { - reviewCounts[ctx.topicId] = (reviewCounts[ctx.topicId] ?? 0) + 1; + reviewerFn: async ( + _chain: WorkflowMessage[], + topicId: string, + s: PulseStore, + ) => { + reviewCounts[topicId] = (reviewCounts[topicId] ?? 0) + 1; const shouldReject = - ctx.topicId === 'task-b' && reviewCounts[ctx.topicId] === 1; + topicId === 'task-b' && reviewCounts[topicId] === 1; + const hash = s.putObject({ + content: shouldReject ? 'Needs work' : 'LGTM', + }); s.appendEvent({ occurredAt: Date.now(), - kind: 'coding.reviewed', - key: ctx.topicId, + kind: 'coding.reviewer', + key: topicId, + hash, meta: JSON.stringify({ - topicId: ctx.topicId, verdict: shouldReject ? 'rejected' : 'approved', - comments: shouldReject ? 'Needs work' : 'LGTM', }), }); }, @@ -56,29 +80,8 @@ describe('Council v2 E2E', () => { const rule = createWorkflowRule(codingTask, store); - // Create two topics - store.appendEvent({ - occurredAt: Date.now(), - kind: 'coding.created', - key: 'task-a', - meta: JSON.stringify({ - topicId: 'task-a', - title: 'Feature A', - description: 'Build feature A', - repoDir: '/tmp/a', - }), - }); - store.appendEvent({ - occurredAt: Date.now(), - kind: 'coding.created', - key: 'task-b', - meta: JSON.stringify({ - topicId: 'task-b', - title: 'Feature B', - description: 'Build feature B', - repoDir: '/tmp/b', - }), - }); + createUserEvent('task-a', 'Feature A', 'Build feature A', '/tmp/a'); + createUserEvent('task-b', 'Feature B', 'Build feature B', '/tmp/b'); // Run ticks until both are closed const allExecuted: { topicId: string; role: string }[] = []; @@ -89,22 +92,18 @@ describe('Council v2 E2E', () => { } // Verify both closed - const closedEvents = store.queryByKind('coding.closed'); + const closedEvents = store.queryByKind('coding.closer'); expect(closedEvents.length).toBe(2); - const closedIds = closedEvents.map((e) => JSON.parse(e.meta!).topicId); + const closedIds = closedEvents.map((e) => e.key); expect(closedIds.sort()).toEqual(['task-a', 'task-b']); // task-b should have been coded twice (rejected then re-coded) - const codedEvents = store.queryByKind('coding.coded'); - const taskBCoded = codedEvents.filter( - (e) => JSON.parse(e.meta!).topicId === 'task-b', - ); + const codedEvents = store.queryByKind('coding.coder'); + const taskBCoded = codedEvents.filter((e) => e.key === 'task-b'); expect(taskBCoded.length).toBe(2); // task-a should have been coded once - const taskACoded = codedEvents.filter( - (e) => JSON.parse(e.meta!).topicId === 'task-a', - ); + const taskACoded = codedEvents.filter((e) => e.key === 'task-a'); expect(taskACoded.length).toBe(1); // Verify complete event chain @@ -112,6 +111,5 @@ describe('Council v2 E2E', () => { .getAfter(0) .filter((e) => e.kind.startsWith('coding.')); expect(allEvents.length).toBeGreaterThanOrEqual(10); - // 2 created + 2 analyzed + 3 coded (a:1, b:2) + 3 reviewed (a:1, b:2) + 2 closed = 12 }); }); diff --git a/packages/pulse/src/index.ts b/packages/pulse/src/index.ts index f112a5c..823f61f 100644 --- a/packages/pulse/src/index.ts +++ b/packages/pulse/src/index.ts @@ -938,7 +938,6 @@ export { buildPersonasFromEvents } from './persona.js'; // ── Council v2: WorkflowType ──────────────────────────────────────── -export type { CodingWorkflowContext } from './workflows/coding-workflow.js'; export { createCodingWorkflow } from './workflows/coding-workflow.js'; export { createWorkflowTicker } from './workflows/index.js'; export { createArchitectRole } from './workflows/roles/architect-llm.js'; @@ -950,7 +949,9 @@ export type { } from './workflows/workflow-rule-adapter.js'; export { createWorkflowRule } from './workflows/workflow-rule-adapter.js'; export type { + TopicSummary, WorkflowAction, + WorkflowMessage, WorkflowType, } from './workflows/workflow-type.js'; diff --git a/packages/pulse/src/workflows/coding-workflow.test.ts b/packages/pulse/src/workflows/coding-workflow.test.ts index 5818e1f..83d2c4e 100644 --- a/packages/pulse/src/workflows/coding-workflow.test.ts +++ b/packages/pulse/src/workflows/coding-workflow.test.ts @@ -1,5 +1,5 @@ /** - * CodingTask WorkflowType tests — full lifecycle with real SQLite store. + * CodingTask WorkflowType tests — message chain + CAS model. * * 小橘 🍊 (NEKO Team) */ @@ -11,6 +11,7 @@ import { join } from 'node:path'; import { createStore, type PulseStore } from '../store.js'; import { createCodingWorkflow } from './coding-workflow.js'; import { createWorkflowRule } from './workflow-rule-adapter.js'; +import type { WorkflowMessage } from './workflow-type.js'; describe('CodingTask WorkflowType', () => { let store: PulseStore; @@ -31,52 +32,55 @@ describe('CodingTask WorkflowType', () => { if (tmpDir) rmSync(tmpDir, { recursive: true, force: true }); }); - it('full lifecycle: created → analyzed → coded → reviewed → closed', async () => { + function createUserEvent( + topicId: string, + title: string, + description: string, + repoDir: string, + ) { + const hash = store.putObject({ + content: description, + artifacts: { title, repoDir }, + }); + store.appendEvent({ + occurredAt: Date.now(), + kind: 'coding.user', + key: topicId, + hash, + }); + } + + it('full lifecycle: user → architect → coder → reviewer → closer', async () => { setup(); const codingTask = createCodingWorkflow(); const rule = createWorkflowRule(codingTask, store); - // Write coding.created event - store.appendEvent({ - occurredAt: Date.now(), - kind: 'coding.created', - key: 'task-1', - meta: JSON.stringify({ - topicId: 'task-1', - title: 'Fix login bug', - description: 'Users cannot log in with SSO', - repoDir: '/tmp/repo', - }), - }); + createUserEvent( + 'task-1', + 'Fix login bug', + 'Users cannot log in with SSO', + '/tmp/repo', + ); // Tick 1: architect const r1 = await rule.tick(); expect(r1.executed).toEqual([{ topicId: 'task-1', role: 'architect' }]); - - // Verify coding.analyzed was written - const events1 = store.queryByKind('coding.analyzed'); - expect(events1.length).toBe(1); + expect(store.queryByKind('coding.architect').length).toBe(1); // Tick 2: coder const r2 = await rule.tick(); expect(r2.executed).toEqual([{ topicId: 'task-1', role: 'coder' }]); - - const events2 = store.queryByKind('coding.coded'); - expect(events2.length).toBe(1); + expect(store.queryByKind('coding.coder').length).toBe(1); // Tick 3: reviewer const r3 = await rule.tick(); expect(r3.executed).toEqual([{ topicId: 'task-1', role: 'reviewer' }]); - - const events3 = store.queryByKind('coding.reviewed'); - expect(events3.length).toBe(1); + expect(store.queryByKind('coding.reviewer').length).toBe(1); // Tick 4: closer (approved → close) const r4 = await rule.tick(); expect(r4.executed).toEqual([{ topicId: 'task-1', role: 'closer' }]); - - const events4 = store.queryByKind('coding.closed'); - expect(events4.length).toBe(1); + expect(store.queryByKind('coding.closer').length).toBe(1); // Tick 5: nothing to do const r5 = await rule.tick(); @@ -88,34 +92,29 @@ describe('CodingTask WorkflowType', () => { let reviewCount = 0; const codingTask = createCodingWorkflow({ - reviewerFn: async (ctx, s) => { + reviewerFn: async ( + _chain: WorkflowMessage[], + topicId: string, + s: PulseStore, + ) => { reviewCount++; + const hash = s.putObject({ + content: reviewCount === 1 ? 'Needs fixes' : 'Looks good after fixes', + }); s.appendEvent({ occurredAt: Date.now(), - kind: 'coding.reviewed', - key: ctx.topicId, + kind: 'coding.reviewer', + key: topicId, + hash, meta: JSON.stringify({ - topicId: ctx.topicId, verdict: reviewCount === 1 ? 'rejected' : 'approved', - comments: - reviewCount === 1 ? 'Needs fixes' : 'Looks good after fixes', }), }); }, }); const rule = createWorkflowRule(codingTask, store); - store.appendEvent({ - occurredAt: Date.now(), - kind: 'coding.created', - key: 'task-2', - meta: JSON.stringify({ - topicId: 'task-2', - title: 'Add feature X', - description: 'New feature', - repoDir: '/tmp/repo', - }), - }); + createUserEvent('task-2', 'Add feature X', 'New feature', '/tmp/repo'); // architect → coder → reviewer (rejects) await rule.tick(); // architect @@ -151,34 +150,47 @@ describe('CodingTask WorkflowType', () => { const codingTask = createCodingWorkflow(); const rule = createWorkflowRule(codingTask, store, logStore); - store.appendEvent({ - occurredAt: Date.now(), - kind: 'coding.created', - key: 'task-3', - meta: JSON.stringify({ - topicId: 'task-3', - title: 'Test log isolation', - description: 'Role logs go to logStore', - repoDir: '/tmp/repo', - }), - }); + createUserEvent( + 'task-3', + 'Test log isolation', + 'Role logs go to logStore', + '/tmp/repo', + ); - // Tick: architect runs await rule.tick(); - // Role logs in logStore expect(logStore.queryByKind('coding.role-started').length).toBe(1); expect(logStore.queryByKind('coding.role-completed').length).toBe(1); - - // No role logs in business store expect(store.queryByKind('coding.role-started').length).toBe(0); expect(store.queryByKind('coding.role-completed').length).toBe(0); - - // Business event still written to store - expect(store.queryByKind('coding.analyzed').length).toBe(1); + expect(store.queryByKind('coding.architect').length).toBe(1); } finally { logStore.close(); rmSync(logTmpDir, { recursive: true, force: true }); } }); + + it('CAS stores full content, events only have hash', async () => { + setup(); + const codingTask = createCodingWorkflow(); + const rule = createWorkflowRule(codingTask, store); + + createUserEvent('task-4', 'CAS test', 'Verify CAS storage', '/tmp/repo'); + await rule.tick(); // architect + + const architectEvents = store.queryByKind('coding.architect'); + expect(architectEvents.length).toBe(1); + expect(architectEvents[0].hash).toBeTruthy(); + // meta should be empty/undefined (no verdict for architect) + const meta = architectEvents[0].meta; + expect( + !meta || meta === '{}' || meta === 'null' || meta === undefined, + ).toBe(true); + + // Content retrievable from CAS + const content = store.getObject(architectEvents[0].hash!); + expect(content).toBeTruthy(); + expect((content as any).content).toBeTruthy(); + expect((content as any).artifacts?.targetFiles).toBeInstanceOf(Array); + }); }); diff --git a/packages/pulse/src/workflows/coding-workflow.ts b/packages/pulse/src/workflows/coding-workflow.ts index 570c029..3420da5 100644 --- a/packages/pulse/src/workflows/coding-workflow.ts +++ b/packages/pulse/src/workflows/coding-workflow.ts @@ -1,152 +1,146 @@ /** - * CodingTask WorkflowType — Council v2 implementation. + * CodingTask WorkflowType — message chain + CAS model. * - * Events: - * coding.created { topicId, title, description, repoDir } - * coding.analyzed { topicId, analysis, targetFiles } - * coding.coded { topicId, filesChanged, testsPassed, summary } - * coding.reviewed { topicId, verdict: 'approved'|'rejected', comments } - * coding.closed { topicId, summary } + * Events use `coding.{role}` naming: + * coding.user — user creates a task + * coding.architect — architect analysis (CAS) + * coding.coder — coder output (CAS) + * coding.reviewer — review verdict (CAS + meta.verdict) + * coding.closer — task closed (CAS) * * 小橘 🍊 (NEKO Team) */ import type { PulseStore } from '../store.js'; -import type { WorkflowAction, WorkflowType } from './workflow-type.js'; - -// ── Context ──────────────────────────────────────────────────── - -export interface CodingWorkflowContext { - topicId: string; - title: string; - description: string; - repoDir: string; - analysis?: { analysis: string; targetFiles: string[] }; - codeResult?: { - filesChanged: string[]; - testsPassed: boolean; - summary: string; - }; - reviewResult?: { verdict: 'approved' | 'rejected'; comments: string }; - codeCount: number; - reviewCount: number; - closed?: boolean; -} +import type { + TopicSummary, + WorkflowAction, + WorkflowMessage, + WorkflowType, +} from './workflow-type.js'; // ── Roles type ───────────────────────────────────────────────── +type RoleFn = ( + chain: WorkflowMessage[], + topicId: string, + store: PulseStore, +) => Promise; + type CodingWorkflowRoles = { - architect: (ctx: CodingWorkflowContext, store: PulseStore) => Promise; - coder: (ctx: CodingWorkflowContext, store: PulseStore) => Promise; - reviewer: (ctx: CodingWorkflowContext, store: PulseStore) => Promise; - closer: (ctx: CodingWorkflowContext, store: PulseStore) => Promise; + architect: RoleFn; + coder: RoleFn; + reviewer: RoleFn; + closer: RoleFn; }; -// ── JSONata projection ───────────────────────────────────────── +// ── Projection function ──────────────────────────────────────── -/** - * JSONata expression that projects coding.* events into { [topicId]: CodingWorkflowContext }. - * - * Strategy: filter coding.* events, group by topicId (from meta), build context. - */ -const CODING_TASK_PROJECTION = ` -( - $coding := $[kind ~> /^coding\\./]; - $ids := $distinct($coding.($parse(meta).topicId)); - $merge($ids.( - $id := $; - $evts := $coding[$parse(meta).topicId = $id]; - $created := $evts[kind = 'coding.created'][0]; - $analyzed := $evts[kind = 'coding.analyzed'][0]; - $coded := $evts[kind = 'coding.coded']; - $lastCoded := $coded[$count($coded) - 1]; - $reviewed := $evts[kind = 'coding.reviewed']; - $lastReviewed := $reviewed[$count($reviewed) - 1]; - $closedEvt := $evts[kind = 'coding.closed'][0]; - $cm := $parse($created.meta); - $ctx := { - "topicId": $id, - "title": $cm.title, - "description": $cm.description, - "repoDir": $cm.repoDir, - "codeCount": $count($coded), - "reviewCount": $count($reviewed) - }; - $ctx := $analyzed ? $merge([$ctx, {"analysis": {"analysis": $parse($analyzed.meta).analysis, "targetFiles": $parse($analyzed.meta).targetFiles}}]) : $ctx; - $ctx := $lastCoded ? $merge([$ctx, {"codeResult": {"filesChanged": $parse($lastCoded.meta).filesChanged, "testsPassed": $parse($lastCoded.meta).testsPassed, "summary": $parse($lastCoded.meta).summary}}]) : $ctx; - $ctx := $lastReviewed ? $merge([$ctx, {"reviewResult": {"verdict": $parse($lastReviewed.meta).verdict, "comments": $parse($lastReviewed.meta).comments}}]) : $ctx; - $ctx := $closedEvt ? $merge([$ctx, {"closed": true}]) : $ctx; - { $id: $ctx } - )) -) -`; +function codingProjection( + events: Array<{ + kind: string; + key?: string | null; + meta?: string | null; + hash?: string | null; + occurredAt: number; + }>, +): Map { + const topics = new Map(); + // events are ordered by id ASC + for (const e of events) { + if (!e.kind.startsWith('coding.')) continue; + const topicId = e.key; + if (!topicId) continue; + const role = e.kind.split('.')[1]; + let meta: Record | undefined; + if (e.meta) { + try { + meta = JSON.parse(e.meta); + } catch {} + } + topics.set(topicId, { lastRole: role, meta }); + } + return topics; +} // ── Default mock role implementations ────────────────────────── function defaultArchitect( - ctx: CodingWorkflowContext, + chain: WorkflowMessage[], + topicId: string, store: PulseStore, ): Promise { + const userMsg = chain.find((m) => m.role === 'user'); + const userContent = (userMsg?.content as any) ?? {}; + const hash = store.putObject({ + content: `[mock] Analysis for "${userContent.artifacts?.title ?? topicId}": ${userContent.content ?? ''}`, + artifacts: { targetFiles: ['src/main.ts', 'src/utils.ts'] }, + }); store.appendEvent({ occurredAt: Date.now(), - kind: 'coding.analyzed', - key: ctx.topicId, - meta: JSON.stringify({ - topicId: ctx.topicId, - analysis: `[mock] Analysis for "${ctx.title}": ${ctx.description}`, - targetFiles: ['src/main.ts', 'src/utils.ts'], - }), + kind: 'coding.architect', + key: topicId, + hash, }); return Promise.resolve(); } function defaultCoder( - ctx: CodingWorkflowContext, + chain: WorkflowMessage[], + topicId: string, store: PulseStore, ): Promise { + const architectMsg = chain.find((m) => m.role === 'architect'); + const architectContent = (architectMsg?.content as any) ?? {}; + const hash = store.putObject({ + content: `[mock] Implemented changes`, + artifacts: { + filesChanged: architectContent.artifacts?.targetFiles ?? ['src/main.ts'], + testsPassed: true, + }, + }); store.appendEvent({ occurredAt: Date.now(), - kind: 'coding.coded', - key: ctx.topicId, - meta: JSON.stringify({ - topicId: ctx.topicId, - filesChanged: ctx.analysis?.targetFiles ?? ['src/main.ts'], - testsPassed: true, - summary: `[mock] Implemented "${ctx.title}"`, - }), + kind: 'coding.coder', + key: topicId, + hash, }); return Promise.resolve(); } function defaultReviewer( - ctx: CodingWorkflowContext, + _chain: WorkflowMessage[], + topicId: string, store: PulseStore, ): Promise { + const hash = store.putObject({ + content: `[mock] Code looks good`, + }); store.appendEvent({ occurredAt: Date.now(), - kind: 'coding.reviewed', - key: ctx.topicId, - meta: JSON.stringify({ - topicId: ctx.topicId, - verdict: 'approved' as const, - comments: `[mock] Code looks good for "${ctx.title}"`, - }), + kind: 'coding.reviewer', + key: topicId, + hash, + meta: JSON.stringify({ verdict: 'approved' }), }); return Promise.resolve(); } function defaultCloser( - ctx: CodingWorkflowContext, + chain: WorkflowMessage[], + topicId: string, store: PulseStore, ): Promise { + const userMsg = chain.find((m) => m.role === 'user'); + const userContent = (userMsg?.content as any) ?? {}; + const hash = store.putObject({ + content: `Completed: ${userContent.artifacts?.title ?? topicId}`, + }); store.appendEvent({ occurredAt: Date.now(), - kind: 'coding.closed', - key: ctx.topicId, - meta: JSON.stringify({ - topicId: ctx.topicId, - summary: `Completed: ${ctx.title}`, - }), + kind: 'coding.closer', + key: topicId, + hash, }); return Promise.resolve(); } @@ -154,13 +148,10 @@ function defaultCloser( // ── Factory ──────────────────────────────────────────────────── export function createCodingWorkflow(opts?: { - architectFn?: ( - ctx: CodingWorkflowContext, - store: PulseStore, - ) => Promise; - coderFn?: (ctx: CodingWorkflowContext, store: PulseStore) => Promise; - reviewerFn?: (ctx: CodingWorkflowContext, store: PulseStore) => Promise; -}): WorkflowType { + architectFn?: RoleFn; + coderFn?: RoleFn; + reviewerFn?: RoleFn; +}): WorkflowType { const roles: CodingWorkflowRoles = { architect: opts?.architectFn ?? defaultArchitect, coder: opts?.coderFn ?? defaultCoder, @@ -169,23 +160,34 @@ export function createCodingWorkflow(opts?: { }; const moderator = ( - _roles: CodingWorkflowRoles, - topics: Record, + topics: Map, ): WorkflowAction[] => { const actions: WorkflowAction[] = []; - for (const [topicId, ctx] of Object.entries(topics)) { - if (ctx.closed) continue; - if (!ctx.analysis) { - actions.push({ topicId, role: 'architect' }); - } else if (!ctx.codeResult) { - actions.push({ topicId, role: 'coder' }); - } else if (!ctx.reviewResult || ctx.codeCount > ctx.reviewCount) { - // Need review: either never reviewed, or re-coded after last review - actions.push({ topicId, role: 'reviewer' }); - } else if (ctx.reviewResult.verdict === 'rejected') { - actions.push({ topicId, role: 'coder' }); - } else if (ctx.reviewResult.verdict === 'approved') { - actions.push({ topicId, role: 'closer' }); + for (const [topicId, summary] of topics) { + const { lastRole, meta } = summary; + let nextRole: string | null = null; + switch (lastRole) { + case 'user': + nextRole = 'architect'; + break; + case 'architect': + nextRole = 'coder'; + break; + case 'coder': + nextRole = 'reviewer'; + break; + case 'reviewer': + nextRole = meta?.verdict === 'rejected' ? 'coder' : 'closer'; + break; + case 'closer': + nextRole = null; + break; + } + if (nextRole) { + actions.push({ + topicId, + role: nextRole as keyof CodingWorkflowRoles & string, + }); } } return actions; @@ -193,7 +195,7 @@ export function createCodingWorkflow(opts?: { return { name: 'coding', - projection: CODING_TASK_PROJECTION, + projection: codingProjection, roles, moderator, }; diff --git a/packages/pulse/src/workflows/index.ts b/packages/pulse/src/workflows/index.ts index c33b84f..f25a3fd 100644 --- a/packages/pulse/src/workflows/index.ts +++ b/packages/pulse/src/workflows/index.ts @@ -4,10 +4,7 @@ * 小橘 🍊 (NEKO Team) */ -export { - type CodingWorkflowContext, - createCodingWorkflow, -} from './coding-workflow.js'; +export { createCodingWorkflow } from './coding-workflow.js'; export { createArchitectRole } from './roles/architect-llm.js'; export { createCoderRole } from './roles/coder-cursor.js'; export { createReviewerRole } from './roles/reviewer-cursor.js'; @@ -16,7 +13,12 @@ export { type WorkflowRule, type WorkflowTickResult, } from './workflow-rule-adapter.js'; -export type { WorkflowAction, WorkflowType } from './workflow-type.js'; +export type { + TopicSummary, + WorkflowAction, + WorkflowMessage, + WorkflowType, +} from './workflow-type.js'; import type { WorkflowRule } from './workflow-rule-adapter.js'; @@ -29,9 +31,6 @@ export function createWorkflowTicker( ): () => Promise { let pending: Promise | null = null; return () => { - // Serialize tick calls: if a tick is in progress, chain after it. - // This ensures the Moore machine's critical section (read → diff → update baseline) - // is never concurrent, even when called from fire-and-forget executors. const run = async () => { for (const rule of rules) { await rule.tick(); diff --git a/packages/pulse/src/workflows/roles/architect-llm.test.ts b/packages/pulse/src/workflows/roles/architect-llm.test.ts index 1356e17..fc84aa9 100644 --- a/packages/pulse/src/workflows/roles/architect-llm.test.ts +++ b/packages/pulse/src/workflows/roles/architect-llm.test.ts @@ -1,5 +1,5 @@ /** - * Architect role tests — mock LLM, real SQLite store. + * Architect role tests — mock LLM, real SQLite store (message chain model). * * 小橘 🍊 (NEKO Team) */ @@ -10,7 +10,7 @@ import { tmpdir } from 'node:os'; import { join } from 'node:path'; import type { LlmClient } from '../../llm-client.js'; import { createStore, type PulseStore } from '../../store.js'; -import type { CodingWorkflowContext } from '../coding-workflow.js'; +import type { WorkflowMessage } from '../workflow-type.js'; import { createArchitectRole } from './architect-llm.js'; describe('createArchitectRole', () => { @@ -32,7 +32,7 @@ describe('createArchitectRole', () => { if (tmpDir) rmSync(tmpDir, { recursive: true, force: true }); }); - it('calls LLM and writes coding.analyzed event', async () => { + it('calls LLM and writes coding.architect event with CAS', async () => { setup(); const mockLlm: LlmClient = { @@ -46,37 +46,27 @@ describe('createArchitectRole', () => { const role = createArchitectRole(mockLlm); - const ctx: CodingWorkflowContext = { - topicId: 'test-task-1', - title: 'Fix the bug', - description: 'There is a bug in main.ts', - repoDir: '/tmp/test-repo', - codeCount: 0, - reviewCount: 0, - }; + const chain: WorkflowMessage[] = [ + { + role: 'user', + content: { + content: 'There is a bug in main.ts', + artifacts: { title: 'Fix the bug', repoDir: '/tmp/test-repo' }, + }, + timestamp: Date.now(), + }, + ]; - // Write coding.created first - store.appendEvent({ - occurredAt: Date.now(), - kind: 'coding.created', - key: ctx.topicId, - meta: JSON.stringify({ - topicId: ctx.topicId, - title: ctx.title, - description: ctx.description, - repoDir: ctx.repoDir, - }), - }); - - await role(ctx, store); + await role(chain, 'test-task-1', store); const events = store.getAfter(0); - const analyzed = events.find((e) => e.kind === 'coding.analyzed'); - expect(analyzed).toBeDefined(); - const meta = JSON.parse(analyzed!.meta!); - expect(meta.topicId).toBe('test-task-1'); - expect(meta.analysis).toBe('Need to update main.ts'); - expect(meta.targetFiles).toEqual(['src/main.ts']); + const architectEvt = events.find((e) => e.kind === 'coding.architect'); + expect(architectEvt).toBeDefined(); + expect(architectEvt!.hash).toBeTruthy(); + + const content = store.getObject(architectEvt!.hash!) as any; + expect(content.content).toBe('Need to update main.ts'); + expect(content.artifacts.targetFiles).toEqual(['src/main.ts']); }); it('handles non-JSON LLM response gracefully', async () => { @@ -87,22 +77,25 @@ describe('createArchitectRole', () => { }; const role = createArchitectRole(mockLlm); - const ctx: CodingWorkflowContext = { - topicId: 'test-2', - title: 'Task', - description: 'Desc', - repoDir: '/tmp', - codeCount: 0, - reviewCount: 0, - }; + const chain: WorkflowMessage[] = [ + { + role: 'user', + content: { + content: 'Desc', + artifacts: { title: 'Task', repoDir: '/tmp' }, + }, + timestamp: Date.now(), + }, + ]; - await role(ctx, store); + await role(chain, 'test-2', store); const events = store.getAfter(0); - const analyzed = events.find((e) => e.kind === 'coding.analyzed'); - expect(analyzed).toBeDefined(); - const meta = JSON.parse(analyzed!.meta!); - expect(meta.analysis).toBe('Just do it'); - expect(meta.targetFiles).toEqual([]); + const architectEvt = events.find((e) => e.kind === 'coding.architect'); + expect(architectEvt).toBeDefined(); + + const content = store.getObject(architectEvt!.hash!) as any; + expect(content.content).toBe('Just do it'); + expect(content.artifacts.targetFiles).toEqual([]); }); }); diff --git a/packages/pulse/src/workflows/roles/architect-llm.ts b/packages/pulse/src/workflows/roles/architect-llm.ts index 739d0e2..7aea803 100644 --- a/packages/pulse/src/workflows/roles/architect-llm.ts +++ b/packages/pulse/src/workflows/roles/architect-llm.ts @@ -6,12 +6,22 @@ import type { LlmClient } from '../../llm-client.js'; import type { PulseStore } from '../../store.js'; -import type { CodingWorkflowContext } from '../coding-workflow.js'; +import type { WorkflowMessage } from '../workflow-type.js'; export function createArchitectRole( llmClient: LlmClient, -): (ctx: CodingWorkflowContext, store: PulseStore) => Promise { - return async (ctx, store) => { +): ( + chain: WorkflowMessage[], + topicId: string, + store: PulseStore, +) => Promise { + return async (chain, topicId, store) => { + const userMsg = chain.find((m) => m.role === 'user'); + const userContent = (userMsg?.content as any) ?? {}; + const title = userContent.artifacts?.title ?? topicId; + const description = userContent.content ?? ''; + const repoDir = userContent.artifacts?.repoDir ?? ''; + const resp = await llmClient.chat({ messages: [ { @@ -21,7 +31,7 @@ export function createArchitectRole( }, { role: 'user', - content: `Task: ${ctx.title}\nDescription: ${ctx.description}\nRepo: ${ctx.repoDir}`, + content: `Task: ${title}\nDescription: ${description}\nRepo: ${repoDir}`, }, ], }); @@ -33,15 +43,15 @@ export function createArchitectRole( parsed = { analysis: resp.content ?? 'No analysis', targetFiles: [] }; } + const hash = store.putObject({ + content: parsed.analysis ?? 'No analysis', + artifacts: { targetFiles: parsed.targetFiles ?? [] }, + }); store.appendEvent({ occurredAt: Date.now(), - kind: 'coding.analyzed', - key: ctx.topicId, - meta: JSON.stringify({ - topicId: ctx.topicId, - analysis: parsed.analysis ?? 'No analysis', - targetFiles: parsed.targetFiles ?? [], - }), + kind: 'coding.architect', + key: topicId, + hash, }); }; } diff --git a/packages/pulse/src/workflows/roles/coder-cursor.test.ts b/packages/pulse/src/workflows/roles/coder-cursor.test.ts index 2f3836f..cf439d0 100644 --- a/packages/pulse/src/workflows/roles/coder-cursor.test.ts +++ b/packages/pulse/src/workflows/roles/coder-cursor.test.ts @@ -1,6 +1,5 @@ /** - * Coder role tests — mock Cursor agent (cannot spawn real agent in test). - * Tests the event writing logic using the default mock from CodingTaskType. + * Coder role tests — mock Cursor agent (message chain model). * * 小橘 🍊 (NEKO Team) */ @@ -10,7 +9,7 @@ import { mkdtempSync, rmSync } from 'node:fs'; import { tmpdir } from 'node:os'; import { join } from 'node:path'; import { createStore, type PulseStore } from '../../store.js'; -import type { CodingWorkflowContext } from '../coding-workflow.js'; +import type { WorkflowMessage } from '../workflow-type.js'; describe('coder-cursor role', () => { let store: PulseStore; @@ -31,54 +30,61 @@ describe('coder-cursor role', () => { if (tmpDir) rmSync(tmpDir, { recursive: true, force: true }); }); - it('writes coding.coded event with correct structure', async () => { + it('writes coding.coder event with CAS', async () => { setup(); - // Use a mock coder that simulates what createCoderRole does (without spawning Cursor) - const mockCoderFn = async (ctx: CodingWorkflowContext, s: PulseStore) => { + // Mock coder that simulates what createCoderRole does + const mockCoderFn = async ( + chain: WorkflowMessage[], + topicId: string, + s: PulseStore, + ) => { + const architectMsg = chain.find((m) => m.role === 'architect'); + const architectContent = (architectMsg?.content as any) ?? {}; + const hash = s.putObject({ + content: 'Mock implementation done', + artifacts: { + filesChanged: architectContent.artifacts?.targetFiles ?? [], + testsPassed: true, + }, + }); s.appendEvent({ occurredAt: Date.now(), - kind: 'coding.coded', - key: ctx.topicId, - meta: JSON.stringify({ - topicId: ctx.topicId, - filesChanged: ctx.analysis?.targetFiles ?? [], - testsPassed: true, - summary: 'Mock implementation done', - }), + kind: 'coding.coder', + key: topicId, + hash, }); }; - const ctx: CodingWorkflowContext = { - topicId: 'coder-test-1', - title: 'Add feature', - description: 'Add a new feature', - repoDir: '/tmp/test-repo', - analysis: { analysis: 'Modify utils', targetFiles: ['src/utils.ts'] }, - codeCount: 0, - reviewCount: 0, - }; + const chain: WorkflowMessage[] = [ + { + role: 'user', + content: { + content: 'Add a new feature', + artifacts: { title: 'Add feature', repoDir: '/tmp/test-repo' }, + }, + timestamp: Date.now(), + }, + { + role: 'architect', + content: { + content: 'Modify utils', + artifacts: { targetFiles: ['src/utils.ts'] }, + }, + timestamp: Date.now(), + }, + ]; - store.appendEvent({ - occurredAt: Date.now(), - kind: 'coding.created', - key: ctx.topicId, - meta: JSON.stringify({ - topicId: ctx.topicId, - title: ctx.title, - description: ctx.description, - repoDir: ctx.repoDir, - }), - }); - - await mockCoderFn(ctx, store); + await mockCoderFn(chain, 'coder-test-1', store); const events = store.getAfter(0); - const coded = events.find((e) => e.kind === 'coding.coded'); + const coded = events.find((e) => e.kind === 'coding.coder'); expect(coded).toBeDefined(); - const meta = JSON.parse(coded!.meta!); - expect(meta.topicId).toBe('coder-test-1'); - expect(meta.filesChanged).toEqual(['src/utils.ts']); - expect(meta.testsPassed).toBe(true); + expect(coded!.hash).toBeTruthy(); + + const content = store.getObject(coded!.hash!) as any; + expect(content.content).toBe('Mock implementation done'); + expect(content.artifacts.filesChanged).toEqual(['src/utils.ts']); + expect(content.artifacts.testsPassed).toBe(true); }); }); diff --git a/packages/pulse/src/workflows/roles/coder-cursor.ts b/packages/pulse/src/workflows/roles/coder-cursor.ts index 8011f9f..bb83e4d 100644 --- a/packages/pulse/src/workflows/roles/coder-cursor.ts +++ b/packages/pulse/src/workflows/roles/coder-cursor.ts @@ -8,39 +8,54 @@ import { mkdirSync, unlinkSync, writeFileSync } from 'node:fs'; import { tmpdir } from 'node:os'; import { join } from 'node:path'; import type { PulseStore } from '../../store.js'; -import type { CodingWorkflowContext } from '../coding-workflow.js'; +import type { WorkflowMessage } from '../workflow-type.js'; export function createCoderRole(opts: { agentBin: string; -}): (ctx: CodingWorkflowContext, store: PulseStore) => Promise { - return async (ctx, store) => { - const prompt = `## Task: ${ctx.title} +}): ( + chain: WorkflowMessage[], + topicId: string, + store: PulseStore, +) => Promise { + return async (chain, topicId, store) => { + const userMsg = chain.find((m) => m.role === 'user'); + const userContent = (userMsg?.content as any) ?? {}; + const title = userContent.artifacts?.title ?? topicId; + const description = userContent.content ?? ''; + const repoDir = userContent.artifacts?.repoDir ?? '/tmp'; -${ctx.description} + const architectMsg = chain.find((m) => m.role === 'architect'); + const architectContent = (architectMsg?.content as any) ?? {}; + + const prompt = `## Task: ${title} + +${description} ## Architect Analysis -${ctx.analysis?.analysis ?? 'None'} +${architectContent.content ?? 'None'} ## Target Files -${(ctx.analysis?.targetFiles ?? []).join(', ') || 'Not specified'} +${(architectContent.artifacts?.targetFiles ?? []).join(', ') || 'Not specified'} ## Instructions Implement the changes. Do NOT modify any existing test files. Only create or modify source files as needed. If the task asks to create a new file, create it. If it asks to modify existing files, modify them. Run tests if applicable. Commit your changes.`; - const result = await runCursorAgent(opts.agentBin, prompt, ctx.repoDir); + const result = await runCursorAgent(opts.agentBin, prompt, repoDir); + const hash = store.putObject({ + content: result.output, + artifacts: { + filesChanged: architectContent.artifacts?.targetFiles ?? [], + testsPassed: result.success, + }, + }); store.appendEvent({ occurredAt: Date.now(), - kind: 'coding.coded', - key: ctx.topicId, - meta: JSON.stringify({ - topicId: ctx.topicId, - filesChanged: ctx.analysis?.targetFiles ?? [], - testsPassed: result.success, - summary: result.output.slice(0, 500), - }), + kind: 'coding.coder', + key: topicId, + hash, }); }; } @@ -77,7 +92,7 @@ async function runCursorAgent( }, ); - const timer = setTimeout(() => proc.kill(), 300_000); // 5 min timeout + const timer = setTimeout(() => proc.kill(), 300_000); const exitCode = await proc.exited; clearTimeout(timer); diff --git a/packages/pulse/src/workflows/roles/reviewer-cursor.test.ts b/packages/pulse/src/workflows/roles/reviewer-cursor.test.ts index 670abe5..081461e 100644 --- a/packages/pulse/src/workflows/roles/reviewer-cursor.test.ts +++ b/packages/pulse/src/workflows/roles/reviewer-cursor.test.ts @@ -1,5 +1,5 @@ /** - * Reviewer role tests — mock Cursor agent. + * Reviewer role tests — mock Cursor agent (message chain model). * * 小橘 🍊 (NEKO Team) */ @@ -9,7 +9,7 @@ import { mkdtempSync, rmSync } from 'node:fs'; import { tmpdir } from 'node:os'; import { join } from 'node:path'; import { createStore, type PulseStore } from '../../store.js'; -import type { CodingWorkflowContext } from '../coding-workflow.js'; +import type { WorkflowMessage } from '../workflow-type.js'; describe('reviewer-cursor role', () => { let store: PulseStore; @@ -30,68 +30,70 @@ describe('reviewer-cursor role', () => { if (tmpDir) rmSync(tmpDir, { recursive: true, force: true }); }); - it('writes coding.reviewed event with approved verdict', async () => { + it('writes coding.reviewer event with approved verdict and CAS', async () => { setup(); const mockReviewerFn = async ( - ctx: CodingWorkflowContext, + _chain: WorkflowMessage[], + topicId: string, s: PulseStore, ) => { - const verdict: 'approved' | 'rejected' = 'approved'; + const hash = s.putObject({ content: 'LGTM, code looks good. APPROVED.' }); s.appendEvent({ occurredAt: Date.now(), - kind: 'coding.reviewed', - key: ctx.topicId, - meta: JSON.stringify({ - topicId: ctx.topicId, - verdict, - comments: 'LGTM, code looks good. APPROVED.', - }), + kind: 'coding.reviewer', + key: topicId, + hash, + meta: JSON.stringify({ verdict: 'approved' }), }); }; - const ctx: CodingWorkflowContext = { - topicId: 'review-test-1', - title: 'Fix bug', - description: 'Fix the bug', - repoDir: '/tmp/test-repo', - analysis: { analysis: 'Bug in utils', targetFiles: ['src/utils.ts'] }, - codeResult: { - filesChanged: ['src/utils.ts'], - testsPassed: true, - summary: 'Fixed', + const chain: WorkflowMessage[] = [ + { + role: 'user', + content: { + content: 'Fix the bug', + artifacts: { title: 'Fix bug', repoDir: '/tmp/test-repo' }, + }, + timestamp: Date.now(), }, - codeCount: 1, - reviewCount: 0, - }; + { + role: 'architect', + content: { + content: 'Bug in utils', + artifacts: { targetFiles: ['src/utils.ts'] }, + }, + timestamp: Date.now(), + }, + { + role: 'coder', + content: { + content: 'Fixed', + artifacts: { filesChanged: ['src/utils.ts'], testsPassed: true }, + }, + timestamp: Date.now(), + }, + ]; - store.appendEvent({ - occurredAt: Date.now(), - kind: 'coding.created', - key: ctx.topicId, - meta: JSON.stringify({ - topicId: ctx.topicId, - title: ctx.title, - description: ctx.description, - repoDir: ctx.repoDir, - }), - }); - - await mockReviewerFn(ctx, store); + await mockReviewerFn(chain, 'review-test-1', store); const events = store.getAfter(0); - const reviewed = events.find((e) => e.kind === 'coding.reviewed'); + const reviewed = events.find((e) => e.kind === 'coding.reviewer'); expect(reviewed).toBeDefined(); + expect(reviewed!.hash).toBeTruthy(); const meta = JSON.parse(reviewed!.meta!); - expect(meta.topicId).toBe('review-test-1'); expect(meta.verdict).toBe('approved'); + + const content = store.getObject(reviewed!.hash!) as any; + expect(content.content).toContain('APPROVED'); }); - it('writes rejected verdict when output contains "rejected"', async () => { + it('writes rejected verdict', async () => { setup(); const mockReviewerFn = async ( - ctx: CodingWorkflowContext, + _chain: WorkflowMessage[], + topicId: string, s: PulseStore, ) => { const output = 'Code has issues. REJECTED: missing error handling.'; @@ -100,32 +102,39 @@ describe('reviewer-cursor role', () => { .includes('rejected') ? 'rejected' : 'approved'; + const hash = s.putObject({ content: output }); s.appendEvent({ occurredAt: Date.now(), - kind: 'coding.reviewed', - key: ctx.topicId, - meta: JSON.stringify({ - topicId: ctx.topicId, - verdict, - comments: output.slice(0, 500), - }), + kind: 'coding.reviewer', + key: topicId, + hash, + meta: JSON.stringify({ verdict }), }); }; - const ctx: CodingWorkflowContext = { - topicId: 'review-test-2', - title: 'Bad code', - description: 'Bad', - repoDir: '/tmp', - codeResult: { filesChanged: [], testsPassed: false, summary: 'Bad' }, - codeCount: 1, - reviewCount: 0, - }; + const chain: WorkflowMessage[] = [ + { + role: 'user', + content: { + content: 'Bad', + artifacts: { title: 'Bad code', repoDir: '/tmp' }, + }, + timestamp: Date.now(), + }, + { + role: 'coder', + content: { + content: 'Bad impl', + artifacts: { filesChanged: [], testsPassed: false }, + }, + timestamp: Date.now(), + }, + ]; - await mockReviewerFn(ctx, store); + await mockReviewerFn(chain, 'review-test-2', store); const events = store.getAfter(0); - const reviewed = events.find((e) => e.kind === 'coding.reviewed'); + const reviewed = events.find((e) => e.kind === 'coding.reviewer'); const meta = JSON.parse(reviewed!.meta!); expect(meta.verdict).toBe('rejected'); }); diff --git a/packages/pulse/src/workflows/roles/reviewer-cursor.ts b/packages/pulse/src/workflows/roles/reviewer-cursor.ts index 0d956ff..33a6173 100644 --- a/packages/pulse/src/workflows/roles/reviewer-cursor.ts +++ b/packages/pulse/src/workflows/roles/reviewer-cursor.ts @@ -8,41 +8,53 @@ import { mkdirSync, unlinkSync, writeFileSync } from 'node:fs'; import { tmpdir } from 'node:os'; import { join } from 'node:path'; import type { PulseStore } from '../../store.js'; -import type { CodingWorkflowContext } from '../coding-workflow.js'; +import type { WorkflowMessage } from '../workflow-type.js'; export function createReviewerRole(opts: { agentBin: string; -}): (ctx: CodingWorkflowContext, store: PulseStore) => Promise { - return async (ctx, store) => { - const prompt = `## Code Review: ${ctx.title} +}): ( + chain: WorkflowMessage[], + topicId: string, + store: PulseStore, +) => Promise { + return async (chain, topicId, store) => { + const userMsg = chain.find((m) => m.role === 'user'); + const userContent = (userMsg?.content as any) ?? {}; + const title = userContent.artifacts?.title ?? topicId; + const repoDir = userContent.artifacts?.repoDir ?? '/tmp'; + + const coderMsg = [...chain].reverse().find((m) => m.role === 'coder'); + const coderContent = (coderMsg?.content as any) ?? {}; + + const prompt = `## Code Review: ${title} ## What was done -${ctx.codeResult?.summary ?? 'Unknown'} +${coderContent.content ?? 'Unknown'} ## Files changed -${(ctx.codeResult?.filesChanged ?? []).join(', ')} +${(coderContent.artifacts?.filesChanged ?? []).join(', ')} ## Instructions Review the recent changes for correctness, security, and code quality. Do NOT modify any files. Only output your review. End with a clear verdict: APPROVED or REJECTED with reasons.`; - const result = await runCursorAgent(opts.agentBin, prompt, ctx.repoDir); + const result = await runCursorAgent(opts.agentBin, prompt, repoDir); const output = result.output.toLowerCase(); const verdict: 'approved' | 'rejected' = output.includes('rejected') ? 'rejected' : 'approved'; + const hash = store.putObject({ + content: result.output, + }); store.appendEvent({ occurredAt: Date.now(), - kind: 'coding.reviewed', - key: ctx.topicId, - meta: JSON.stringify({ - topicId: ctx.topicId, - verdict, - comments: result.output.slice(0, 500), - }), + kind: 'coding.reviewer', + key: topicId, + hash, + meta: JSON.stringify({ verdict }), }); }; } @@ -79,7 +91,7 @@ async function runCursorAgent( }, ); - const timer = setTimeout(() => proc.kill(), 300_000); // 5 min timeout + const timer = setTimeout(() => proc.kill(), 300_000); const exitCode = await proc.exited; clearTimeout(timer); diff --git a/packages/pulse/src/workflows/workflow-rule-adapter.test.ts b/packages/pulse/src/workflows/workflow-rule-adapter.test.ts index 96d3305..1d9df39 100644 --- a/packages/pulse/src/workflows/workflow-rule-adapter.test.ts +++ b/packages/pulse/src/workflows/workflow-rule-adapter.test.ts @@ -1,5 +1,5 @@ /** - * WorkflowRuleAdapter tests. + * WorkflowRuleAdapter tests — message chain model. * * 小橘 🍊 (NEKO Team) */ @@ -10,7 +10,12 @@ import { tmpdir } from 'node:os'; import { join } from 'node:path'; import { createStore, type PulseStore } from '../store.js'; import { createWorkflowRule } from './workflow-rule-adapter.js'; -import type { WorkflowAction, WorkflowType } from './workflow-type.js'; +import type { + TopicSummary, + WorkflowAction, + WorkflowMessage, + WorkflowType, +} from './workflow-type.js'; describe('createWorkflowRule', () => { let store: PulseStore; @@ -39,52 +44,59 @@ describe('createWorkflowRule', () => { if (tmpDir) rmSync(tmpDir, { recursive: true, force: true }); }); - it('executes a simple echo topic through tick', async () => { + it('executes a simple echo workflow through tick', async () => { setup(); - interface EchoCtx { - topicId: string; - message: string; - echoed?: boolean; - } + type EchoRoles = { + echo: ( + chain: WorkflowMessage[], + topicId: string, + store: PulseStore, + ) => Promise; + }; - const echoType: WorkflowType< - EchoCtx, - { echo: (ctx: EchoCtx, store: PulseStore) => Promise } - > = { + const echoType: WorkflowType = { name: 'echo', - projection: ` - ( - $created := $[kind = 'echo.created']; - $echoed := $[kind = 'echo.echoed']; - $echoedIds := $distinct($echoed.($parse(meta).topicId)); - $merge($created.( - $m := $parse(meta); - { $m.topicId: { - "topicId": $m.topicId, - "message": $m.message, - "echoed": $m.topicId in $echoedIds - }} - )) - ) - `, + projection: (events) => { + const topics = new Map(); + for (const e of events) { + if (!e.kind.startsWith('echo.')) continue; + if (!e.key) continue; + topics.set(e.key, { + lastRole: e.kind.split('.')[1], + meta: e.meta + ? (() => { + try { + return JSON.parse(e.meta!); + } catch { + return undefined; + } + })() + : undefined, + }); + } + return topics; + }, roles: { - echo: async (ctx, s) => { + echo: async (chain, topicId, s) => { + const userMsg = chain.find((m) => m.role === 'created'); + const userContent = (userMsg?.content as any) ?? {}; + const hash = s.putObject({ + content: `Echo: ${userContent.message ?? ''}`, + }); s.appendEvent({ occurredAt: Date.now(), kind: 'echo.echoed', - key: ctx.topicId, - meta: JSON.stringify({ - topicId: ctx.topicId, - reply: `Echo: ${ctx.message}`, - }), + key: topicId, + hash, }); }, }, - moderator: (_roles, topics) => { + moderator: (topics) => { const actions: WorkflowAction<'echo'>[] = []; - for (const [topicId, ctx] of Object.entries(topics)) { - if (!ctx.echoed) actions.push({ topicId, role: 'echo' }); + for (const [topicId, summary] of topics) { + if (summary.lastRole === 'created') + actions.push({ topicId, role: 'echo' }); } return actions; }, @@ -92,12 +104,13 @@ describe('createWorkflowRule', () => { const rule = createWorkflowRule(echoType, store, logStore); - // Create a topic + // Create a topic with CAS + const hash = store.putObject({ message: 'hello' }); store.appendEvent({ occurredAt: Date.now(), kind: 'echo.created', key: 't1', - meta: JSON.stringify({ topicId: 't1', message: 'hello' }), + hash, }); // First tick: should execute echo @@ -108,7 +121,7 @@ describe('createWorkflowRule', () => { const r2 = await rule.tick(); expect(r2.executed).toEqual([]); - // Verify role execution log events were written to logStore (not store) + // Verify role execution logs in logStore const started = logStore.queryByKind('echo.role-started'); expect(started.length).toBe(1); const startedMeta = JSON.parse(started[0].meta!); @@ -116,58 +129,53 @@ describe('createWorkflowRule', () => { const completed = logStore.queryByKind('echo.role-completed'); expect(completed.length).toBe(1); - const completedMeta = JSON.parse(completed[0].meta!); - expect(completedMeta.topicId).toBe('t1'); - expect(completedMeta.role).toBe('echo'); - expect(completedMeta.scope).toBe('echo'); - expect(typeof completedMeta.durationMs).toBe('number'); - // Verify no role log events in business store + // No role logs in business store expect(store.queryByKind('echo.role-started').length).toBe(0); - expect(store.queryByKind('echo.role-completed').length).toBe(0); }); it('writes role-failed event when role throws, continues other actions', async () => { setup(); - interface Ctx { - topicId: string; - done: boolean; - } + type Roles = { + bomb: ( + chain: WorkflowMessage[], + topicId: string, + store: PulseStore, + ) => Promise; + safe: ( + chain: WorkflowMessage[], + topicId: string, + store: PulseStore, + ) => Promise; + }; - const failType: WorkflowType< - Ctx, - { - bomb: (ctx: Ctx, store: PulseStore) => Promise; - safe: (ctx: Ctx, store: PulseStore) => Promise; - } - > = { + const failType: WorkflowType = { name: 'failtest', - projection: ` - ( - $created := $[kind = 'failtest.created']; - $merge($created.( - $m := $parse(meta); - { $m.topicId: { "topicId": $m.topicId, "done": false } } - )) - ) - `, + projection: (events) => { + const topics = new Map(); + for (const e of events) { + if (e.kind === 'failtest.created' && e.key) { + topics.set(e.key, { lastRole: 'created' }); + } + } + return topics; + }, roles: { bomb: async () => { throw new Error('kaboom'); }, - safe: async (ctx, s) => { + safe: async (_chain, topicId, s) => { s.appendEvent({ occurredAt: Date.now(), kind: 'failtest.done', - key: ctx.topicId, - meta: JSON.stringify({ topicId: ctx.topicId }), + key: topicId, }); }, }, - moderator: (_roles, topics) => { + moderator: (topics) => { const actions: WorkflowAction[] = []; - for (const [topicId] of Object.entries(topics)) { + for (const [topicId] of topics) { actions.push({ topicId, role: 'bomb' }); actions.push({ topicId, role: 'safe' }); } @@ -181,93 +189,66 @@ describe('createWorkflowRule', () => { occurredAt: Date.now(), kind: 'failtest.created', key: 't1', - meta: JSON.stringify({ topicId: 't1' }), }); const r1 = await rule.tick(); - // bomb fails so not in executed; safe succeeds expect(r1.executed).toEqual([{ topicId: 't1', role: 'safe' }]); - // Verify role-failed event in logStore const failed = logStore.queryByKind('failtest.role-failed'); expect(failed.length).toBe(1); - const failedMeta = JSON.parse(failed[0].meta!); - expect(failedMeta.role).toBe('bomb'); - expect(failedMeta.error).toBe('kaboom'); - expect(failedMeta.scope).toBe('failtest'); - - // Verify safe role completed in logStore - const safeCompleted = logStore.queryByKind('failtest.role-completed'); - expect(safeCompleted.length).toBe(1); + expect(JSON.parse(failed[0].meta!).error).toBe('kaboom'); }); it('skips logging when logStore is not provided', async () => { setup(); - interface EchoCtx { - topicId: string; - message: string; - echoed?: boolean; - } + type EchoRoles = { + echo: ( + chain: WorkflowMessage[], + topicId: string, + store: PulseStore, + ) => Promise; + }; - const echoType: WorkflowType< - EchoCtx, - { echo: (ctx: EchoCtx, store: PulseStore) => Promise } - > = { + const echoType: WorkflowType = { name: 'echo', - projection: ` - ( - $created := $[kind = 'echo.created']; - $echoed := $[kind = 'echo.echoed']; - $echoedIds := $distinct($echoed.($parse(meta).topicId)); - $merge($created.( - $m := $parse(meta); - { $m.topicId: { - "topicId": $m.topicId, - "message": $m.message, - "echoed": $m.topicId in $echoedIds - }} - )) - ) - `, + projection: (events) => { + const topics = new Map(); + for (const e of events) { + if (!e.kind.startsWith('echo.') || !e.key) continue; + topics.set(e.key, { lastRole: e.kind.split('.')[1] }); + } + return topics; + }, roles: { - echo: async (ctx, s) => { + echo: async (_chain, topicId, s) => { s.appendEvent({ occurredAt: Date.now(), kind: 'echo.echoed', - key: ctx.topicId, - meta: JSON.stringify({ - topicId: ctx.topicId, - reply: `Echo: ${ctx.message}`, - }), + key: topicId, }); }, }, - moderator: (_roles, topics) => { + moderator: (topics) => { const actions: WorkflowAction<'echo'>[] = []; - for (const [topicId, ctx] of Object.entries(topics)) { - if (!ctx.echoed) actions.push({ topicId, role: 'echo' }); + for (const [topicId, s] of topics) { + if (s.lastRole === 'created') actions.push({ topicId, role: 'echo' }); } return actions; }, }; - // No logStore — should work without logging const rule = createWorkflowRule(echoType, store); store.appendEvent({ occurredAt: Date.now(), kind: 'echo.created', key: 't1', - meta: JSON.stringify({ topicId: 't1', message: 'hello' }), }); const r1 = await rule.tick(); expect(r1.executed).toEqual([{ topicId: 't1', role: 'echo' }]); - - // No role log events in store expect(store.queryByKind('echo.role-started').length).toBe(0); - expect(store.queryByKind('echo.role-completed').length).toBe(0); }); it('Moore diff prevents re-execution of same action', async () => { @@ -275,36 +256,34 @@ describe('createWorkflowRule', () => { let execCount = 0; - interface Ctx { - topicId: string; - done: boolean; - } + type Roles = { + work: ( + chain: WorkflowMessage[], + topicId: string, + store: PulseStore, + ) => Promise; + }; - // A topic type where moderator always returns action for non-done topics - // but the role does NOT write any event (so topic stays non-done) - const stickyType: WorkflowType< - Ctx, - { work: (ctx: Ctx, store: PulseStore) => Promise } - > = { + const stickyType: WorkflowType = { name: 'sticky', - projection: ` - ( - $created := $[kind = 'sticky.created']; - $merge($created.( - $m := $parse(meta); - { $m.topicId: { "topicId": $m.topicId, "done": false } } - )) - ) - `, + projection: (events) => { + const topics = new Map(); + for (const e of events) { + if (e.kind === 'sticky.created' && e.key) { + topics.set(e.key, { lastRole: 'created' }); + } + } + return topics; + }, roles: { work: async () => { execCount++; }, }, - moderator: (_roles, topics) => { + moderator: (topics) => { const actions: WorkflowAction<'work'>[] = []; - for (const [topicId, ctx] of Object.entries(topics)) { - if (!ctx.done) actions.push({ topicId, role: 'work' }); + for (const [topicId] of topics) { + actions.push({ topicId, role: 'work' }); } return actions; }, @@ -316,68 +295,57 @@ describe('createWorkflowRule', () => { occurredAt: Date.now(), kind: 'sticky.created', key: 't1', - meta: JSON.stringify({ topicId: 't1' }), }); - // First tick: executes await rule.tick(); expect(execCount).toBe(1); - // Second tick: same action key, Moore diff skips await rule.tick(); expect(execCount).toBe(1); }); - it('baseline updates before execution — prevents duplicate dispatch on concurrent ticks', async () => { + it('baseline updates before execution — prevents duplicate dispatch', async () => { setup(); - // Track execution order: when role starts and when baseline is checked let roleStartCount = 0; let roleResolve: (() => void) | null = null; - interface Ctx { - topicId: string; - processed: boolean; - } + type Roles = { + process: ( + chain: WorkflowMessage[], + topicId: string, + store: PulseStore, + ) => Promise; + }; - const slowType: WorkflowType< - Ctx, - { process: (ctx: Ctx, store: PulseStore) => Promise } - > = { + const slowType: WorkflowType = { name: 'slow', - projection: ` - ( - $created := $[kind = 'slow.created']; - $processed := $[kind = 'slow.processed']; - $processedIds := $distinct($processed.($parse(meta).topicId)); - $merge($created.( - $m := $parse(meta); - { $m.topicId: { - "topicId": $m.topicId, - "processed": $m.topicId in $processedIds - }} - )) - ) - `, + projection: (events) => { + const topics = new Map(); + for (const e of events) { + if (!e.kind.startsWith('slow.') || !e.key) continue; + topics.set(e.key, { lastRole: e.kind.split('.')[1] }); + } + return topics; + }, roles: { - process: async (ctx, s) => { + process: async (_chain, topicId, s) => { roleStartCount++; - // Simulate slow execution — role blocks until externally resolved await new Promise((resolve) => { roleResolve = resolve; }); s.appendEvent({ occurredAt: Date.now(), kind: 'slow.processed', - key: ctx.topicId, - meta: JSON.stringify({ topicId: ctx.topicId }), + key: topicId, }); }, }, - moderator: (_roles, topics) => { + moderator: (topics) => { const actions: WorkflowAction<'process'>[] = []; - for (const [topicId, ctx] of Object.entries(topics)) { - if (!ctx.processed) actions.push({ topicId, role: 'process' }); + for (const [topicId, s] of topics) { + if (s.lastRole === 'created') + actions.push({ topicId, role: 'process' }); } return actions; }, @@ -389,27 +357,82 @@ describe('createWorkflowRule', () => { occurredAt: Date.now(), kind: 'slow.created', key: 't1', - meta: JSON.stringify({ topicId: 't1' }), }); - // Start first tick — it will block inside the role const tick1 = rule.tick(); - - // Wait a bit for tick1 to reach role execution await new Promise((r) => setTimeout(r, 50)); expect(roleStartCount).toBe(1); - // Start second tick while tick1's role is still running - // Because baseline was updated BEFORE execution, - // tick2 sees snapshot === baseline → skips (no-op) const tick2Promise = rule.tick(); const r2 = await tick2Promise; expect(r2.executed).toEqual([]); - expect(roleStartCount).toBe(1); // ← still 1, not 2! + expect(roleStartCount).toBe(1); - // Resolve tick1's role roleResolve!(); const r1 = await tick1; expect(r1.executed).toEqual([{ topicId: 't1', role: 'process' }]); }); + + it('chain is passed to role functions with CAS content', async () => { + setup(); + + let receivedChain: WorkflowMessage[] = []; + + type Roles = { + responder: ( + chain: WorkflowMessage[], + topicId: string, + store: PulseStore, + ) => Promise; + }; + + const chainType: WorkflowType = { + name: 'chain', + projection: (events) => { + const topics = new Map(); + for (const e of events) { + if (!e.kind.startsWith('chain.') || !e.key) continue; + topics.set(e.key, { lastRole: e.kind.split('.')[1] }); + } + return topics; + }, + roles: { + responder: async (chain, topicId, s) => { + receivedChain = chain; + const hash = s.putObject({ content: 'responded' }); + s.appendEvent({ + occurredAt: Date.now(), + kind: 'chain.responded', + key: topicId, + hash, + }); + }, + }, + moderator: (topics) => { + const actions: WorkflowAction<'responder'>[] = []; + for (const [topicId, s] of topics) { + if (s.lastRole === 'created') + actions.push({ topicId, role: 'responder' }); + } + return actions; + }, + }; + + const rule = createWorkflowRule(chainType, store); + + const hash = store.putObject({ message: 'hello world', extra: 42 }); + store.appendEvent({ + occurredAt: Date.now(), + kind: 'chain.created', + key: 't1', + hash, + }); + + await rule.tick(); + + expect(receivedChain.length).toBe(1); + expect(receivedChain[0].role).toBe('created'); + expect((receivedChain[0].content as any).message).toBe('hello world'); + expect((receivedChain[0].content as any).extra).toBe(42); + }); }); diff --git a/packages/pulse/src/workflows/workflow-rule-adapter.ts b/packages/pulse/src/workflows/workflow-rule-adapter.ts index 3e4d054..e2f0c91 100644 --- a/packages/pulse/src/workflows/workflow-rule-adapter.ts +++ b/packages/pulse/src/workflows/workflow-rule-adapter.ts @@ -1,33 +1,33 @@ /** - * WorkflowType → Pulse Rule adapter. + * WorkflowType → Pulse Rule adapter (message chain model). * * createWorkflowRule(workflowType, store) wraps a WorkflowType as a standalone * tick function. Each tick has two phases: * * ── Critical section (must be serialized, never concurrent) ── - * 1. Read events → JSONata projection → snapshot - * 2. Moore diff: compare snapshot to baseline; skip if unchanged - * 3. Moderator decides actions from new snapshot + * 1. Read events → projection → per-topic summaries + * 2. Moore diff: compare summaries to baseline; skip if unchanged + * 3. Moderator decides actions from new summaries * 4. Update baseline BEFORE execution (so concurrent ticks see new baseline) * * ── Execution (can be async, concurrent control is executor's job) ── - * 5. Execute role functions + * 5. Build message chain from CAS, execute role functions * * ⚠️ IMPORTANT: Baseline is updated in step 4, BEFORE role execution. * This prevents duplicate dispatches when tick() is called again while - * a slow role (e.g. Cursor agent ~60s) is still running. The next tick - * sees the updated baseline, compares snapshot (unchanged because the - * role hasn't written its event yet), and correctly skips. - * - * If you move baseline update to AFTER execution, you reintroduce the - * duplicate dispatch bug. See test: "baseline updates before execution". + * a slow role (e.g. Cursor agent ~60s) is still running. * * 小橘 🍊 (NEKO Team) */ import jsonata from 'jsonata'; import type { PulseStore } from '../store.js'; -import type { WorkflowAction, WorkflowType } from './workflow-type.js'; +import type { + TopicSummary, + WorkflowAction, + WorkflowMessage, + WorkflowType, +} from './workflow-type.js'; /** * Tick result from a topic rule. @@ -35,8 +35,8 @@ import type { WorkflowAction, WorkflowType } from './workflow-type.js'; export interface WorkflowTickResult { /** Actions that were executed this tick */ executed: WorkflowAction[]; - /** Current projected topics after this tick */ - topics: Record; + /** Current per-topic summaries */ + topics: Map; } /** @@ -48,120 +48,153 @@ export interface WorkflowRule { /** * Create a topic rule from a WorkflowType and a PulseStore. - * - * The returned object has a tick() method. See module-level doc for - * the critical section / execution phase split and why baseline must - * update before execution. */ -export function createWorkflowRule( - workflowType: WorkflowType, +export function createWorkflowRule( + workflowType: WorkflowType, store: PulseStore, logStore?: PulseStore, ): WorkflowRule { // Moore machine state: snapshot baseline from last tick let prevSnapshotJson = ''; - // Compile JSONata expression once; register $parse helper for JSON strings - const expr = jsonata(workflowType.projection); - expr.registerFunction('parse', (s: string) => { - try { - return JSON.parse(s); - } catch { - return {}; - } - }); + // For JSONata legacy mode + let compiledExpr: ReturnType | null = null; + if (typeof workflowType.projection === 'string') { + compiledExpr = jsonata(workflowType.projection); + compiledExpr.registerFunction('parse', (s: string) => { + try { + return JSON.parse(s); + } catch { + return {}; + } + }); + } return { async tick(): Promise { // 1. Get all events from store const events = store.getAfter(0); - // Preserve meta as raw JSON string — $parse() in JSONata handles it - const parsedEvents = events.map((e) => ({ - id: e.id, - occurredAt: e.occurredAt, - kind: e.kind, - key: e.key ?? null, - hash: e.hash ?? null, - meta: e.meta ?? '{}', - })); + // 2. Build per-topic summaries + let topics: Map; - // 2. Evaluate JSONata projection - const topics: Record = - (await expr.evaluate(parsedEvents)) ?? {}; + if (typeof workflowType.projection === 'function') { + // Function mode: direct call + topics = workflowType.projection( + events.map((e) => ({ + kind: e.kind, + key: e.key ?? null, + meta: e.meta ?? null, + hash: e.hash ?? null, + occurredAt: e.occurredAt, + })), + ); + } else { + // JSONata legacy mode: evaluate then wrap into Map + const parsedEvents = events.map((e) => ({ + id: e.id, + occurredAt: e.occurredAt, + kind: e.kind, + key: e.key ?? null, + hash: e.hash ?? null, + meta: e.meta ?? '{}', + })); + const raw: Record = + (await compiledExpr!.evaluate(parsedEvents)) ?? {}; + topics = new Map(); + for (const [k, v] of Object.entries(raw)) { + topics.set(k, v as TopicSummary); + } + } - // 3. Moore diff: compare projected snapshot to baseline - // If snapshot hasn't changed since last tick, skip entirely. - // This is the core Moore machine property: output only on state change. - const currSnapshotJson = JSON.stringify(topics); + // 3. Moore diff + const summaryObj: Record = {}; + for (const [k, v] of topics) summaryObj[k] = v; + const currSnapshotJson = JSON.stringify(summaryObj); if (currSnapshotJson === prevSnapshotJson) { return { executed: [], topics }; } - // 4. Get actions from moderator (state changed, so re-evaluate) - const actions = workflowType.moderator(workflowType.roles, topics); + // 4. Get actions from moderator + const actions = workflowType.moderator(topics); - // 5. Update baseline BEFORE execution. - // ⚠️ DO NOT move this after role execution — see module-level doc. - // This is the boundary between critical section and execution phase. + // 5. Update baseline BEFORE execution prevSnapshotJson = currSnapshotJson; - // 6. Execute actions (outside critical section — may be slow) + // 6. Execute actions — build chain from CAS for each topic const executed: WorkflowAction[] = []; for (const action of actions) { const roleFn = workflowType.roles[action.role]; - if (roleFn) { - const ctx = topics[action.topicId]; - if (ctx !== undefined) { - // Write role-started event to logStore if available - if (logStore) { - logStore.appendEvent({ - occurredAt: Date.now(), - kind: `${workflowType.name}.role-started`, - key: action.topicId, - meta: JSON.stringify({ - topicId: action.topicId, - role: action.role, - scope: workflowType.name, - }), - }); - } + if (!roleFn) continue; + if (!topics.has(action.topicId)) continue; - const start = Date.now(); - try { - await roleFn(ctx, store); - // Write role-completed event to logStore if available - if (logStore) { - logStore.appendEvent({ - occurredAt: Date.now(), - kind: `${workflowType.name}.role-completed`, - key: action.topicId, - meta: JSON.stringify({ - topicId: action.topicId, - role: action.role, - scope: workflowType.name, - durationMs: Date.now() - start, - }), - }); - } - executed.push(action); - } catch (err) { - // Write role-failed event to logStore if available (don't rethrow — let other actions continue) - if (logStore) { - logStore.appendEvent({ - occurredAt: Date.now(), - kind: `${workflowType.name}.role-failed`, - key: action.topicId, - meta: JSON.stringify({ - topicId: action.topicId, - role: action.role, - scope: workflowType.name, - durationMs: Date.now() - start, - error: err instanceof Error ? err.message : String(err), - }), - }); - } - } + // Build message chain for this topic + const chain: WorkflowMessage[] = events + .filter( + (e) => + e.key === action.topicId && + e.kind.startsWith(`${workflowType.name}.`), + ) + .map((e) => ({ + role: e.kind.split('.')[1], + content: e.hash ? store.getObject(e.hash) : null, + meta: e.meta + ? (() => { + try { + return JSON.parse(e.meta!); + } catch { + return undefined; + } + })() + : undefined, + timestamp: e.occurredAt, + })); + + // Write role-started event to logStore if available + if (logStore) { + logStore.appendEvent({ + occurredAt: Date.now(), + kind: `${workflowType.name}.role-started`, + key: action.topicId, + meta: JSON.stringify({ + topicId: action.topicId, + role: action.role, + scope: workflowType.name, + }), + }); + } + + const start = Date.now(); + try { + await roleFn(chain, action.topicId, store); + if (logStore) { + logStore.appendEvent({ + occurredAt: Date.now(), + kind: `${workflowType.name}.role-completed`, + key: action.topicId, + meta: JSON.stringify({ + topicId: action.topicId, + role: action.role, + scope: workflowType.name, + durationMs: Date.now() - start, + }), + }); + } + executed.push(action); + } catch (err) { + if (logStore) { + logStore.appendEvent({ + occurredAt: Date.now(), + kind: `${workflowType.name}.role-failed`, + key: action.topicId, + meta: JSON.stringify({ + topicId: action.topicId, + role: action.role, + scope: workflowType.name, + durationMs: Date.now() - start, + error: err instanceof Error ? err.message : String(err), + }), + }); } } } diff --git a/packages/pulse/src/workflows/workflow-type.test.ts b/packages/pulse/src/workflows/workflow-type.test.ts index efc2364..a6458b6 100644 --- a/packages/pulse/src/workflows/workflow-type.test.ts +++ b/packages/pulse/src/workflows/workflow-type.test.ts @@ -6,29 +6,40 @@ import { describe, expect, it } from 'bun:test'; import type { PulseStore } from '../store.js'; -import type { WorkflowAction, WorkflowType } from './workflow-type.js'; +import type { + TopicSummary, + WorkflowAction, + WorkflowMessage, + WorkflowType, +} from './workflow-type.js'; describe('WorkflowType interface', () => { it('allows defining a minimal WorkflowType', () => { - interface EchoCtx { - topicId: string; - message: string; - echoed?: boolean; - } + type EchoRoles = { + echo: ( + chain: WorkflowMessage[], + topicId: string, + store: PulseStore, + ) => Promise; + }; - const echoType: WorkflowType< - EchoCtx, - { echo: (ctx: EchoCtx, store: PulseStore) => Promise } - > = { + const echoType: WorkflowType = { name: 'echo', - projection: '{}', - roles: { - echo: async (_ctx, _store) => {}, + projection: (events) => { + const topics = new Map(); + for (const e of events) { + if (!e.kind.startsWith('echo.') || !e.key) continue; + topics.set(e.key, { lastRole: e.kind.split('.')[1] }); + } + return topics; }, - moderator: (_roles, topics) => { + roles: { + echo: async (_chain, _topicId, _store) => {}, + }, + moderator: (topics) => { const actions: WorkflowAction<'echo'>[] = []; - for (const [topicId, ctx] of Object.entries(topics)) { - if (!ctx.echoed) actions.push({ topicId, role: 'echo' }); + for (const [topicId, s] of topics) { + if (s.lastRole === 'created') actions.push({ topicId, role: 'echo' }); } return actions; }, @@ -36,33 +47,40 @@ describe('WorkflowType interface', () => { expect(echoType.name).toBe('echo'); expect(typeof echoType.roles.echo).toBe('function'); - expect(echoType.moderator(echoType.roles, {})).toEqual([]); + expect(echoType.moderator(new Map())).toEqual([]); }); it('moderator returns actions for incomplete topics', () => { - interface Ctx { - done: boolean; - } - const tt: WorkflowType = { + type Roles = { + doIt: ( + chain: WorkflowMessage[], + topicId: string, + store: PulseStore, + ) => Promise; + }; + + const tt: WorkflowType = { name: 'test', - projection: '{}', + projection: () => new Map(), roles: { doIt: async () => {}, }, - moderator: (_roles, topics) => { + moderator: (topics) => { const actions: WorkflowAction[] = []; - for (const [topicId, ctx] of Object.entries(topics)) { - if (!ctx.done) actions.push({ topicId, role: 'doIt' }); + for (const [topicId, s] of topics) { + if (s.lastRole !== 'done') actions.push({ topicId, role: 'doIt' }); } return actions; }, }; - const result = tt.moderator(tt.roles, { - a: { done: false }, - b: { done: true }, - c: { done: false }, - }); + const input = new Map([ + ['a', { lastRole: 'created' }], + ['b', { lastRole: 'done' }], + ['c', { lastRole: 'created' }], + ]); + + const result = tt.moderator(input); expect(result).toEqual([ { topicId: 'a', role: 'doIt' }, { topicId: 'c', role: 'doIt' }, diff --git a/packages/pulse/src/workflows/workflow-type.ts b/packages/pulse/src/workflows/workflow-type.ts index 7679f48..35b308c 100644 --- a/packages/pulse/src/workflows/workflow-type.ts +++ b/packages/pulse/src/workflows/workflow-type.ts @@ -1,5 +1,5 @@ /** - * WorkflowType — Council v2 core interface. + * WorkflowType — Council v2 core interface (message chain model). * * Each WorkflowType is a single .ts file that exports events schema, * projection, roles, and moderator. One Rule manages all instances @@ -16,39 +16,72 @@ export interface WorkflowAction { role: R; } +/** A message in a workflow chain — one per role execution */ +export interface WorkflowMessage { + role: string; + content: unknown; // from CAS via store.getObject(hash) + meta?: Record; + timestamp: number; +} + +/** Per-topic summary used by moderator */ +export interface TopicSummary { + lastRole: string; + meta?: Record; +} + /** - * WorkflowType definition — one .ts file exports one object conforming to this interface. + * WorkflowType definition — message chain model. * - * @typeParam TContext - The projected context type for each topic instance - * @typeParam TRoles - Record of role name → async role function + * Projection is now a function (or JSONata string for backward compat). + * Roles receive the full message chain instead of a projected context. + * Moderator receives per-topic summaries (lastRole + meta). */ export interface WorkflowType< - TContext = unknown, TRoles extends Record< string, - (ctx: TContext, store: PulseStore) => Promise - > = Record Promise>, + ( + chain: WorkflowMessage[], + topicId: string, + store: PulseStore, + ) => Promise + > = Record< + string, + ( + chain: WorkflowMessage[], + topicId: string, + store: PulseStore, + ) => Promise + >, > { /** Topic type name, used as event prefix */ name: string; /** - * JSONata expression: projects events array → { [topicId]: TContext } - * - * Input: array of event objects with { kind, key, meta } - * Output: object keyed by topicId with TContext values + * Projection: either a JSONata string (legacy) or a function. + * Function mode: receives raw events, returns Map. + * JSONata mode: projects events → { [topicId]: any } (adapter wraps to TopicSummary). */ - projection: string; + projection: + | string + | (( + events: Array<{ + kind: string; + key?: string | null; + meta?: string | null; + hash?: string | null; + occurredAt: number; + }>, + ) => Map); - /** Topic-local roles */ + /** Topic-local roles — each receives the full message chain */ roles: TRoles; /** - * Moderator: inspects all active topic instances, returns actions to execute. - * Empty array = nothing to do (all topics complete or idle). + * Moderator: inspects per-topic summaries, returns actions to execute. + * Empty array = nothing to do. */ moderator: ( - roles: TRoles, - topics: Record, + topics: Map, ) => WorkflowAction>[]; } diff --git a/packages/upulse/src/commands/workflow.test.ts b/packages/upulse/src/commands/workflow.test.ts index 49ac99d..d695fa7 100644 --- a/packages/upulse/src/commands/workflow.test.ts +++ b/packages/upulse/src/commands/workflow.test.ts @@ -1,5 +1,5 @@ /** - * topic.test.ts — Tests for upulse topic create/list logic. + * workflow.test.ts — Tests for upulse workflow create/list logic (message chain model). */ import { afterEach, beforeEach, describe, expect, it } from 'bun:test'; @@ -12,7 +12,7 @@ import { type ScopedStore, } from '@uncaged/pulse'; -describe('topic', () => { +describe('workflow', () => { let tmpDir: string; let scopedStore: ScopedStore; let store: PulseStore; @@ -31,115 +31,104 @@ describe('topic', () => { rmSync(tmpDir, { recursive: true, force: true }); }); - it('create topic writes coding.created event', () => { + it('create topic writes coding.user event with CAS', () => { const topicId = 'fix-login-bug-a3k'; + const hash = store.putObject({ + content: 'The login page crashes on submit', + artifacts: { title: 'Fix login bug', repoDir: '/tmp/repo' }, + }); store.appendEvent({ occurredAt: Date.now(), - kind: 'coding.created', + kind: 'coding.user', key: topicId, - meta: JSON.stringify({ - topicId, - title: 'Fix login bug', - description: 'The login page crashes on submit', - repoDir: '/tmp/repo', - }), + hash, }); - const events = store.queryByKind('coding.created'); + const events = store.queryByKind('coding.user'); expect(events.length).toBe(1); expect(events[0].key).toBe(topicId); + expect(events[0].hash).toBeTruthy(); - const meta = JSON.parse(events[0].meta!); - expect(meta.title).toBe('Fix login bug'); - expect(meta.description).toBe('The login page crashes on submit'); - expect(meta.repoDir).toBe('/tmp/repo'); + const content = store.getObject(events[0].hash!) as any; + expect(content.artifacts.title).toBe('Fix login bug'); + expect(content.content).toBe('The login page crashes on submit'); + expect(content.artifacts.repoDir).toBe('/tmp/repo'); }); it('list topics shows created topic', () => { const topicId = 'add-feature-x-b2c'; + const hash = store.putObject({ + content: 'Implement feature X', + artifacts: { title: 'Add feature X', repoDir: '/tmp/repo' }, + }); store.appendEvent({ occurredAt: Date.now(), - kind: 'coding.created', + kind: 'coding.user', key: topicId, - meta: JSON.stringify({ - topicId, - title: 'Add feature X', - description: 'Implement feature X', - repoDir: '/tmp/repo', - }), + hash, }); const allEvents = store .getAfter(0) .filter((e) => e.kind.startsWith('coding.')); - expect(allEvents.length).toBe(1); const topics = new Map< string, - { title: string; state: string; lastUpdated: number } + { title: string; role: string; lastUpdated: number } >(); for (const event of allEvents) { const tid = event.key; if (!tid) continue; - const state = event.kind.replace('coding.', ''); - let meta: { title?: string } = {}; - if (event.meta) { + const role = event.kind.replace('coding.', ''); + let title = tid; + if (role === 'user' && event.hash) { try { - meta = JSON.parse(event.meta); - } catch { - /* ignore */ - } - } - const existing = topics.get(tid); - if (!existing) { - topics.set(tid, { - title: meta.title ?? tid, - state, - lastUpdated: event.occurredAt, - }); - } else { - existing.state = state; - if (event.occurredAt > existing.lastUpdated) - existing.lastUpdated = event.occurredAt; + const obj = store.getObject(event.hash) as any; + if (obj?.artifacts?.title) title = obj.artifacts.title; + } catch {} } + topics.set(tid, { title, role, lastUpdated: event.occurredAt }); } expect(topics.size).toBe(1); const t = topics.get(topicId)!; expect(t.title).toBe('Add feature X'); - expect(t.state).toBe('created'); + expect(t.role).toBe('user'); }); it('list without --all hides closed topics', () => { const now = Date.now(); - // Topic 1: active (created) + // Topic 1: active (user) + const h1 = store.putObject({ + content: 'Active', + artifacts: { title: 'Active Topic' }, + }); store.appendEvent({ occurredAt: now, - kind: 'coding.created', + kind: 'coding.user', key: 'active-topic-1a2', - meta: JSON.stringify({ - topicId: 'active-topic-1a2', - title: 'Active Topic', - }), + hash: h1, }); // Topic 2: closed - store.appendEvent({ - occurredAt: now, - kind: 'coding.created', - key: 'closed-topic-3b4', - meta: JSON.stringify({ - topicId: 'closed-topic-3b4', - title: 'Closed Topic', - }), + const h2 = store.putObject({ + content: 'Closed', + artifacts: { title: 'Closed Topic' }, }); store.appendEvent({ - occurredAt: now + 1, - kind: 'coding.closed', + occurredAt: now, + kind: 'coding.user', key: 'closed-topic-3b4', - meta: JSON.stringify({ topicId: 'closed-topic-3b4', summary: 'Done' }), + hash: h2, + }); + const h3 = store.putObject({ content: 'Done' }); + store.appendEvent({ + occurredAt: now + 1, + kind: 'coding.closer', + key: 'closed-topic-3b4', + hash: h3, }); const allEvents = store @@ -148,43 +137,35 @@ describe('topic', () => { const topics = new Map< string, - { title: string; state: string; lastUpdated: number } + { title: string; role: string; lastUpdated: number } >(); for (const event of allEvents) { const tid = event.key; if (!tid) continue; - const state = event.kind.replace('coding.', ''); - let meta: { title?: string } = {}; - if (event.meta) { + const role = event.kind.replace('coding.', ''); + let title = tid; + if (role === 'user' && event.hash) { try { - meta = JSON.parse(event.meta); - } catch { - /* ignore */ - } + const obj = store.getObject(event.hash) as any; + if (obj?.artifacts?.title) title = obj.artifacts.title; + } catch {} } const existing = topics.get(tid); if (!existing) { - topics.set(tid, { - title: meta.title ?? tid, - state, - lastUpdated: event.occurredAt, - }); + topics.set(tid, { title, role, lastUpdated: event.occurredAt }); } else { - existing.state = state; + existing.role = role; if (event.occurredAt > existing.lastUpdated) existing.lastUpdated = event.occurredAt; } } - // Without --all: only non-closed - const active = [...topics.entries()].filter( - ([, t]) => t.state !== 'closed', - ); + // Without --all: only non-closer + const active = [...topics.entries()].filter(([, t]) => t.role !== 'closer'); expect(active.length).toBe(1); expect(active[0][0]).toBe('active-topic-1a2'); // With --all: all topics - const all = [...topics.entries()]; - expect(all.length).toBe(2); + expect(topics.size).toBe(2); }); }); diff --git a/packages/upulse/src/commands/workflow.ts b/packages/upulse/src/commands/workflow.ts index d99d32a..3138906 100644 --- a/packages/upulse/src/commands/workflow.ts +++ b/packages/upulse/src/commands/workflow.ts @@ -1,8 +1,8 @@ /** - * commands/topic.ts — upulse topic create/list + * commands/workflow.ts — upulse workflow create/list * * Manage coding workflows via the routine scope store. - * Events follow the coding.* convention from @uncaged/pulse CodingTask. + * Events follow the coding.{role} convention (message chain model). */ import { randomBytes } from 'node:crypto'; @@ -23,18 +23,10 @@ function shortId(): string { return randomBytes(2).toString('hex').slice(0, 3); } -const _CODING_KINDS = [ - 'coding.created', - 'coding.analyzed', - 'coding.coded', - 'coding.reviewed', - 'coding.closed', -] as const; +type CodingRole = 'user' | 'architect' | 'coder' | 'reviewer' | 'closer'; -type CodingState = 'created' | 'analyzed' | 'coded' | 'reviewed' | 'closed'; - -function kindToState(kind: string): CodingState { - return kind.replace('coding.', '') as CodingState; +function kindToRole(kind: string): CodingRole { + return kind.replace('coding.', '') as CodingRole; } export function registerWorkflowCommand(program: Command): void { @@ -58,16 +50,17 @@ export function registerWorkflowCommand(program: Command): void { const topicId = `${slugify(title)}-${shortId()}`; + // Store full content in CAS + const hash = store.putObject({ + content: opts.desc, + artifacts: { title, repoDir: opts.repo }, + }); + store.appendEvent({ occurredAt: Date.now(), - kind: 'coding.created', + kind: 'coding.user', key: topicId, - meta: JSON.stringify({ - topicId, - title, - description: opts.desc, - repoDir: opts.repo, - }), + hash, }); console.log(topicId); @@ -103,48 +96,44 @@ export function registerWorkflowCommand(program: Command): void { return; } - // Group by topicId, track latest state and metadata + // Group by key (topicId), track latest role const topics = new Map< string, - { title: string; state: CodingState; lastUpdated: number } + { title: string; role: CodingRole; lastUpdated: number } >(); for (const event of allEvents) { const topicId = event.key; if (!topicId) continue; - const state = kindToState(event.kind); - let meta: { title?: string } = {}; - if (event.meta) { + const role = kindToRole(event.kind); + + // Try to get title from CAS for user events + let title = topicId; + if (role === 'user' && event.hash) { try { - meta = JSON.parse(event.meta); - } catch { - // ignore - } + const obj = store.getObject(event.hash) as any; + if (obj?.artifacts?.title) title = obj.artifacts.title; + } catch {} } const existing = topics.get(topicId); if (!existing) { - topics.set(topicId, { - title: meta.title ?? topicId, - state, - lastUpdated: event.occurredAt, - }); + topics.set(topicId, { title, role, lastUpdated: event.occurredAt }); } else { - // Events from getAfter are ordered by id ASC, so later events overwrite - existing.state = state; + existing.role = role; if (event.occurredAt > existing.lastUpdated) { existing.lastUpdated = event.occurredAt; } - if (meta.title) { - existing.title = meta.title; + if (role === 'user' && title !== topicId) { + existing.title = title; } } } // Filter closed unless --all const entries = [...topics.entries()].filter( - ([, t]) => opts.all || t.state !== 'closed', + ([, t]) => opts.all || t.role !== 'closer', ); if (entries.length === 0) { @@ -154,14 +143,14 @@ export function registerWorkflowCommand(program: Command): void { } console.log( - 'topicId | title | state | last updated', + 'topicId | title | role | last updated', ); console.log('-'.repeat(120)); for (const [topicId, t] of entries) { const ts = new Date(t.lastUpdated).toISOString(); console.log( - `${topicId.padEnd(49)}| ${t.title.slice(0, 30).padEnd(31)}| ${t.state.padEnd(9)}| ${ts}`, + `${topicId.padEnd(49)}| ${t.title.slice(0, 30).padEnd(31)}| ${t.role.padEnd(9)}| ${ts}`, ); }