From d6b607d610601e9eac92c7fd4adafc6e5e966f42 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B0=8F=E6=A9=98?= Date: Sat, 18 Apr 2026 15:09:14 +0000 Subject: [PATCH] feat: write __end__ event when workflow completes --- .../workflows/workflow-rule-adapter.test.ts | 88 +++++++++++++++++++ .../src/workflows/workflow-rule-adapter.ts | 19 +++- 2 files changed, 106 insertions(+), 1 deletion(-) diff --git a/packages/pulse/src/workflows/workflow-rule-adapter.test.ts b/packages/pulse/src/workflows/workflow-rule-adapter.test.ts index 4ef51ce..aad0712 100644 --- a/packages/pulse/src/workflows/workflow-rule-adapter.test.ts +++ b/packages/pulse/src/workflows/workflow-rule-adapter.test.ts @@ -459,6 +459,94 @@ describe('createWorkflowRule', () => { expect(execCount).toBe(3); }); + it('writes __end__ event when moderator returns END', async () => { + setup(); + + type Roles = { echo: Role }; + const wf: WorkflowType = { + name: 'endtest', + roles: { + echo: async () => ({ content: 'done', meta: null }), + }, + moderator: (output) => (output.role === START ? 'echo' : END), + }; + + const rule = createWorkflowRule(wf, store); + await triggerWorkflow('endtest', 't1', 'go'); + + // tick 1: executes echo + await rule.tick(); + // tick 2: moderator returns END → writes __end__ + await rule.tick(); + + const endEvents = await store.queryByKind('endtest.__end__'); + expect(endEvents.length).toBe(1); + expect(endEvents[0].key).toBe('t1'); + const meta = JSON.parse(endEvents[0].meta!); + expect(meta.lastRole).toBe('echo'); + expect(meta.reason).toBe('moderator returned END'); + }); + + it('does not process topics that already have __end__', async () => { + setup(); + + let execCount = 0; + type Roles = { work: Role }; + const wf: WorkflowType = { + name: 'skipend', + roles: { + work: async () => { execCount++; return { content: 'ok', meta: null }; }, + }, + moderator: (output) => (output.role === START ? 'work' : END), + }; + + const rule = createWorkflowRule(wf, store); + await triggerWorkflow('skipend', 't1', 'go'); + + await rule.tick(); // executes work + await rule.tick(); // writes __end__ + expect(execCount).toBe(1); + + // Further ticks should not process t1 again + await rule.tick(); + await rule.tick(); + expect(execCount).toBe(1); + + // Only one __end__ event + const endEvents = await store.queryByKind('skipend.__end__'); + expect(endEvents.length).toBe(1); + }); + + it('__end__ event meta contains lastRole', async () => { + setup(); + + type Roles = { step1: Role; step2: Role }; + const wf: WorkflowType = { + name: 'metaend', + roles: { + step1: async () => ({ content: 's1', meta: null }), + step2: async () => ({ content: 's2', meta: null }), + }, + moderator: (output) => { + if (output.role === START) return 'step1'; + if (output.role === 'step1') return 'step2'; + return END; + }, + }; + + const rule = createWorkflowRule(wf, store); + await triggerWorkflow('metaend', 't1', 'go'); + + await rule.tick(); // step1 + await rule.tick(); // step2 + await rule.tick(); // __end__ + + const endEvents = await store.queryByKind('metaend.__end__'); + expect(endEvents.length).toBe(1); + const meta = JSON.parse(endEvents[0].meta!); + expect(meta.lastRole).toBe('step2'); + }); + it('默认保护参数正确', async () => { setup(); diff --git a/packages/pulse/src/workflows/workflow-rule-adapter.ts b/packages/pulse/src/workflows/workflow-rule-adapter.ts index 2771918..ab36aba 100644 --- a/packages/pulse/src/workflows/workflow-rule-adapter.ts +++ b/packages/pulse/src/workflows/workflow-rule-adapter.ts @@ -124,12 +124,29 @@ export function createWorkflowRule( // 3. Call moderator for each topic const actions: WorkflowAction[] = []; for (const [topicId, summary] of topics) { + // Skip already-ended topics + if (summary.lastRole === '__end__') continue; + const input: ModeratorInput = summary.lastRole === START_SUFFIX ? { role: START, meta: summary.meta } : { role: summary.lastRole, meta: summary.meta ?? {} }; const next = wf.moderator(input, topicId); - if (next !== END && next != null) { + if (next === END || next == null) { + // Write __end__ event to mark workflow completion + const endEvent = { + occurredAt: Date.now(), + kind: `${wf.name}.__end__`, + key: topicId, + meta: JSON.stringify({ lastRole: summary.lastRole, reason: 'moderator returned END' }), + }; + const written = await store.appendEvent(endEvent); + // Update checkpoint so subsequent ticks skip this topic + if (checkpoint && written) { + checkpoint.lastEventId = written.id; + checkpoint.topicSummaries.set(topicId, { lastRole: '__end__', meta: { lastRole: summary.lastRole, reason: 'moderator returned END' } }); + } + } else { actions.push({ topicId, role: next as string }); } }