feat: write __end__ event when workflow completes
CI / test (push) Has been cancelled

This commit is contained in:
2026-04-18 15:09:14 +00:00
parent aa89fdf144
commit d6b607d610
2 changed files with 106 additions and 1 deletions
@@ -459,6 +459,94 @@ describe('createWorkflowRule', () => {
expect(execCount).toBe(3);
});
it('writes __end__ event when moderator returns END', async () => {
setup();
type Roles = { echo: Role<null> };
const wf: WorkflowType<Roles> = {
name: 'endtest',
roles: {
echo: async () => ({ content: 'done', meta: null }),
},
moderator: (output) => (output.role === START ? 'echo' : END),
};
const rule = createWorkflowRule(wf, store);
await triggerWorkflow('endtest', 't1', 'go');
// tick 1: executes echo
await rule.tick();
// tick 2: moderator returns END → writes __end__
await rule.tick();
const endEvents = await store.queryByKind('endtest.__end__');
expect(endEvents.length).toBe(1);
expect(endEvents[0].key).toBe('t1');
const meta = JSON.parse(endEvents[0].meta!);
expect(meta.lastRole).toBe('echo');
expect(meta.reason).toBe('moderator returned END');
});
it('does not process topics that already have __end__', async () => {
setup();
let execCount = 0;
type Roles = { work: Role<null> };
const wf: WorkflowType<Roles> = {
name: 'skipend',
roles: {
work: async () => { execCount++; return { content: 'ok', meta: null }; },
},
moderator: (output) => (output.role === START ? 'work' : END),
};
const rule = createWorkflowRule(wf, store);
await triggerWorkflow('skipend', 't1', 'go');
await rule.tick(); // executes work
await rule.tick(); // writes __end__
expect(execCount).toBe(1);
// Further ticks should not process t1 again
await rule.tick();
await rule.tick();
expect(execCount).toBe(1);
// Only one __end__ event
const endEvents = await store.queryByKind('skipend.__end__');
expect(endEvents.length).toBe(1);
});
it('__end__ event meta contains lastRole', async () => {
setup();
type Roles = { step1: Role<null>; step2: Role<null> };
const wf: WorkflowType<Roles> = {
name: 'metaend',
roles: {
step1: async () => ({ content: 's1', meta: null }),
step2: async () => ({ content: 's2', meta: null }),
},
moderator: (output) => {
if (output.role === START) return 'step1';
if (output.role === 'step1') return 'step2';
return END;
},
};
const rule = createWorkflowRule(wf, store);
await triggerWorkflow('metaend', 't1', 'go');
await rule.tick(); // step1
await rule.tick(); // step2
await rule.tick(); // __end__
const endEvents = await store.queryByKind('metaend.__end__');
expect(endEvents.length).toBe(1);
const meta = JSON.parse(endEvents[0].meta!);
expect(meta.lastRole).toBe('step2');
});
it('默认保护参数正确', async () => {
setup();
@@ -124,12 +124,29 @@ export function createWorkflowRule(
// 3. Call moderator for each topic
const actions: WorkflowAction[] = [];
for (const [topicId, summary] of topics) {
// Skip already-ended topics
if (summary.lastRole === '__end__') continue;
const input: ModeratorInput<any> =
summary.lastRole === START_SUFFIX
? { role: START, meta: summary.meta }
: { role: summary.lastRole, meta: summary.meta ?? {} };
const next = wf.moderator(input, topicId);
if (next !== END && next != null) {
if (next === END || next == null) {
// Write __end__ event to mark workflow completion
const endEvent = {
occurredAt: Date.now(),
kind: `${wf.name}.__end__`,
key: topicId,
meta: JSON.stringify({ lastRole: summary.lastRole, reason: 'moderator returned END' }),
};
const written = await store.appendEvent(endEvent);
// Update checkpoint so subsequent ticks skip this topic
if (checkpoint && written) {
checkpoint.lastEventId = written.id;
checkpoint.topicSummaries.set(topicId, { lastRole: '__end__', meta: { lastRole: summary.lastRole, reason: 'moderator returned END' } });
}
} else {
actions.push({ topicId, role: next as string });
}
}