feat(workflow): add in-memory checkpoint for incremental tick
CI / test (push) Has been cancelled

- WorkflowCheckpoint caches lastEventId, topic summaries, and per-topic events
- First tick: full read (getAfter(0)), subsequent ticks: incremental (getAfter(lastEventId))
- Newly written role events also update checkpoint in-place
- No interface changes, no new files
This commit is contained in:
2026-04-18 01:27:33 +00:00
parent 90f7ba5a62
commit 37e3331ccb
3 changed files with 67 additions and 18 deletions
+2 -4
View File
@@ -75,9 +75,7 @@ const logStore = createStore({
// 1. Coding workflow (with mock roles for now)
const codingWf = createCodingWorkflow();
const codingRule = createWorkflowRule(codingWf, store, logStore, {
cooldownMs: 0,
});
const codingRule = createWorkflowRule(codingWf, store, logStore);
// 2. Meta workflow (real LLM + Cursor roles)
const metaWf = createMetaWorkflow({
@@ -91,7 +89,7 @@ const metaWf = createMetaWorkflow({
branch: 'main',
}),
});
const metaRule = createWorkflowRule(metaWf, store, logStore, { cooldownMs: 0 });
const metaRule = createWorkflowRule(metaWf, store, logStore);
// ── Ticker ─────────────────────────────────────────────────────
+1 -1
View File
@@ -63,7 +63,7 @@ async function main() {
}),
});
const rule = createWorkflowRule(wf, store, undefined, { cooldownMs: 0 });
const rule = createWorkflowRule(wf, store, undefined);
const taskDescription = `# 重构 coding workflow 为 TDD 驱动流程
@@ -40,6 +40,18 @@ export interface WorkflowRule {
const START_SUFFIX = '__start__';
/** In-memory checkpoint for incremental tick */
interface WorkflowCheckpoint {
lastEventId: number;
/** Per-topic summary: lastRole + meta */
topicSummaries: Map<
string,
{ lastRole: string; meta: Record<string, unknown> | null }
>;
/** Per-topic chain of relevant events (for building WorkflowMessage[]) */
topicEvents: Map<string, import('../store.js').EventRecord[]>;
}
/**
* Create a workflow rule from a WorkflowType and a PulseStore.
*/
@@ -49,18 +61,32 @@ export function createWorkflowRule(
logStore?: PulseStore,
): WorkflowRule {
let prevSnapshotJson = '';
let checkpoint: WorkflowCheckpoint | null = null;
return {
async tick(): Promise<WorkflowTickResult> {
const events = store.getAfter(0);
const prefix = `${wf.name}.`;
// 1. Build per-topic state: lastRole + meta
const topics = new Map<
string,
{ lastRole: string; meta: Record<string, unknown> | null }
>();
for (const e of events) {
// Incremental read: first tick reads all, subsequent ticks read only new events
const afterId = checkpoint ? checkpoint.lastEventId : 0;
const newEvents = store.getAfter(afterId);
// Initialize or reuse checkpoint
if (!checkpoint) {
checkpoint = {
lastEventId: 0,
topicSummaries: new Map(),
topicEvents: new Map(),
};
}
// Update lastEventId from new events
if (newEvents.length > 0) {
checkpoint.lastEventId = newEvents[newEvents.length - 1].id;
}
// Merge new events into checkpoint
for (const e of newEvents) {
if (!e.kind.startsWith(prefix)) continue;
const topicId = e.key;
if (!topicId) continue;
@@ -71,9 +97,19 @@ export function createWorkflowRule(
meta = JSON.parse(e.meta);
} catch {}
}
topics.set(topicId, { lastRole: role, meta });
checkpoint.topicSummaries.set(topicId, { lastRole: role, meta });
// Append to per-topic event cache
let topicEvts = checkpoint.topicEvents.get(topicId);
if (!topicEvts) {
topicEvts = [];
checkpoint.topicEvents.set(topicId, topicEvts);
}
topicEvts.push(e);
}
const topics = checkpoint.topicSummaries;
// 2. Moore diff
const summaryObj: Record<
string,
@@ -107,10 +143,9 @@ export function createWorkflowRule(
const roleFn = wf.roles[action.role];
if (!roleFn) continue;
// Build message chain
const chain: WorkflowMessage[] = events
.filter((e) => e.key === action.topicId && e.kind.startsWith(prefix))
.map((e) => ({
// Build message chain from cached topic events
const cachedEvents = checkpoint.topicEvents.get(action.topicId) ?? [];
const chain: WorkflowMessage[] = cachedEvents.map((e) => ({
role: e.kind.slice(prefix.length),
content: e.hash
? (() => {
@@ -153,7 +188,7 @@ export function createWorkflowRule(
// Adapter writes CAS + event
const hash = store.putObject(result.content);
store.appendEvent({
const written = store.appendEvent({
occurredAt: Date.now(),
kind: `${wf.name}.${action.role}`,
key: action.topicId,
@@ -161,6 +196,22 @@ export function createWorkflowRule(
meta: result.meta != null ? JSON.stringify(result.meta) : undefined,
});
// Update checkpoint with the newly written event
if (checkpoint && written) {
checkpoint.lastEventId = written.id;
const parsedMeta = result.meta != null ? (result.meta as Record<string, unknown>) : null;
checkpoint.topicSummaries.set(action.topicId, {
lastRole: action.role,
meta: parsedMeta,
});
let topicEvts = checkpoint.topicEvents.get(action.topicId);
if (!topicEvts) {
topicEvts = [];
checkpoint.topicEvents.set(action.topicId, topicEvts);
}
topicEvts.push(written);
}
if (logStore) {
logStore.appendEvent({
occurredAt: Date.now(),