From 7b7a3300a71b1007f6613616d32264a71df1074b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B0=8F=E6=A9=98?= Date: Sat, 18 Apr 2026 23:04:13 +0000 Subject: [PATCH] feat: workflow abort via GuardProjection lifecycle (pulse#10) --- packages/pulse/src/store.ts | 11 +++ .../workflows/workflow-rule-adapter.test.ts | 73 +++++++++++++++++++ .../src/workflows/workflow-rule-adapter.ts | 19 +++++ 3 files changed, 103 insertions(+) diff --git a/packages/pulse/src/store.ts b/packages/pulse/src/store.ts index 43e7afe..d681b6c 100644 --- a/packages/pulse/src/store.ts +++ b/packages/pulse/src/store.ts @@ -108,6 +108,9 @@ export interface PulseStore { /** Read data from CAS store by hash. Returns null if not found. */ getObject(hash: string): Promise; + /** Get the underlying bun:sqlite Database handle (for guard projections etc.) */ + getDatabase(): Database; + /** Close the database */ close(): Promise; @@ -494,6 +497,10 @@ export function createStore(options: CreateStoreOptions): PulseStore { eventsDb.close(); }, + getDatabase(): Database { + return eventsDb; + }, + async archiveEvents(olderThan: number): Promise { const result = eventsDb .prepare('DELETE FROM events WHERE occurred_at < ?') @@ -752,6 +759,10 @@ function createScopeStore(db: Database, objectsDir: string): PulseStore { db.close(); }, + getDatabase(): Database { + return db; + }, + async archiveEvents(olderThan: number): Promise { const result = db .prepare('DELETE FROM events WHERE occurred_at < ?') diff --git a/packages/pulse/src/workflows/workflow-rule-adapter.test.ts b/packages/pulse/src/workflows/workflow-rule-adapter.test.ts index aad0712..8b84b9a 100644 --- a/packages/pulse/src/workflows/workflow-rule-adapter.test.ts +++ b/packages/pulse/src/workflows/workflow-rule-adapter.test.ts @@ -565,4 +565,77 @@ describe('createWorkflowRule', () => { expect(r1.executed).toMatchObject([{ topicId: 't1', role: 'test' }]); // 如果默认参数有问题,这个测试会失败 }); + + it('aborted workflow is skipped on subsequent ticks', async () => { + setup(); + + type EchoRoles = { echo: import('./workflow-type.js').Role<{ echoed: true }> }; + const echoType: WorkflowType = { + name: 'echo', + roles: { + echo: async (chain) => ({ + content: `Echo: ${chain.find((m) => m.role === '__start__')?.content ?? ''}`, + meta: { echoed: true as const }, + }), + }, + moderator: (output) => (output.role === START ? 'echo' : END), + }; + + const rule = createWorkflowRule(echoType, store, logStore); + + // Start workflow + await triggerWorkflow('echo', 't1', 'hello'); + const r1 = await rule.tick(); + expect(r1.executed.length).toBe(1); + + // Abort it + await store.appendEvent({ + occurredAt: Date.now(), + kind: 'echo.__abort__', + key: 't1', + }); + + // Tick again — should NOT execute anything for t1 + const r2 = await rule.tick(); + expect(r2.executed.length).toBe(0); + }); + + it('abort prevents restart of same topic key', async () => { + setup(); + + let callCount = 0; + type EchoRoles = { echo: import('./workflow-type.js').Role }; + const echoType: WorkflowType = { + name: 'echo', + roles: { + echo: async () => { callCount++; return { content: 'ok', meta: null }; }, + }, + moderator: (output) => (output.role === START ? 'echo' : END), + }; + + const rule = createWorkflowRule(echoType, store, logStore); + + // Start + run + await triggerWorkflow('echo', 't1', 'hello'); + await rule.tick(); + expect(callCount).toBe(1); + + // Abort + await store.appendEvent({ occurredAt: Date.now(), kind: 'echo.__abort__', key: 't1' }); + await rule.tick(); + + // Try to restart same key — guard should reject __start__ (status != 'unknown') + // appendEvent will throw GuardViolationError + let threw = false; + try { + await triggerWorkflow('echo', 't1', 'hello again'); + } catch (err: any) { + if (err.message?.includes('Guard') || err.constructor?.name === 'GuardViolationError') { + threw = true; + } else { + throw err; + } + } + expect(threw).toBe(true); + }); }); diff --git a/packages/pulse/src/workflows/workflow-rule-adapter.ts b/packages/pulse/src/workflows/workflow-rule-adapter.ts index ab36aba..d24a7fe 100644 --- a/packages/pulse/src/workflows/workflow-rule-adapter.ts +++ b/packages/pulse/src/workflows/workflow-rule-adapter.ts @@ -12,6 +12,7 @@ */ import type { PulseStore } from '../store.js'; +import { registerGuard, getGuardState } from '../guard-projection.js'; import { END, type ModeratorInput, @@ -63,6 +64,18 @@ export function createWorkflowRule( let prevSnapshotJson = ''; let checkpoint: WorkflowCheckpoint | null = null; + // Register lifecycle guard for abort/end tracking + const db = store.getDatabase(); + registerGuard(db, { + name: 'workflow-lifecycle', + initial_value: { status: 'unknown' }, + sources: [ + { kind: '*.__start__', check: "state.status = 'unknown'", transition: "{'status':'active'}" }, + { kind: '*.__abort__', check: "state.status = 'active'", transition: "{'status':'aborted'}" }, + { kind: '*.__end__', check: "state.status = 'active'", transition: "{'status':'ended'}" }, + ], + }); + return { async tick(): Promise { const prefix = `${wf.name}.`; @@ -127,6 +140,12 @@ export function createWorkflowRule( // Skip already-ended topics if (summary.lastRole === '__end__') continue; + // Skip aborted/ended topics via guard projection + const lifecycleState = getGuardState(db, 'workflow-lifecycle', topicId); + if (lifecycleState?.status === 'aborted' || lifecycleState?.status === 'ended') { + continue; + } + const input: ModeratorInput = summary.lastRole === START_SUFFIX ? { role: START, meta: summary.meta }