feat: workflow abort via GuardProjection lifecycle (pulse#10)
CI / test (push) Has been cancelled

This commit is contained in:
2026-04-18 23:04:13 +00:00
parent c6e715fb3b
commit 7b7a3300a7
3 changed files with 103 additions and 0 deletions
+11
View File
@@ -108,6 +108,9 @@ export interface PulseStore {
/** Read data from CAS store by hash. Returns null if not found. */
getObject(hash: string): Promise<unknown | null>;
/** Get the underlying bun:sqlite Database handle (for guard projections etc.) */
getDatabase(): Database;
/** Close the database */
close(): Promise<void>;
@@ -494,6 +497,10 @@ export function createStore(options: CreateStoreOptions): PulseStore {
eventsDb.close();
},
getDatabase(): Database {
return eventsDb;
},
async archiveEvents(olderThan: number): Promise<number> {
const result = eventsDb
.prepare('DELETE FROM events WHERE occurred_at < ?')
@@ -752,6 +759,10 @@ function createScopeStore(db: Database, objectsDir: string): PulseStore {
db.close();
},
getDatabase(): Database {
return db;
},
async archiveEvents(olderThan: number): Promise<number> {
const result = db
.prepare('DELETE FROM events WHERE occurred_at < ?')
@@ -565,4 +565,77 @@ describe('createWorkflowRule', () => {
expect(r1.executed).toMatchObject([{ topicId: 't1', role: 'test' }]);
// 如果默认参数有问题,这个测试会失败
});
it('aborted workflow is skipped on subsequent ticks', async () => {
setup();
type EchoRoles = { echo: import('./workflow-type.js').Role<{ echoed: true }> };
const echoType: WorkflowType<EchoRoles> = {
name: 'echo',
roles: {
echo: async (chain) => ({
content: `Echo: ${chain.find((m) => m.role === '__start__')?.content ?? ''}`,
meta: { echoed: true as const },
}),
},
moderator: (output) => (output.role === START ? 'echo' : END),
};
const rule = createWorkflowRule(echoType, store, logStore);
// Start workflow
await triggerWorkflow('echo', 't1', 'hello');
const r1 = await rule.tick();
expect(r1.executed.length).toBe(1);
// Abort it
await store.appendEvent({
occurredAt: Date.now(),
kind: 'echo.__abort__',
key: 't1',
});
// Tick again — should NOT execute anything for t1
const r2 = await rule.tick();
expect(r2.executed.length).toBe(0);
});
it('abort prevents restart of same topic key', async () => {
setup();
let callCount = 0;
type EchoRoles = { echo: import('./workflow-type.js').Role<null> };
const echoType: WorkflowType<EchoRoles> = {
name: 'echo',
roles: {
echo: async () => { callCount++; return { content: 'ok', meta: null }; },
},
moderator: (output) => (output.role === START ? 'echo' : END),
};
const rule = createWorkflowRule(echoType, store, logStore);
// Start + run
await triggerWorkflow('echo', 't1', 'hello');
await rule.tick();
expect(callCount).toBe(1);
// Abort
await store.appendEvent({ occurredAt: Date.now(), kind: 'echo.__abort__', key: 't1' });
await rule.tick();
// Try to restart same key — guard should reject __start__ (status != 'unknown')
// appendEvent will throw GuardViolationError
let threw = false;
try {
await triggerWorkflow('echo', 't1', 'hello again');
} catch (err: any) {
if (err.message?.includes('Guard') || err.constructor?.name === 'GuardViolationError') {
threw = true;
} else {
throw err;
}
}
expect(threw).toBe(true);
});
});
@@ -12,6 +12,7 @@
*/
import type { PulseStore } from '../store.js';
import { registerGuard, getGuardState } from '../guard-projection.js';
import {
END,
type ModeratorInput,
@@ -63,6 +64,18 @@ export function createWorkflowRule(
let prevSnapshotJson = '';
let checkpoint: WorkflowCheckpoint | null = null;
// Register lifecycle guard for abort/end tracking
const db = store.getDatabase();
registerGuard(db, {
name: 'workflow-lifecycle',
initial_value: { status: 'unknown' },
sources: [
{ kind: '*.__start__', check: "state.status = 'unknown'", transition: "{'status':'active'}" },
{ kind: '*.__abort__', check: "state.status = 'active'", transition: "{'status':'aborted'}" },
{ kind: '*.__end__', check: "state.status = 'active'", transition: "{'status':'ended'}" },
],
});
return {
async tick(): Promise<WorkflowTickResult> {
const prefix = `${wf.name}.`;
@@ -127,6 +140,12 @@ export function createWorkflowRule(
// Skip already-ended topics
if (summary.lastRole === '__end__') continue;
// Skip aborted/ended topics via guard projection
const lifecycleState = getGuardState(db, 'workflow-lifecycle', topicId);
if (lifecycleState?.status === 'aborted' || lifecycleState?.status === 'ended') {
continue;
}
const input: ModeratorInput<any> =
summary.lastRole === START_SUFFIX
? { role: START, meta: summary.meta }