refactor: remainingRounds countdown in event meta, replace maxTicksPerTopic with workflow-declared maxRounds
CI / test (push) Has been cancelled

This commit is contained in:
2026-04-17 11:51:57 +00:00
parent e1a96fdc71
commit a54c36f758
3 changed files with 69 additions and 51 deletions
@@ -282,7 +282,7 @@ describe('createWorkflowRule', () => {
const events = store.queryByKind('nm.closer');
expect(events.length).toBe(1);
expect(events[0].meta).toBeUndefined();
expect(events[0].meta).toBe('{"remainingRounds":19}');
expect(events[0].hash).toBeTruthy();
});
@@ -314,7 +314,7 @@ describe('createWorkflowRule', () => {
};
const rule = createWorkflowRule(loopType, store, logStore, {
maxTicksPerTopic: 3, // 限制 3
defaultMaxRounds: 3, // 限制 3
maxTicksPerWindow: 50,
cooldownMs: 0, // 无 cooldown
});
@@ -336,11 +336,10 @@ describe('createWorkflowRule', () => {
expect(endEvents[0].key).toBe('t1');
// 验证日志
const forceEndLogs = logStore.queryByKind('loop.topic-force-ended');
const forceEndLogs = logStore.queryByKind('loop.rounds-exhausted');
expect(forceEndLogs.length).toBe(1);
const logMeta = JSON.parse(forceEndLogs[0].meta!);
expect(logMeta.reason).toBe('maxTicksPerTopic exceeded');
expect(logMeta.maxTicks).toBe(3);
expect(logMeta.maxRounds).toBe(3);
});
it('maxTicksPerWindow: 超过全局限制时暂停所有 workflow tick', async () => {
@@ -366,7 +365,7 @@ describe('createWorkflowRule', () => {
};
const rule = createWorkflowRule(workType, store, logStore, {
maxTicksPerTopic: 50,
defaultMaxRounds: 50,
maxTicksPerWindow: 2, // 全局限制 2 次
cooldownMs: 0,
});
@@ -416,7 +415,7 @@ describe('createWorkflowRule', () => {
};
const rule = createWorkflowRule(coolType, store, logStore, {
maxTicksPerTopic: 50,
defaultMaxRounds: 50,
maxTicksPerWindow: 50,
cooldownMs: 500, // 0.5 秒 cooldown
});
@@ -464,7 +463,7 @@ describe('createWorkflowRule', () => {
};
const rule = createWorkflowRule(multiType, store, logStore, {
maxTicksPerTopic: 2,
defaultMaxRounds: 2,
maxTicksPerWindow: 10,
cooldownMs: 100,
});
@@ -39,17 +39,20 @@ export interface WorkflowRule {
}
/** Options for workflow rule protection mechanisms */
export interface WorkflowRuleOptions {
maxTicksPerTopic?: number; // 默认 20
maxTicksPerWindow?: number; // 默认 50, window = 1h
cooldownMs?: number; // 默认 60_000
}
const START_SUFFIX = '__start__';
/**
* Create a workflow rule from a WorkflowType and a PulseStore.
*/
export interface WorkflowRuleOptions {
/** Global max ticks across all topics in 1h window (default: 50) */
maxTicksPerWindow?: number;
/** Fallback maxRounds if workflow doesn't declare limits.maxRounds (default: 20) */
defaultMaxRounds?: number;
/** Min ms between ticks on same topic (default: 60_000) */
cooldownMs?: number;
}
export function createWorkflowRule(
wf: WorkflowType<any>,
store: PulseStore,
@@ -57,20 +60,14 @@ export function createWorkflowRule(
options: WorkflowRuleOptions = {},
): WorkflowRule {
const {
maxTicksPerTopic = 20,
maxTicksPerWindow = 50,
defaultMaxRounds = 20,
cooldownMs = 60_000,
} = options;
const maxRounds = wf.limits?.maxRounds ?? defaultMaxRounds;
let prevSnapshotJson = '';
// 保护机制辅助函数
const getTopicTickCount = (topicId: string): number => {
return store
.getAfter(0)
.filter((e) => e.key === topicId && e.kind.startsWith(`${wf.name}.`) && e.kind !== `${wf.name}.${START_SUFFIX}`)
.length;
};
const getWindowTickCount = (windowMs: number): number => {
const cutoff = Date.now() - windowMs;
return store
@@ -91,25 +88,20 @@ export function createWorkflowRule(
return events.length > 0 ? events[0].occurredAt : 0;
};
const forceEndTopic = (topicId: string) => {
store.appendEvent({
occurredAt: Date.now(),
kind: `${wf.name}.__end__`,
key: topicId,
meta: JSON.stringify({ reason: 'maxTicksPerTopic exceeded', maxTicks: maxTicksPerTopic }),
});
if (logStore) {
logStore.appendEvent({
occurredAt: Date.now(),
kind: `${wf.name}.topic-force-ended`,
key: topicId,
meta: JSON.stringify({
topicId,
reason: 'maxTicksPerTopic exceeded',
maxTicks: maxTicksPerTopic,
}),
});
/** Read remainingRounds from the latest non-start event for a topic */
const getRemainingRounds = (topicId: string): number => {
const topicEvents = store
.getAfter(0)
.filter((e) => e.key === topicId && e.kind.startsWith(`${wf.name}.`) && e.kind !== `${wf.name}.${START_SUFFIX}` && e.kind !== `${wf.name}.__end__`);
if (topicEvents.length === 0) return maxRounds; // first round
const last = topicEvents[topicEvents.length - 1];
if (last.meta) {
try {
const meta = JSON.parse(last.meta);
if (typeof meta.remainingRounds === 'number') return meta.remainingRounds;
} catch {}
}
return maxRounds; // fallback
};
return {
@@ -154,7 +146,8 @@ export function createWorkflowRule(
summary.lastRole === START_SUFFIX
? { role: START, meta: summary.meta }
: { role: summary.lastRole, meta: summary.meta ?? {} };
const next = wf.moderator(input, topicId);
const remaining = getRemainingRounds(topicId);
const next = wf.moderator(input, topicId, remaining);
if (next !== END && next != null) {
actions.push({ topicId, role: next as string });
}
@@ -179,22 +172,35 @@ export function createWorkflowRule(
return { executed: [] }; // 返回空 effects,暂停所有 workflow tick
}
// 4.2 过滤和处理 actions
// 4.2 过滤 actions — remainingRounds countdown + cooldown
const filteredActions: WorkflowAction[] = [];
const now = Date.now();
for (const action of actions) {
// 检查 topic tick 数量限制
const topicTickCount = getTopicTickCount(action.topicId);
if (topicTickCount >= maxTicksPerTopic) {
forceEndTopic(action.topicId);
continue; // 跳过这个 action
const remaining = getRemainingRounds(action.topicId);
// remainingRounds already 0 → force END
if (remaining <= 0) {
store.appendEvent({
occurredAt: Date.now(),
kind: `${wf.name}.__end__`,
key: action.topicId,
meta: JSON.stringify({ reason: 'maxRounds exhausted', maxRounds }),
});
if (logStore) {
logStore.appendEvent({
occurredAt: Date.now(),
kind: `${wf.name}.rounds-exhausted`,
key: action.topicId,
meta: JSON.stringify({ topicId: action.topicId, maxRounds }),
});
}
continue;
}
// 检查 cooldown
// Cooldown check
const lastTickTime = getLastTickTime(action.topicId);
if (now - lastTickTime < cooldownMs) {
continue; // 跳过这个 action,cooldown 未结束
continue;
}
filteredActions.push(action);
@@ -252,14 +258,19 @@ export function createWorkflowRule(
try {
const result = await roleFn(chain, action.topicId, store);
// Adapter writes CAS + event
// Compute countdown: remaining - 1 for this round
const currentRemaining = getRemainingRounds(action.topicId);
const nextRemaining = Math.max(0, currentRemaining - 1);
// Adapter writes CAS + event (with remainingRounds in meta)
const hash = store.putObject(result.content);
const eventMeta = { ...(result.meta ?? {}), remainingRounds: nextRemaining };
store.appendEvent({
occurredAt: Date.now(),
kind: `${wf.name}.${action.role}`,
key: action.topicId,
hash,
meta: result.meta != null ? JSON.stringify(result.meta) : undefined,
meta: JSON.stringify(eventMeta),
});
if (logStore) {
@@ -100,9 +100,17 @@ export interface WorkflowType<
* Automaton transition function:
* input = START (new workflow) | RoleOutput (after role execution)
* output = next role name | END (workflow complete)
* remainingRounds = countdown from maxRounds (undefined if no limit)
*/
moderator: (
output: ModeratorInput<Roles>,
topicId: string,
remainingRounds?: number,
) => (keyof Roles & string) | END;
/** Per-workflow safety limits (optional, adapter provides defaults) */
limits?: {
/** Max rounds (moderator calls) per topic before forced END (default: 20) */
maxRounds?: number;
};
}