refactor: type-safe workflow — Role<Meta> + discriminated union moderator

- Role<Meta> = (chain, topicId, store) => Promise<Meta>
- MetaOf<R> extracts Meta from Role type
- RoleOutput<Roles> = discriminated union { role: K, meta: MetaOf<Roles[K]> }
- Moderator receives RoleOutput, switch on role narrows meta type
- WorkflowType simplified: no projection field, just name + roles + moderator
- Adapter builds per-topic summary internally
- coding-workflow: ArchitectMeta, CoderMeta, ReviewerMeta, CloserMeta
- -538/+326 lines — simpler AND more type-safe
This commit is contained in:
2026-04-17 05:24:10 +00:00
parent ff689ce3ce
commit c13290dd25
13 changed files with 324 additions and 536 deletions
+9 -3
View File
@@ -98,7 +98,7 @@ async function architectFn(
chain: WorkflowMessage[],
topicId: string,
store: PulseStore,
): Promise<void> {
): Promise<{ targetFiles: string[] }> {
const userMsg = chain.find((m) => m.role === 'user');
const userContent = (userMsg?.content as any) ?? {};
const title = userContent.artifacts?.title ?? topicId;
@@ -137,13 +137,14 @@ async function architectFn(
key: topicId,
hash,
});
return { targetFiles: parsed.targetFiles ?? [] };
}
async function coderFn(
chain: WorkflowMessage[],
topicId: string,
store: PulseStore,
): Promise<void> {
) {
const userMsg = chain.find((m) => m.role === 'user');
const userContent = (userMsg?.content as any) ?? {};
const title = userContent.artifacts?.title ?? topicId;
@@ -187,13 +188,17 @@ Run tests if applicable. Commit your changes.`;
key: topicId,
hash,
});
return {
filesChanged: architectContent.artifacts?.targetFiles ?? [],
testsPassed: result.success,
};
}
async function reviewerFn(
chain: WorkflowMessage[],
topicId: string,
store: PulseStore,
): Promise<void> {
) {
const userMsg = chain.find((m) => m.role === 'user');
const userContent = (userMsg?.content as any) ?? {};
const title = userContent.artifacts?.title ?? topicId;
@@ -234,6 +239,7 @@ End with a clear verdict: APPROVED or REJECTED with reasons.`;
hash,
meta: JSON.stringify({ verdict }),
});
return { verdict };
}
// ── Main ───────────────────────────────────────────────────────
@@ -63,6 +63,9 @@ describe('Council v2 E2E', () => {
reviewCounts[topicId] = (reviewCounts[topicId] ?? 0) + 1;
const shouldReject =
topicId === 'task-b' && reviewCounts[topicId] === 1;
const verdict = shouldReject
? ('rejected' as const)
: ('approved' as const);
const hash = s.putObject({
content: shouldReject ? 'Needs work' : 'LGTM',
});
@@ -71,10 +74,9 @@ describe('Council v2 E2E', () => {
kind: 'coding.reviewer',
key: topicId,
hash,
meta: JSON.stringify({
verdict: shouldReject ? 'rejected' : 'approved',
}),
meta: JSON.stringify({ verdict }),
});
return { verdict };
},
});
+10 -2
View File
@@ -938,7 +938,13 @@ export { buildPersonasFromEvents } from './persona.js';
// ── Council v2: WorkflowType ────────────────────────────────────────
export { createCodingWorkflow } from './workflows/coding-workflow.js';
export {
type ArchitectMeta,
type CoderMeta,
type CodingRoles,
createCodingWorkflow,
type ReviewerMeta,
} from './workflows/coding-workflow.js';
export { createWorkflowTicker } from './workflows/index.js';
export { createArchitectRole } from './workflows/roles/architect-llm.js';
export { createCoderRole } from './workflows/roles/coder-cursor.js';
@@ -949,7 +955,9 @@ export type {
} from './workflows/workflow-rule-adapter.js';
export { createWorkflowRule } from './workflows/workflow-rule-adapter.js';
export type {
TopicSummary,
MetaOf,
Role,
RoleOutput,
WorkflowAction,
WorkflowMessage,
WorkflowType,
@@ -98,6 +98,8 @@ describe('CodingTask WorkflowType', () => {
s: PulseStore,
) => {
reviewCount++;
const verdict =
reviewCount === 1 ? ('rejected' as const) : ('approved' as const);
const hash = s.putObject({
content: reviewCount === 1 ? 'Needs fixes' : 'Looks good after fixes',
});
@@ -106,10 +108,9 @@ describe('CodingTask WorkflowType', () => {
kind: 'coding.reviewer',
key: topicId,
hash,
meta: JSON.stringify({
verdict: reviewCount === 1 ? 'rejected' : 'approved',
}),
meta: JSON.stringify({ verdict }),
});
return { verdict };
},
});
const rule = createWorkflowRule(codingTask, store);
+71 -145
View File
@@ -1,80 +1,32 @@
/**
* CodingTask WorkflowType — message chain + CAS model.
*
* Events use `coding.{role}` naming:
* coding.user — user creates a task
* coding.architect — architect analysis (CAS)
* coding.coder — coder output (CAS)
* coding.reviewer — review verdict (CAS + meta.verdict)
* coding.closer — task closed (CAS)
*
* 小橘 🍊 (NEKO Team)
*/
import type { Role, RoleOutput, WorkflowType } from './workflow-type.js';
import type { PulseStore } from '../store.js';
import type {
TopicSummary,
WorkflowAction,
WorkflowMessage,
WorkflowType,
} from './workflow-type.js';
// ── Role Meta types ────────────────────────────────────────────
// ── Roles type ─────────────────────────────────────────────────
export type UserMeta = {};
export type ArchitectMeta = { targetFiles: string[] };
export type CoderMeta = { filesChanged: string[]; testsPassed: boolean };
export type ReviewerMeta = { verdict: 'approved' | 'rejected' };
export type CloserMeta = {};
type RoleFn = (
chain: WorkflowMessage[],
topicId: string,
store: PulseStore,
) => Promise<void>;
// ── Roles record ───────────────────────────────────────────────
type CodingWorkflowRoles = {
architect: RoleFn;
coder: RoleFn;
reviewer: RoleFn;
closer: RoleFn;
export type CodingRoles = {
user: Role<UserMeta>;
architect: Role<ArchitectMeta>;
coder: Role<CoderMeta>;
reviewer: Role<ReviewerMeta>;
closer: Role<CloserMeta>;
};
// ── Projection function ────────────────────────────────────────
// ── Default mock implementations ───────────────────────────────
function codingProjection(
events: Array<{
kind: string;
key?: string | null;
meta?: string | null;
hash?: string | null;
occurredAt: number;
}>,
): Map<string, TopicSummary> {
const topics = new Map<string, TopicSummary>();
// events are ordered by id ASC
for (const e of events) {
if (!e.kind.startsWith('coding.')) continue;
const topicId = e.key;
if (!topicId) continue;
const role = e.kind.split('.')[1];
let meta: Record<string, unknown> | undefined;
if (e.meta) {
try {
meta = JSON.parse(e.meta);
} catch {}
}
topics.set(topicId, { lastRole: role, meta });
}
return topics;
}
// ── Default mock role implementations ──────────────────────────
function defaultArchitect(
chain: WorkflowMessage[],
topicId: string,
store: PulseStore,
): Promise<void> {
const defaultArchitect: Role<ArchitectMeta> = async (chain, topicId, store) => {
const userMsg = chain.find((m) => m.role === 'user');
const userContent = (userMsg?.content as any) ?? {};
const targetFiles = ['src/main.ts', 'src/utils.ts'];
const hash = store.putObject({
content: `[mock] Analysis for "${userContent.artifacts?.title ?? topicId}": ${userContent.content ?? ''}`,
artifacts: { targetFiles: ['src/main.ts', 'src/utils.ts'] },
artifacts: { targetFiles },
});
store.appendEvent({
occurredAt: Date.now(),
@@ -82,22 +34,18 @@ function defaultArchitect(
key: topicId,
hash,
});
return Promise.resolve();
}
return { targetFiles };
};
function defaultCoder(
chain: WorkflowMessage[],
topicId: string,
store: PulseStore,
): Promise<void> {
const defaultCoder: Role<CoderMeta> = async (chain, topicId, store) => {
const architectMsg = chain.find((m) => m.role === 'architect');
const architectContent = (architectMsg?.content as any) ?? {};
const filesChanged = architectContent.artifacts?.targetFiles ?? [
'src/main.ts',
];
const hash = store.putObject({
content: `[mock] Implemented changes`,
artifacts: {
filesChanged: architectContent.artifacts?.targetFiles ?? ['src/main.ts'],
testsPassed: true,
},
content: '[mock] Implemented changes',
artifacts: { filesChanged, testsPassed: true },
});
store.appendEvent({
occurredAt: Date.now(),
@@ -105,32 +53,23 @@ function defaultCoder(
key: topicId,
hash,
});
return Promise.resolve();
}
return { filesChanged, testsPassed: true };
};
function defaultReviewer(
_chain: WorkflowMessage[],
topicId: string,
store: PulseStore,
): Promise<void> {
const hash = store.putObject({
content: `[mock] Code looks good`,
});
const defaultReviewer: Role<ReviewerMeta> = async (_chain, topicId, store) => {
const verdict = 'approved' as const;
const hash = store.putObject({ content: '[mock] Code looks good' });
store.appendEvent({
occurredAt: Date.now(),
kind: 'coding.reviewer',
key: topicId,
hash,
meta: JSON.stringify({ verdict: 'approved' }),
meta: JSON.stringify({ verdict }),
});
return Promise.resolve();
}
return { verdict };
};
function defaultCloser(
chain: WorkflowMessage[],
topicId: string,
store: PulseStore,
): Promise<void> {
const defaultCloser: Role<CloserMeta> = async (chain, topicId, store) => {
const userMsg = chain.find((m) => m.role === 'user');
const userContent = (userMsg?.content as any) ?? {};
const hash = store.putObject({
@@ -142,61 +81,48 @@ function defaultCloser(
key: topicId,
hash,
});
return Promise.resolve();
return {};
};
// ── Moderator (type-safe) ──────────────────────────────────────
type CodingOutput = RoleOutput<CodingRoles>;
function codingModerator(
output: CodingOutput,
_topicId: string,
): keyof CodingRoles | null {
switch (output.role) {
case 'user':
return 'architect';
case 'architect':
return 'coder';
case 'coder':
return 'reviewer';
case 'reviewer':
// TypeScript knows: output.meta is ReviewerMeta here
return output.meta.verdict === 'rejected' ? 'coder' : 'closer';
case 'closer':
return null;
}
}
// ── Factory ────────────────────────────────────────────────────
export function createCodingWorkflow(opts?: {
architectFn?: RoleFn;
coderFn?: RoleFn;
reviewerFn?: RoleFn;
}): WorkflowType<CodingWorkflowRoles> {
const roles: CodingWorkflowRoles = {
architect: opts?.architectFn ?? defaultArchitect,
coder: opts?.coderFn ?? defaultCoder,
reviewer: opts?.reviewerFn ?? defaultReviewer,
closer: defaultCloser,
};
const moderator = (
topics: Map<string, TopicSummary>,
): WorkflowAction<keyof CodingWorkflowRoles & string>[] => {
const actions: WorkflowAction<keyof CodingWorkflowRoles & string>[] = [];
for (const [topicId, summary] of topics) {
const { lastRole, meta } = summary;
let nextRole: string | null = null;
switch (lastRole) {
case 'user':
nextRole = 'architect';
break;
case 'architect':
nextRole = 'coder';
break;
case 'coder':
nextRole = 'reviewer';
break;
case 'reviewer':
nextRole = meta?.verdict === 'rejected' ? 'coder' : 'closer';
break;
case 'closer':
nextRole = null;
break;
}
if (nextRole) {
actions.push({
topicId,
role: nextRole as keyof CodingWorkflowRoles & string,
});
}
}
return actions;
};
architectFn?: Role<ArchitectMeta>;
coderFn?: Role<CoderMeta>;
reviewerFn?: Role<ReviewerMeta>;
}): WorkflowType<CodingRoles> {
return {
name: 'coding',
projection: codingProjection,
roles,
moderator,
roles: {
user: async () => ({}), // user role is passive (event created externally)
architect: opts?.architectFn ?? defaultArchitect,
coder: opts?.coderFn ?? defaultCoder,
reviewer: opts?.reviewerFn ?? defaultReviewer,
closer: defaultCloser,
},
moderator: codingModerator,
};
}
+3 -1
View File
@@ -14,7 +14,9 @@ export {
type WorkflowTickResult,
} from './workflow-rule-adapter.js';
export type {
TopicSummary,
MetaOf,
Role,
RoleOutput,
WorkflowAction,
WorkflowMessage,
WorkflowType,
@@ -5,16 +5,10 @@
*/
import type { LlmClient } from '../../llm-client.js';
import type { PulseStore } from '../../store.js';
import type { WorkflowMessage } from '../workflow-type.js';
import type { ArchitectMeta } from '../coding-workflow.js';
import type { Role } from '../workflow-type.js';
export function createArchitectRole(
llmClient: LlmClient,
): (
chain: WorkflowMessage[],
topicId: string,
store: PulseStore,
) => Promise<void> {
export function createArchitectRole(llmClient: LlmClient): Role<ArchitectMeta> {
return async (chain, topicId, store) => {
const userMsg = chain.find((m) => m.role === 'user');
const userContent = (userMsg?.content as any) ?? {};
@@ -43,9 +37,10 @@ export function createArchitectRole(
parsed = { analysis: resp.content ?? 'No analysis', targetFiles: [] };
}
const targetFiles = parsed.targetFiles ?? [];
const hash = store.putObject({
content: parsed.analysis ?? 'No analysis',
artifacts: { targetFiles: parsed.targetFiles ?? [] },
artifacts: { targetFiles },
});
store.appendEvent({
occurredAt: Date.now(),
@@ -53,5 +48,6 @@ export function createArchitectRole(
key: topicId,
hash,
});
return { targetFiles };
};
}
@@ -7,16 +7,10 @@
import { mkdirSync, unlinkSync, writeFileSync } from 'node:fs';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import type { PulseStore } from '../../store.js';
import type { WorkflowMessage } from '../workflow-type.js';
import type { CoderMeta } from '../coding-workflow.js';
import type { Role } from '../workflow-type.js';
export function createCoderRole(opts: {
agentBin: string;
}): (
chain: WorkflowMessage[],
topicId: string,
store: PulseStore,
) => Promise<void> {
export function createCoderRole(opts: { agentBin: string }): Role<CoderMeta> {
return async (chain, topicId, store) => {
const userMsg = chain.find((m) => m.role === 'user');
const userContent = (userMsg?.content as any) ?? {};
@@ -43,13 +37,12 @@ If the task asks to create a new file, create it. If it asks to modify existing
Run tests if applicable. Commit your changes.`;
const result = await runCursorAgent(opts.agentBin, prompt, repoDir);
const filesChanged = architectContent.artifacts?.targetFiles ?? [];
const testsPassed = result.success;
const hash = store.putObject({
content: result.output,
artifacts: {
filesChanged: architectContent.artifacts?.targetFiles ?? [],
testsPassed: result.success,
},
artifacts: { filesChanged, testsPassed },
});
store.appendEvent({
occurredAt: Date.now(),
@@ -57,6 +50,7 @@ Run tests if applicable. Commit your changes.`;
key: topicId,
hash,
});
return { filesChanged, testsPassed };
};
}
@@ -7,16 +7,12 @@
import { mkdirSync, unlinkSync, writeFileSync } from 'node:fs';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import type { PulseStore } from '../../store.js';
import type { WorkflowMessage } from '../workflow-type.js';
import type { ReviewerMeta } from '../coding-workflow.js';
import type { Role } from '../workflow-type.js';
export function createReviewerRole(opts: {
agentBin: string;
}): (
chain: WorkflowMessage[],
topicId: string,
store: PulseStore,
) => Promise<void> {
}): Role<ReviewerMeta> {
return async (chain, topicId, store) => {
const userMsg = chain.find((m) => m.role === 'user');
const userContent = (userMsg?.content as any) ?? {};
@@ -46,9 +42,7 @@ End with a clear verdict: APPROVED or REJECTED with reasons.`;
? 'rejected'
: 'approved';
const hash = store.putObject({
content: result.output,
});
const hash = store.putObject({ content: result.output });
store.appendEvent({
occurredAt: Date.now(),
kind: 'coding.reviewer',
@@ -56,6 +50,7 @@ End with a clear verdict: APPROVED or REJECTED with reasons.`;
hash,
meta: JSON.stringify({ verdict }),
});
return { verdict };
};
}
@@ -1,5 +1,5 @@
/**
* WorkflowRuleAdapter tests — message chain model.
* WorkflowRuleAdapter tests — type-safe message chain model.
*
* 小橘 🍊 (NEKO Team)
*/
@@ -10,12 +10,7 @@ import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { createStore, type PulseStore } from '../store.js';
import { createWorkflowRule } from './workflow-rule-adapter.js';
import type {
TopicSummary,
WorkflowAction,
WorkflowMessage,
WorkflowType,
} from './workflow-type.js';
import type { Role, WorkflowMessage, WorkflowType } from './workflow-type.js';
describe('createWorkflowRule', () => {
let store: PulseStore;
@@ -48,36 +43,14 @@ describe('createWorkflowRule', () => {
setup();
type EchoRoles = {
echo: (
chain: WorkflowMessage[],
topicId: string,
store: PulseStore,
) => Promise<void>;
created: Role<{}>;
echo: Role<{ echoed: true }>;
};
const echoType: WorkflowType<EchoRoles> = {
name: 'echo',
projection: (events) => {
const topics = new Map<string, TopicSummary>();
for (const e of events) {
if (!e.kind.startsWith('echo.')) continue;
if (!e.key) continue;
topics.set(e.key, {
lastRole: e.kind.split('.')[1],
meta: e.meta
? (() => {
try {
return JSON.parse(e.meta!);
} catch {
return undefined;
}
})()
: undefined,
});
}
return topics;
},
roles: {
created: async () => ({}),
echo: async (chain, topicId, s) => {
const userMsg = chain.find((m) => m.role === 'created');
const userContent = (userMsg?.content as any) ?? {};
@@ -86,25 +59,21 @@ describe('createWorkflowRule', () => {
});
s.appendEvent({
occurredAt: Date.now(),
kind: 'echo.echoed',
kind: 'echo.echo',
key: topicId,
hash,
});
return { echoed: true as const };
},
},
moderator: (topics) => {
const actions: WorkflowAction<'echo'>[] = [];
for (const [topicId, summary] of topics) {
if (summary.lastRole === 'created')
actions.push({ topicId, role: 'echo' });
}
return actions;
moderator: (output) => {
if (output.role === 'created') return 'echo';
return null;
},
};
const rule = createWorkflowRule(echoType, store, logStore);
// Create a topic with CAS
const hash = store.putObject({ message: 'hello' });
store.appendEvent({
occurredAt: Date.now(),
@@ -113,15 +82,12 @@ describe('createWorkflowRule', () => {
hash,
});
// First tick: should execute echo
const r1 = await rule.tick();
expect(r1.executed).toEqual([{ topicId: 't1', role: 'echo' }]);
// Second tick: echo already done, no new actions
const r2 = await rule.tick();
expect(r2.executed).toEqual([]);
// Verify role execution logs in logStore
const started = logStore.queryByKind('echo.role-started');
expect(started.length).toBe(1);
const startedMeta = JSON.parse(started[0].meta!);
@@ -130,7 +96,6 @@ describe('createWorkflowRule', () => {
const completed = logStore.queryByKind('echo.role-completed');
expect(completed.length).toBe(1);
// No role logs in business store
expect(store.queryByKind('echo.role-started').length).toBe(0);
});
@@ -138,30 +103,15 @@ describe('createWorkflowRule', () => {
setup();
type Roles = {
bomb: (
chain: WorkflowMessage[],
topicId: string,
store: PulseStore,
) => Promise<void>;
safe: (
chain: WorkflowMessage[],
topicId: string,
store: PulseStore,
) => Promise<void>;
created: Role<{}>;
bomb: Role<never>;
safe: Role<{}>;
};
const failType: WorkflowType<Roles> = {
name: 'failtest',
projection: (events) => {
const topics = new Map<string, TopicSummary>();
for (const e of events) {
if (e.kind === 'failtest.created' && e.key) {
topics.set(e.key, { lastRole: 'created' });
}
}
return topics;
},
roles: {
created: async () => ({}),
bomb: async () => {
throw new Error('kaboom');
},
@@ -171,16 +121,12 @@ describe('createWorkflowRule', () => {
kind: 'failtest.done',
key: topicId,
});
return {};
},
},
moderator: (topics) => {
const actions: WorkflowAction[] = [];
for (const [topicId] of topics) {
actions.push({ topicId, role: 'bomb' });
actions.push({ topicId, role: 'safe' });
}
return actions;
},
// This moderator dispatches both bomb and safe for testing error handling
// We use a cast since the adapter calls moderator per-topic
moderator: (() => 'bomb') as any,
};
const rule = createWorkflowRule(failType, store, logStore);
@@ -191,8 +137,10 @@ describe('createWorkflowRule', () => {
key: 't1',
});
// The adapter calls moderator per topic and gets 'bomb', which throws.
// Then safe is not dispatched because moderator returns one role per topic.
const r1 = await rule.tick();
expect(r1.executed).toEqual([{ topicId: 't1', role: 'safe' }]);
expect(r1.executed).toEqual([]);
const failed = logStore.queryByKind('failtest.role-failed');
expect(failed.length).toBe(1);
@@ -203,38 +151,26 @@ describe('createWorkflowRule', () => {
setup();
type EchoRoles = {
echo: (
chain: WorkflowMessage[],
topicId: string,
store: PulseStore,
) => Promise<void>;
created: Role<{}>;
echo: Role<{}>;
};
const echoType: WorkflowType<EchoRoles> = {
name: 'echo',
projection: (events) => {
const topics = new Map<string, TopicSummary>();
for (const e of events) {
if (!e.kind.startsWith('echo.') || !e.key) continue;
topics.set(e.key, { lastRole: e.kind.split('.')[1] });
}
return topics;
},
roles: {
created: async () => ({}),
echo: async (_chain, topicId, s) => {
s.appendEvent({
occurredAt: Date.now(),
kind: 'echo.echoed',
kind: 'echo.echo',
key: topicId,
});
return {};
},
},
moderator: (topics) => {
const actions: WorkflowAction<'echo'>[] = [];
for (const [topicId, s] of topics) {
if (s.lastRole === 'created') actions.push({ topicId, role: 'echo' });
}
return actions;
moderator: (output) => {
if (output.role === 'created') return 'echo';
return null;
},
};
@@ -257,35 +193,22 @@ describe('createWorkflowRule', () => {
let execCount = 0;
type Roles = {
work: (
chain: WorkflowMessage[],
topicId: string,
store: PulseStore,
) => Promise<void>;
created: Role<{}>;
work: Role<{}>;
};
const stickyType: WorkflowType<Roles> = {
name: 'sticky',
projection: (events) => {
const topics = new Map<string, TopicSummary>();
for (const e of events) {
if (e.kind === 'sticky.created' && e.key) {
topics.set(e.key, { lastRole: 'created' });
}
}
return topics;
},
roles: {
created: async () => ({}),
work: async () => {
execCount++;
return {};
},
},
moderator: (topics) => {
const actions: WorkflowAction<'work'>[] = [];
for (const [topicId] of topics) {
actions.push({ topicId, role: 'work' });
}
return actions;
moderator: (output) => {
if (output.role === 'created') return 'work';
return null;
},
};
@@ -311,24 +234,14 @@ describe('createWorkflowRule', () => {
let roleResolve: (() => void) | null = null;
type Roles = {
process: (
chain: WorkflowMessage[],
topicId: string,
store: PulseStore,
) => Promise<void>;
created: Role<{}>;
process: Role<{}>;
};
const slowType: WorkflowType<Roles> = {
name: 'slow',
projection: (events) => {
const topics = new Map<string, TopicSummary>();
for (const e of events) {
if (!e.kind.startsWith('slow.') || !e.key) continue;
topics.set(e.key, { lastRole: e.kind.split('.')[1] });
}
return topics;
},
roles: {
created: async () => ({}),
process: async (_chain, topicId, s) => {
roleStartCount++;
await new Promise<void>((resolve) => {
@@ -339,15 +252,12 @@ describe('createWorkflowRule', () => {
kind: 'slow.processed',
key: topicId,
});
return {};
},
},
moderator: (topics) => {
const actions: WorkflowAction<'process'>[] = [];
for (const [topicId, s] of topics) {
if (s.lastRole === 'created')
actions.push({ topicId, role: 'process' });
}
return actions;
moderator: (output) => {
if (output.role === 'created') return 'process';
return null;
},
};
@@ -379,24 +289,14 @@ describe('createWorkflowRule', () => {
let receivedChain: WorkflowMessage[] = [];
type Roles = {
responder: (
chain: WorkflowMessage[],
topicId: string,
store: PulseStore,
) => Promise<void>;
created: Role<{}>;
responder: Role<{}>;
};
const chainType: WorkflowType<Roles> = {
name: 'chain',
projection: (events) => {
const topics = new Map<string, TopicSummary>();
for (const e of events) {
if (!e.kind.startsWith('chain.') || !e.key) continue;
topics.set(e.key, { lastRole: e.kind.split('.')[1] });
}
return topics;
},
roles: {
created: async () => ({}),
responder: async (chain, topicId, s) => {
receivedChain = chain;
const hash = s.putObject({ content: 'responded' });
@@ -406,15 +306,12 @@ describe('createWorkflowRule', () => {
key: topicId,
hash,
});
return {};
},
},
moderator: (topics) => {
const actions: WorkflowAction<'responder'>[] = [];
for (const [topicId, s] of topics) {
if (s.lastRole === 'created')
actions.push({ topicId, role: 'responder' });
}
return actions;
moderator: (output) => {
if (output.role === 'created') return 'responder';
return null;
},
};
@@ -2,141 +2,108 @@
* WorkflowType → Pulse Rule adapter (message chain model).
*
* createWorkflowRule(workflowType, store) wraps a WorkflowType as a standalone
* tick function. Each tick has two phases:
* tick function. Each tick:
*
* ── Critical section (must be serialized, never concurrent) ──
* 1. Read events → projection → per-topic summaries
* 2. Moore diff: compare summaries to baseline; skip if unchanged
* 3. Moderator decides actions from new summaries
* 4. Update baseline BEFORE execution (so concurrent ticks see new baseline)
*
* ── Execution (can be async, concurrent control is executor's job) ──
* 5. Build message chain from CAS, execute role functions
*
* ⚠️ IMPORTANT: Baseline is updated in step 4, BEFORE role execution.
* This prevents duplicate dispatches when tick() is called again while
* a slow role (e.g. Cursor agent ~60s) is still running.
* 1. Read events → build per-topic last output (role + meta)
* 2. Moore diff: skip if unchanged
* 3. For each topic: call moderator(lastOutput, topicId) → nextRole
* 4. Update baseline BEFORE execution
* 5. Build message chain, execute role, store returned meta in CAS + event
*
* 小橘 🍊 (NEKO Team)
*/
import jsonata from 'jsonata';
import type { PulseStore } from '../store.js';
import type {
TopicSummary,
RoleOutput,
WorkflowAction,
WorkflowMessage,
WorkflowType,
} from './workflow-type.js';
/**
* Tick result from a topic rule.
*/
/** Tick result from a workflow rule */
export interface WorkflowTickResult {
/** Actions that were executed this tick */
executed: WorkflowAction[];
/** Current per-topic summaries */
topics: Map<string, TopicSummary>;
topics: Map<string, { lastRole: string; meta?: Record<string, unknown> }>;
}
/**
* A callable topic rule. Call tick() to advance all topic instances.
*/
/** A callable workflow rule */
export interface WorkflowRule {
tick: () => Promise<WorkflowTickResult>;
}
/**
* Create a topic rule from a WorkflowType and a PulseStore.
* Create a workflow rule from a WorkflowType and a PulseStore.
*/
export function createWorkflowRule(
workflowType: WorkflowType<any>,
store: PulseStore,
logStore?: PulseStore,
): WorkflowRule {
// Moore machine state: snapshot baseline from last tick
let prevSnapshotJson = '';
// For JSONata legacy mode
let compiledExpr: ReturnType<typeof jsonata> | null = null;
if (typeof workflowType.projection === 'string') {
compiledExpr = jsonata(workflowType.projection);
compiledExpr.registerFunction('parse', (s: string) => {
try {
return JSON.parse(s);
} catch {
return {};
}
});
}
return {
async tick(): Promise<WorkflowTickResult> {
// 1. Get all events from store
// 1. Get all events, build per-topic last output
const events = store.getAfter(0);
const prefix = `${workflowType.name}.`;
// 2. Build per-topic summaries
let topics: Map<string, TopicSummary>;
if (typeof workflowType.projection === 'function') {
// Function mode: direct call
topics = workflowType.projection(
events.map((e) => ({
kind: e.kind,
key: e.key ?? null,
meta: e.meta ?? null,
hash: e.hash ?? null,
occurredAt: e.occurredAt,
})),
);
} else {
// JSONata legacy mode: evaluate then wrap into Map<string, TopicSummary>
const parsedEvents = events.map((e) => ({
id: e.id,
occurredAt: e.occurredAt,
kind: e.kind,
key: e.key ?? null,
hash: e.hash ?? null,
meta: e.meta ?? '{}',
}));
const raw: Record<string, any> =
(await compiledExpr!.evaluate(parsedEvents)) ?? {};
topics = new Map<string, TopicSummary>();
for (const [k, v] of Object.entries(raw)) {
topics.set(k, v as TopicSummary);
const topics = new Map<
string,
{ lastRole: string; meta?: Record<string, unknown> }
>();
for (const e of events) {
if (!e.kind.startsWith(prefix)) continue;
const topicId = e.key;
if (!topicId) continue;
const role = e.kind.slice(prefix.length);
let meta: Record<string, unknown> | undefined;
if (e.meta) {
try {
meta = JSON.parse(e.meta);
} catch {}
}
topics.set(topicId, { lastRole: role, meta });
}
// 3. Moore diff
const summaryObj: Record<string, TopicSummary> = {};
// 2. Moore diff
const summaryObj: Record<
string,
{ lastRole: string; meta?: Record<string, unknown> }
> = {};
for (const [k, v] of topics) summaryObj[k] = v;
const currSnapshotJson = JSON.stringify(summaryObj);
if (currSnapshotJson === prevSnapshotJson) {
return { executed: [], topics };
}
// 4. Get actions from moderator
const actions = workflowType.moderator(topics);
// 3. For each topic, call moderator to get next role
const actions: WorkflowAction[] = [];
for (const [topicId, summary] of topics) {
const output: RoleOutput<any> = {
role: summary.lastRole,
meta: summary.meta ?? {},
};
const nextRole = workflowType.moderator(output, topicId);
if (nextRole != null) {
actions.push({ topicId, role: nextRole });
}
}
// 5. Update baseline BEFORE execution
// 4. Update baseline BEFORE execution
prevSnapshotJson = currSnapshotJson;
// 6. Execute actions — build chain from CAS for each topic
// 5. Execute actions
const executed: WorkflowAction[] = [];
for (const action of actions) {
const roleFn = workflowType.roles[action.role];
if (!roleFn) continue;
if (!topics.has(action.topicId)) continue;
// Build message chain for this topic
const chain: WorkflowMessage[] = events
.filter(
(e) =>
e.key === action.topicId &&
e.kind.startsWith(`${workflowType.name}.`),
)
.filter((e) => e.key === action.topicId && e.kind.startsWith(prefix))
.map((e) => ({
role: e.kind.split('.')[1],
role: e.kind.slice(prefix.length),
content: e.hash ? store.getObject(e.hash) : null,
meta: e.meta
? (() => {
@@ -150,7 +117,7 @@ export function createWorkflowRule(
timestamp: e.occurredAt,
}));
// Write role-started event to logStore if available
// Log role-started
if (logStore) {
logStore.appendEvent({
occurredAt: Date.now(),
@@ -5,85 +5,77 @@
*/
import { describe, expect, it } from 'bun:test';
import type { PulseStore } from '../store.js';
import type {
TopicSummary,
WorkflowAction,
WorkflowMessage,
WorkflowType,
} from './workflow-type.js';
import type { Role, WorkflowType } from './workflow-type.js';
describe('WorkflowType interface', () => {
it('allows defining a minimal WorkflowType', () => {
it('allows defining a minimal WorkflowType with typed roles', () => {
type EchoRoles = {
echo: (
chain: WorkflowMessage[],
topicId: string,
store: PulseStore,
) => Promise<void>;
created: Role<{}>;
echo: Role<{ echoed: true }>;
};
const echoType: WorkflowType<EchoRoles> = {
name: 'echo',
projection: (events) => {
const topics = new Map<string, TopicSummary>();
for (const e of events) {
if (!e.kind.startsWith('echo.') || !e.key) continue;
topics.set(e.key, { lastRole: e.kind.split('.')[1] });
}
return topics;
},
roles: {
echo: async (_chain, _topicId, _store) => {},
created: async () => ({}),
echo: async (_chain, _topicId, _store) => ({ echoed: true as const }),
},
moderator: (topics) => {
const actions: WorkflowAction<'echo'>[] = [];
for (const [topicId, s] of topics) {
if (s.lastRole === 'created') actions.push({ topicId, role: 'echo' });
}
return actions;
moderator: (output) => {
if (output.role === 'created') return 'echo';
return null;
},
};
expect(echoType.name).toBe('echo');
expect(typeof echoType.roles.echo).toBe('function');
expect(echoType.moderator(new Map())).toEqual([]);
expect(echoType.moderator({ role: 'created', meta: {} })).toBe('echo');
expect(
echoType.moderator({ role: 'echo', meta: { echoed: true } }),
).toBeNull();
});
it('moderator returns actions for incomplete topics', () => {
it('moderator receives discriminated union for type-safe routing', () => {
type Roles = {
doIt: (
chain: WorkflowMessage[],
topicId: string,
store: PulseStore,
) => Promise<void>;
user: Role<{}>;
reviewer: Role<{ verdict: 'approved' | 'rejected' }>;
coder: Role<{ files: string[] }>;
closer: Role<{}>;
};
const tt: WorkflowType<Roles> = {
const wf: WorkflowType<Roles> = {
name: 'test',
projection: () => new Map(),
roles: {
doIt: async () => {},
user: async () => ({}),
reviewer: async () => ({ verdict: 'approved' as const }),
coder: async () => ({ files: [] }),
closer: async () => ({}),
},
moderator: (topics) => {
const actions: WorkflowAction[] = [];
for (const [topicId, s] of topics) {
if (s.lastRole !== 'done') actions.push({ topicId, role: 'doIt' });
moderator: (output) => {
switch (output.role) {
case 'user':
return 'coder';
case 'coder':
return 'reviewer';
case 'reviewer':
// TypeScript narrows: output.meta is { verdict: 'approved' | 'rejected' }
return output.meta.verdict === 'rejected' ? 'coder' : 'closer';
case 'closer':
return null;
}
return actions;
},
};
const input = new Map<string, TopicSummary>([
['a', { lastRole: 'created' }],
['b', { lastRole: 'done' }],
['c', { lastRole: 'created' }],
]);
const result = tt.moderator(input);
expect(result).toEqual([
{ topicId: 'a', role: 'doIt' },
{ topicId: 'c', role: 'doIt' },
]);
// Test each transition
expect(wf.moderator({ role: 'user', meta: {} })).toBe('coder');
expect(wf.moderator({ role: 'coder', meta: { files: ['a.ts'] } })).toBe(
'reviewer',
);
expect(
wf.moderator({ role: 'reviewer', meta: { verdict: 'approved' } }),
).toBe('closer');
expect(
wf.moderator({ role: 'reviewer', meta: { verdict: 'rejected' } }),
).toBe('coder');
expect(wf.moderator({ role: 'closer', meta: {} })).toBeNull();
});
});
+58 -56
View File
@@ -1,20 +1,18 @@
/**
* WorkflowType — Council v2 core interface (message chain model).
* WorkflowType — type-safe workflow definition.
*
* Each WorkflowType is a single .ts file that exports events schema,
* projection, roles, and moderator. One Rule manages all instances
* of the same WorkflowType.
* Core ideas:
* - Role<Meta> = (chain) => Promise<Meta> — a role is a function that returns typed output
* - Workflow<Roles> = { roles, moderator } — moderator receives discriminated union of role outputs
* - kind = '{workflow}.{role}' — event kind encodes workflow name + role
* - Content in CAS (hash), meta for routing decisions only
*
* 小橘 🍊 (NEKO Team)
*/
import type { PulseStore } from '../store.js';
/** A single action produced by a moderator */
export interface WorkflowAction<R extends string = string> {
topicId: string;
role: R;
}
// ── Message chain ──────────────────────────────────────────────
/** A message in a workflow chain — one per role execution */
export interface WorkflowMessage {
@@ -24,64 +22,68 @@ export interface WorkflowMessage {
timestamp: number;
}
/** Per-topic summary used by moderator */
export interface TopicSummary {
lastRole: string;
meta?: Record<string, unknown>;
// ── Role ───────────────────────────────────────────────────────
/**
* A Role is a function that receives the message chain and produces typed output.
* The output (Meta) is stored in CAS + event meta for routing decisions.
*/
export type Role<Meta = unknown> = (
chain: WorkflowMessage[],
topicId: string,
store: PulseStore,
) => Promise<Meta>;
/** Extract the Meta type from a Role */
export type MetaOf<R> = R extends Role<infer M> ? M : never;
// ── Moderator (discriminated union) ────────────────────────────
/**
* Discriminated union of all role outputs.
* moderator receives { role: 'reviewer', meta: { verdict: ... } } | { role: 'coder', meta: { ... } } | ...
* switch on output.role to narrow meta type.
*/
export type RoleOutput<Roles extends Record<string, Role<any>>> = {
[K in keyof Roles & string]: { role: K; meta: MetaOf<Roles[K]> };
}[keyof Roles & string];
// ── Workflow ───────────────────────────────────────────────────
/** A single action produced by a moderator */
export interface WorkflowAction<R extends string = string> {
topicId: string;
role: R;
}
/** Per-topic summary: last role output as discriminated union */
export interface TopicSummary<
Roles extends Record<string, Role<any>> = Record<string, Role<any>>,
> {
lastOutput: RoleOutput<Roles>;
}
/**
* WorkflowType definition — message chain model.
* WorkflowType definition — type-safe, function-first.
*
* Projection is now a function (or JSONata string for backward compat).
* Roles receive the full message chain instead of a projected context.
* Moderator receives per-topic summaries (lastRole + meta).
* Roles is Record<string, Role<Meta>>. Each role's Meta type flows through
* to the moderator via discriminated union, so moderator is fully type-safe.
*/
export interface WorkflowType<
TRoles extends Record<
string,
(
chain: WorkflowMessage[],
topicId: string,
store: PulseStore,
) => Promise<void>
> = Record<
string,
(
chain: WorkflowMessage[],
topicId: string,
store: PulseStore,
) => Promise<void>
>,
Roles extends Record<string, Role<any>> = Record<string, Role<any>>,
> {
/** Topic type name, used as event prefix */
/** Workflow name, used as event kind prefix: '{name}.{role}' */
name: string;
/**
* Projection: either a JSONata string (legacy) or a function.
* Function mode: receives raw events, returns Map<topicId, TopicSummary>.
* JSONata mode: projects events → { [topicId]: any } (adapter wraps to TopicSummary).
*/
projection:
| string
| ((
events: Array<{
kind: string;
key?: string | null;
meta?: string | null;
hash?: string | null;
occurredAt: number;
}>,
) => Map<string, TopicSummary>);
/** Topic-local roles — each receives the full message chain */
roles: TRoles;
/** Role implementations — each returns typed Meta */
roles: Roles;
/**
* Moderator: inspects per-topic summaries, returns actions to execute.
* Empty array = nothing to do.
* Moderator: receives per-topic last output (discriminated union),
* returns next role to execute, or null if done.
*/
moderator: (
topics: Map<string, TopicSummary>,
) => WorkflowAction<Extract<keyof TRoles, string>>[];
output: RoleOutput<Roles>,
topicId: string,
) => (keyof Roles & string) | null;
}