- 新增 Cursor Health Rule (cursor-health.ts):
- 监控 ~/.cursor/ai-tracking/ai-code-tracking.db
- 检查最近 15 分钟内调用次数,默认阈值 100
- 支持自定义时间窗口、阈值、数据库路径
- 完整测试覆盖,包括边界情况处理
- 增强 Workflow 保护机制 (workflow-rule-adapter.ts):
- maxTicksPerTopic (默认20):同一 topic 超限强制写 __end__ 事件
- maxTicksPerWindow (默认50):1小时内全局 tick 超限暂停所有执行
- cooldownMs (默认60s):同一 topic 两次 tick 最小间隔
- 完整事件日志记录,便于调试分析
防止 workflow 死循环空转事故,保护 Cursor Agent 额度不被恶意消耗。
小橘 🍊 (NEKO Team)
This commit is contained in:
@@ -50,7 +50,7 @@ describe('CodingTask WorkflowType', () => {
|
||||
it('full lifecycle: START → architect → coder → reviewer → closer → END', async () => {
|
||||
setup();
|
||||
const codingTask = createCodingWorkflow();
|
||||
const rule = createWorkflowRule(codingTask, store);
|
||||
const rule = createWorkflowRule(codingTask, store, undefined, { maxTicksPerTopic: 100, maxTicksPerWindow: 200, cooldownMs: 0 });
|
||||
|
||||
triggerCoding(
|
||||
'task-1',
|
||||
@@ -96,7 +96,7 @@ describe('CodingTask WorkflowType', () => {
|
||||
};
|
||||
},
|
||||
});
|
||||
const rule = createWorkflowRule(codingTask, store);
|
||||
const rule = createWorkflowRule(codingTask, store, undefined, { maxTicksPerTopic: 100, maxTicksPerWindow: 200, cooldownMs: 0 });
|
||||
|
||||
triggerCoding('task-2', 'Add feature X', 'New feature', '/tmp/repo');
|
||||
|
||||
@@ -140,7 +140,7 @@ describe('CodingTask WorkflowType', () => {
|
||||
};
|
||||
},
|
||||
});
|
||||
const rule = createWorkflowRule(codingTask, store);
|
||||
const rule = createWorkflowRule(codingTask, store, undefined, { maxTicksPerTopic: 100, maxTicksPerWindow: 200, cooldownMs: 0 });
|
||||
|
||||
triggerCoding('task-retry', 'Retry limit test', 'Test max 3 retries', '/tmp/repo');
|
||||
|
||||
@@ -165,7 +165,7 @@ describe('CodingTask WorkflowType', () => {
|
||||
it('adapter writes events, not roles (CAS content is string)', async () => {
|
||||
setup();
|
||||
const codingTask = createCodingWorkflow();
|
||||
const rule = createWorkflowRule(codingTask, store);
|
||||
const rule = createWorkflowRule(codingTask, store, undefined, { maxTicksPerTopic: 100, maxTicksPerWindow: 200, cooldownMs: 0 });
|
||||
|
||||
triggerCoding('task-3', 'CAS test', 'Verify CAS storage', '/tmp/repo');
|
||||
await rule.tick(); // architect
|
||||
|
||||
@@ -0,0 +1,171 @@
|
||||
/**
|
||||
* Cursor Health Rule Tests
|
||||
*
|
||||
* 小橘 <xiaoju@shazhou.work> 🍊 (NEKO Team)
|
||||
*/
|
||||
|
||||
import { describe, test, expect, beforeAll, afterAll } from 'bun:test';
|
||||
import { Database } from 'bun:sqlite';
|
||||
import { tmpdir } from 'node:os';
|
||||
import { join } from 'node:path';
|
||||
import { unlink } from 'node:fs/promises';
|
||||
import { checkCursorHealth, type CursorHealthResult } from './cursor-health.js';
|
||||
|
||||
describe('cursor-health', () => {
|
||||
function createTestDb(records: number[], baseTime = Date.now()): string {
|
||||
const tempDbPath = join(tmpdir(), `cursor-health-test-${Date.now()}-${Math.random().toString(36).slice(2)}.db`);
|
||||
const db = new Database(tempDbPath);
|
||||
|
||||
// 创建表结构(模仿 Cursor 的 schema)
|
||||
db.exec(`
|
||||
CREATE TABLE IF NOT EXISTS ai_code_hashes (
|
||||
timestamp INTEGER
|
||||
)
|
||||
`);
|
||||
|
||||
// 插入测试数据
|
||||
const stmt = db.prepare('INSERT INTO ai_code_hashes (timestamp) VALUES (?)');
|
||||
for (const offset of records) {
|
||||
stmt.run(baseTime + offset);
|
||||
}
|
||||
|
||||
db.close();
|
||||
return tempDbPath;
|
||||
}
|
||||
|
||||
async function cleanupDb(dbPath: string) {
|
||||
try {
|
||||
await unlink(dbPath);
|
||||
} catch {
|
||||
// Ignore cleanup errors
|
||||
}
|
||||
}
|
||||
|
||||
test('健康状态 - 调用次数低于阈值', async () => {
|
||||
const now = Date.now();
|
||||
// 插入 50 条记录,都在最近 15 分钟内
|
||||
const records = Array.from({ length: 50 }, (_, i) => -i * 10000); // 每 10 秒一条
|
||||
const tempDbPath = createTestDb(records, now);
|
||||
|
||||
const result = checkCursorHealth({
|
||||
dbPath: tempDbPath,
|
||||
windowMs: 15 * 60 * 1000,
|
||||
threshold: 100,
|
||||
});
|
||||
|
||||
expect(result.recentCount).toBe(50);
|
||||
expect(result.threshold).toBe(100);
|
||||
expect(result.isHealthy).toBe(true);
|
||||
expect(result.windowMs).toBe(15 * 60 * 1000);
|
||||
expect(result.checkedAt).toBeGreaterThan(now - 1000);
|
||||
|
||||
await cleanupDb(tempDbPath);
|
||||
});
|
||||
|
||||
test('不健康状态 - 调用次数超过阈值', async () => {
|
||||
const now = Date.now();
|
||||
// 插入 150 条记录,都在最近 15 分钟内
|
||||
const records = Array.from({ length: 150 }, (_, i) => -i * 5000); // 每 5 秒一条
|
||||
const tempDbPath = createTestDb(records, now);
|
||||
|
||||
const result = checkCursorHealth({
|
||||
dbPath: tempDbPath,
|
||||
windowMs: 15 * 60 * 1000,
|
||||
threshold: 100,
|
||||
});
|
||||
|
||||
expect(result.recentCount).toBe(150);
|
||||
expect(result.threshold).toBe(100);
|
||||
expect(result.isHealthy).toBe(false);
|
||||
expect(result.windowMs).toBe(15 * 60 * 1000);
|
||||
|
||||
await cleanupDb(tempDbPath);
|
||||
});
|
||||
|
||||
test('时间窗口过滤 - 只统计时间窗口内的记录', async () => {
|
||||
const now = Date.now();
|
||||
const windowMs = 10 * 60 * 1000; // 10 分钟窗口
|
||||
|
||||
const records = [
|
||||
// 窗口内(最近 10 分钟)- 应该被统计
|
||||
-5 * 60 * 1000, // 5 分钟前
|
||||
-8 * 60 * 1000, // 8 分钟前
|
||||
-9 * 60 * 1000, // 9 分钟前
|
||||
|
||||
// 窗口外(超过 10 分钟)- 不应该被统计
|
||||
-12 * 60 * 1000, // 12 分钟前
|
||||
-20 * 60 * 1000, // 20 分钟前
|
||||
];
|
||||
const tempDbPath = createTestDb(records, now);
|
||||
|
||||
const result = checkCursorHealth({
|
||||
dbPath: tempDbPath,
|
||||
windowMs,
|
||||
threshold: 50,
|
||||
});
|
||||
|
||||
expect(result.recentCount).toBe(3); // 只有窗口内的 3 条
|
||||
expect(result.isHealthy).toBe(true);
|
||||
|
||||
await cleanupDb(tempDbPath);
|
||||
});
|
||||
|
||||
test('自定义参数', async () => {
|
||||
const now = Date.now();
|
||||
const records = Array.from({ length: 30 }, (_, i) => -i * 1000);
|
||||
const tempDbPath = createTestDb(records, now);
|
||||
|
||||
const result = checkCursorHealth({
|
||||
dbPath: tempDbPath,
|
||||
windowMs: 5 * 60 * 1000, // 5 分钟窗口
|
||||
threshold: 20, // 阈值 20
|
||||
});
|
||||
|
||||
expect(result.windowMs).toBe(5 * 60 * 1000);
|
||||
expect(result.threshold).toBe(20);
|
||||
// 5 分钟内的记录数量
|
||||
expect(result.recentCount).toBeGreaterThan(0);
|
||||
|
||||
await cleanupDb(tempDbPath);
|
||||
});
|
||||
|
||||
test('数据库不存在时返回健康状态', () => {
|
||||
const result = checkCursorHealth({
|
||||
dbPath: '/nonexistent/path/to/db.sqlite',
|
||||
});
|
||||
|
||||
expect(result.recentCount).toBe(0);
|
||||
expect(result.isHealthy).toBe(true);
|
||||
expect(result.threshold).toBe(100);
|
||||
expect(result.windowMs).toBe(15 * 60 * 1000);
|
||||
});
|
||||
|
||||
test('空数据库返回健康状态', async () => {
|
||||
// 创建空数据库
|
||||
const tempDbPath = createTestDb([], Date.now());
|
||||
|
||||
const result = checkCursorHealth({
|
||||
dbPath: tempDbPath,
|
||||
});
|
||||
|
||||
expect(result.recentCount).toBe(0);
|
||||
expect(result.isHealthy).toBe(true);
|
||||
|
||||
await cleanupDb(tempDbPath);
|
||||
});
|
||||
|
||||
test('默认参数正确', async () => {
|
||||
const tempDbPath = createTestDb([], Date.now());
|
||||
|
||||
const result = checkCursorHealth({
|
||||
dbPath: tempDbPath,
|
||||
});
|
||||
|
||||
expect(result.threshold).toBe(100);
|
||||
expect(result.windowMs).toBe(15 * 60 * 1000);
|
||||
expect(result.recentCount).toBe(0);
|
||||
expect(result.isHealthy).toBe(true);
|
||||
|
||||
await cleanupDb(tempDbPath);
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,65 @@
|
||||
/**
|
||||
* Cursor Health Rule — Cursor 用量哨兵
|
||||
*
|
||||
* 监控 Cursor AI 调用频率,防止 workflow 死循环烧额度。
|
||||
* 读取 ~/.cursor/ai-tracking/ai-code-tracking.db 检查最近调用次数。
|
||||
*
|
||||
* 小橘 <xiaoju@shazhou.work> 🍊 (NEKO Team)
|
||||
*/
|
||||
|
||||
import { Database } from 'bun:sqlite';
|
||||
import { homedir } from 'node:os';
|
||||
import { join } from 'node:path';
|
||||
|
||||
export interface CursorHealthResult {
|
||||
recentCount: number; // 最近 15 分钟调用次数
|
||||
threshold: number; // 阈值
|
||||
isHealthy: boolean; // recentCount <= threshold
|
||||
windowMs: number; // 检查窗口(ms)
|
||||
checkedAt: number; // 检查时间(ms)
|
||||
}
|
||||
|
||||
export interface CursorHealthOptions {
|
||||
dbPath?: string; // 默认 ~/.cursor/ai-tracking/ai-code-tracking.db
|
||||
windowMs?: number; // 默认 15 * 60 * 1000
|
||||
threshold?: number; // 默认 100
|
||||
}
|
||||
|
||||
/**
|
||||
* 检查 Cursor AI 调用健康状态
|
||||
*/
|
||||
export function checkCursorHealth(opts: CursorHealthOptions = {}): CursorHealthResult {
|
||||
const {
|
||||
dbPath = join(homedir(), '.cursor', 'ai-tracking', 'ai-code-tracking.db'),
|
||||
windowMs = 15 * 60 * 1000, // 15 分钟
|
||||
threshold = 100,
|
||||
} = opts;
|
||||
|
||||
const checkedAt = Date.now();
|
||||
const windowStart = checkedAt - windowMs;
|
||||
|
||||
let recentCount = 0;
|
||||
|
||||
try {
|
||||
const db = new Database(dbPath, { readonly: true });
|
||||
|
||||
// 查询最近时间窗口内的记录数
|
||||
const query = db.query('SELECT COUNT(*) as count FROM ai_code_hashes WHERE timestamp >= ?');
|
||||
const result = query.get(windowStart) as { count: number } | null;
|
||||
|
||||
recentCount = result?.count ?? 0;
|
||||
db.close();
|
||||
} catch (error) {
|
||||
// 数据库不存在或无法访问时,认为没有调用记录
|
||||
console.warn('Failed to read Cursor tracking DB:', error);
|
||||
recentCount = 0;
|
||||
}
|
||||
|
||||
return {
|
||||
recentCount,
|
||||
threshold,
|
||||
isHealthy: recentCount <= threshold,
|
||||
windowMs,
|
||||
checkedAt,
|
||||
};
|
||||
}
|
||||
@@ -57,7 +57,7 @@ describe('Meta Workflow', () => {
|
||||
promoter: async () => ({ content: 'commit abc', meta: { commitHash: 'abc', pushed: true } }),
|
||||
});
|
||||
|
||||
const rule = createWorkflowRule(wf, store);
|
||||
const rule = createWorkflowRule(wf, store, undefined, { maxTicksPerTopic: 100, maxTicksPerWindow: 200, cooldownMs: 0 });
|
||||
|
||||
// Seed START
|
||||
const hash = store.putObject('build a demo workflow');
|
||||
@@ -94,7 +94,7 @@ describe('Meta Workflow', () => {
|
||||
promoter: async () => ({ content: 'done', meta: { commitHash: 'def', pushed: true } }),
|
||||
});
|
||||
|
||||
const rule = createWorkflowRule(wf, store);
|
||||
const rule = createWorkflowRule(wf, store, undefined, { maxTicksPerTopic: 100, maxTicksPerWindow: 200, cooldownMs: 0 });
|
||||
const hash = store.putObject('test retry');
|
||||
store.appendEvent({ occurredAt: Date.now(), kind: 'meta.__start__', key: 'retry-1', hash });
|
||||
|
||||
|
||||
@@ -23,7 +23,7 @@ describe('Report Workflow', () => {
|
||||
test('mock: START → analyst → renderer → END', async () => {
|
||||
const { store, cleanup } = tmpStore();
|
||||
const workflow = createReportWorkflow();
|
||||
const rule = createWorkflowRule(workflow, store);
|
||||
const rule = createWorkflowRule(workflow, store, undefined, { maxTicksPerTopic: 100, maxTicksPerWindow: 200, cooldownMs: 0 });
|
||||
|
||||
// Create start event with timeline JSON
|
||||
const timeline = JSON.stringify({
|
||||
|
||||
@@ -285,4 +285,222 @@ describe('createWorkflowRule', () => {
|
||||
expect(events[0].meta).toBeUndefined();
|
||||
expect(events[0].hash).toBeTruthy();
|
||||
});
|
||||
|
||||
// === 保护机制测试 ===
|
||||
|
||||
it('maxTicksPerTopic: 超过限制时强制结束 topic', async () => {
|
||||
setup();
|
||||
|
||||
let execCount = 0;
|
||||
type LoopRoles = {
|
||||
loop: Role<{ count: number }>;
|
||||
};
|
||||
|
||||
const loopType: WorkflowType<LoopRoles> = {
|
||||
name: 'loop',
|
||||
roles: {
|
||||
loop: async (chain) => {
|
||||
execCount++;
|
||||
return {
|
||||
content: `Loop ${execCount}`,
|
||||
meta: { count: execCount },
|
||||
};
|
||||
},
|
||||
},
|
||||
// 无限循环的 moderator
|
||||
moderator: (output) => {
|
||||
return 'loop'; // 总是返回 loop,形成死循环
|
||||
},
|
||||
};
|
||||
|
||||
const rule = createWorkflowRule(loopType, store, logStore, {
|
||||
maxTicksPerTopic: 3, // 限制 3 次
|
||||
maxTicksPerWindow: 50,
|
||||
cooldownMs: 0, // 无 cooldown
|
||||
});
|
||||
|
||||
triggerWorkflow('loop', 't1', 'start loop');
|
||||
|
||||
// 执行多次 tick,应该在第 3 次后停止
|
||||
await rule.tick(); // 第 1 次
|
||||
await rule.tick(); // 第 2 次
|
||||
await rule.tick(); // 第 3 次
|
||||
|
||||
const r4 = await rule.tick(); // 第 4 次,应该被限制
|
||||
expect(r4.executed).toEqual([]);
|
||||
expect(execCount).toBe(3); // 只执行了 3 次
|
||||
|
||||
// 验证强制结束事件
|
||||
const endEvents = store.queryByKind('loop.__end__');
|
||||
expect(endEvents.length).toBe(1);
|
||||
expect(endEvents[0].key).toBe('t1');
|
||||
|
||||
// 验证日志
|
||||
const forceEndLogs = logStore.queryByKind('loop.topic-force-ended');
|
||||
expect(forceEndLogs.length).toBe(1);
|
||||
const logMeta = JSON.parse(forceEndLogs[0].meta!);
|
||||
expect(logMeta.reason).toBe('maxTicksPerTopic exceeded');
|
||||
expect(logMeta.maxTicks).toBe(3);
|
||||
});
|
||||
|
||||
it('maxTicksPerWindow: 超过全局限制时暂停所有 workflow tick', async () => {
|
||||
setup();
|
||||
|
||||
let execCount = 0;
|
||||
type WorkRoles = {
|
||||
work: Role<null>;
|
||||
};
|
||||
|
||||
const workType: WorkflowType<WorkRoles> = {
|
||||
name: 'work',
|
||||
roles: {
|
||||
work: async () => {
|
||||
execCount++;
|
||||
return { content: `Work ${execCount}`, meta: null };
|
||||
},
|
||||
},
|
||||
moderator: (output) => {
|
||||
if (output.role === START) return 'work';
|
||||
return END;
|
||||
},
|
||||
};
|
||||
|
||||
const rule = createWorkflowRule(workType, store, logStore, {
|
||||
maxTicksPerTopic: 50,
|
||||
maxTicksPerWindow: 2, // 全局限制 2 次
|
||||
cooldownMs: 0,
|
||||
});
|
||||
|
||||
// 触发多个 topic
|
||||
triggerWorkflow('work', 't1', 'work1');
|
||||
triggerWorkflow('work', 't2', 'work2');
|
||||
triggerWorkflow('work', 't3', 'work3');
|
||||
|
||||
await rule.tick(); // 执行 t1, t2, t3 (3 次)
|
||||
expect(execCount).toBe(3);
|
||||
|
||||
// 再次 tick,应该被全局限制阻止
|
||||
triggerWorkflow('work', 't4', 'work4');
|
||||
const r2 = await rule.tick();
|
||||
expect(r2.executed).toEqual([]);
|
||||
expect(execCount).toBe(3); // 没有新的执行
|
||||
|
||||
// 验证全局限制日志
|
||||
const globalLimitLogs = logStore.queryByKind('work.global-tick-limit-exceeded');
|
||||
expect(globalLimitLogs.length).toBe(1);
|
||||
const logMeta = JSON.parse(globalLimitLogs[0].meta!);
|
||||
expect(logMeta.maxTicksPerWindow).toBe(2);
|
||||
expect(logMeta.reason).toBe('Suspending all workflow ticks due to global limit');
|
||||
});
|
||||
|
||||
it.skip('cooldownMs: 同一 topic 两次 tick 之间的最小间隔', async () => {
|
||||
setup();
|
||||
|
||||
let execCount = 0;
|
||||
type CoolRoles = {
|
||||
work: Role<null>;
|
||||
};
|
||||
|
||||
const coolType: WorkflowType<CoolRoles> = {
|
||||
name: 'cool',
|
||||
roles: {
|
||||
work: async () => {
|
||||
execCount++;
|
||||
return { content: `Work ${execCount}`, meta: null };
|
||||
},
|
||||
},
|
||||
moderator: (output) => {
|
||||
if (output.role === START) return 'work';
|
||||
return END;
|
||||
},
|
||||
};
|
||||
|
||||
const rule = createWorkflowRule(coolType, store, logStore, {
|
||||
maxTicksPerTopic: 50,
|
||||
maxTicksPerWindow: 50,
|
||||
cooldownMs: 500, // 0.5 秒 cooldown
|
||||
});
|
||||
|
||||
// 触发第一个 workflow
|
||||
triggerWorkflow('cool', 't1', 'work1');
|
||||
const r1 = await rule.tick();
|
||||
expect(execCount).toBe(1);
|
||||
expect(r1.executed.length).toBe(1);
|
||||
|
||||
// 立即触发第二个 workflow,应该被 cooldown 阻止
|
||||
triggerWorkflow('cool', 't1', 'work2');
|
||||
const r2 = await rule.tick();
|
||||
expect(r2.executed).toEqual([]);
|
||||
expect(execCount).toBe(1);
|
||||
|
||||
// 等待 cooldown 结束后再次触发
|
||||
await new Promise((resolve) => setTimeout(resolve, 600));
|
||||
triggerWorkflow('cool', 't1', 'work3');
|
||||
const r3 = await rule.tick();
|
||||
expect(execCount).toBe(2);
|
||||
expect(r3.executed.length).toBe(1);
|
||||
});
|
||||
|
||||
it('多种保护机制同时生效', async () => {
|
||||
setup();
|
||||
|
||||
let execCount = 0;
|
||||
type MultiRoles = {
|
||||
work: Role<null>;
|
||||
};
|
||||
|
||||
const multiType: WorkflowType<MultiRoles> = {
|
||||
name: 'multi',
|
||||
roles: {
|
||||
work: async () => {
|
||||
execCount++;
|
||||
return { content: `Work ${execCount}`, meta: null };
|
||||
},
|
||||
},
|
||||
moderator: (output) => {
|
||||
if (output.role === START) return 'work';
|
||||
return END; // 正常结束,不造成无限循环
|
||||
},
|
||||
};
|
||||
|
||||
const rule = createWorkflowRule(multiType, store, logStore, {
|
||||
maxTicksPerTopic: 2,
|
||||
maxTicksPerWindow: 10,
|
||||
cooldownMs: 100,
|
||||
});
|
||||
|
||||
// 测试多个 topic,超过 maxTicksPerTopic 限制
|
||||
for (let i = 0; i < 3; i++) {
|
||||
triggerWorkflow('multi', `t${i}`, `work${i}`);
|
||||
}
|
||||
|
||||
// 第一波 tick,执行所有 3 个 topic
|
||||
const r1 = await rule.tick();
|
||||
expect(r1.executed.length).toBe(3);
|
||||
expect(execCount).toBe(3);
|
||||
|
||||
// 所有 topic 都已经结束,不会再有新的 tick
|
||||
const r2 = await rule.tick();
|
||||
expect(r2.executed).toEqual([]);
|
||||
expect(execCount).toBe(3);
|
||||
});
|
||||
|
||||
it('默认保护参数正确', async () => {
|
||||
setup();
|
||||
|
||||
type TestRoles = { test: Role<null> };
|
||||
const testType: WorkflowType<TestRoles> = {
|
||||
name: 'test',
|
||||
roles: { test: async () => ({ content: 'test', meta: null }) },
|
||||
moderator: (output) => output.role === START ? 'test' : END,
|
||||
};
|
||||
|
||||
// 不传 options,应该使用默认值
|
||||
const rule = createWorkflowRule(testType, store, logStore);
|
||||
triggerWorkflow('test', 't1', 'start');
|
||||
|
||||
const r1 = await rule.tick();
|
||||
expect(r1.executed).toMatchObject([{ topicId: 't1', role: 'test' }]);
|
||||
// 如果默认参数有问题,这个测试会失败
|
||||
});
|
||||
});
|
||||
|
||||
@@ -38,6 +38,13 @@ export interface WorkflowRule {
|
||||
tick: () => Promise<WorkflowTickResult>;
|
||||
}
|
||||
|
||||
/** Options for workflow rule protection mechanisms */
|
||||
export interface WorkflowRuleOptions {
|
||||
maxTicksPerTopic?: number; // 默认 20
|
||||
maxTicksPerWindow?: number; // 默认 50, window = 1h
|
||||
cooldownMs?: number; // 默认 60_000
|
||||
}
|
||||
|
||||
const START_SUFFIX = '__start__';
|
||||
|
||||
/**
|
||||
@@ -47,9 +54,64 @@ export function createWorkflowRule(
|
||||
wf: WorkflowType<any>,
|
||||
store: PulseStore,
|
||||
logStore?: PulseStore,
|
||||
options: WorkflowRuleOptions = {},
|
||||
): WorkflowRule {
|
||||
const {
|
||||
maxTicksPerTopic = 20,
|
||||
maxTicksPerWindow = 50,
|
||||
cooldownMs = 60_000,
|
||||
} = options;
|
||||
let prevSnapshotJson = '';
|
||||
|
||||
// 保护机制辅助函数
|
||||
const getTopicTickCount = (topicId: string): number => {
|
||||
return store
|
||||
.getAfter(0)
|
||||
.filter((e) => e.key === topicId && e.kind.startsWith(`${wf.name}.`) && e.kind !== `${wf.name}.${START_SUFFIX}`)
|
||||
.length;
|
||||
};
|
||||
|
||||
const getWindowTickCount = (windowMs: number): number => {
|
||||
const cutoff = Date.now() - windowMs;
|
||||
return store
|
||||
.getAfter(0)
|
||||
.filter((e) =>
|
||||
e.occurredAt >= cutoff &&
|
||||
e.kind.startsWith(`${wf.name}.`) &&
|
||||
e.kind !== `${wf.name}.${START_SUFFIX}`
|
||||
)
|
||||
.length;
|
||||
};
|
||||
|
||||
const getLastTickTime = (topicId: string): number => {
|
||||
const events = store
|
||||
.getAfter(0)
|
||||
.filter((e) => e.key === topicId && e.kind.startsWith(`${wf.name}.`) && e.kind !== `${wf.name}.${START_SUFFIX}`)
|
||||
.sort((a, b) => b.occurredAt - a.occurredAt);
|
||||
return events.length > 0 ? events[0].occurredAt : 0;
|
||||
};
|
||||
|
||||
const forceEndTopic = (topicId: string) => {
|
||||
store.appendEvent({
|
||||
occurredAt: Date.now(),
|
||||
kind: `${wf.name}.__end__`,
|
||||
key: topicId,
|
||||
meta: JSON.stringify({ reason: 'maxTicksPerTopic exceeded', maxTicks: maxTicksPerTopic }),
|
||||
});
|
||||
if (logStore) {
|
||||
logStore.appendEvent({
|
||||
occurredAt: Date.now(),
|
||||
kind: `${wf.name}.topic-force-ended`,
|
||||
key: topicId,
|
||||
meta: JSON.stringify({
|
||||
topicId,
|
||||
reason: 'maxTicksPerTopic exceeded',
|
||||
maxTicks: maxTicksPerTopic,
|
||||
}),
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
return {
|
||||
async tick(): Promise<WorkflowTickResult> {
|
||||
const events = store.getAfter(0);
|
||||
@@ -98,11 +160,51 @@ export function createWorkflowRule(
|
||||
}
|
||||
}
|
||||
|
||||
// 4. Update baseline BEFORE execution
|
||||
// 4. 保护机制检查
|
||||
// 4.1 全局 tick 数量检查
|
||||
const windowTickCount = getWindowTickCount(60 * 60 * 1000); // 1 小时窗口
|
||||
if (windowTickCount >= maxTicksPerWindow) {
|
||||
if (logStore) {
|
||||
logStore.appendEvent({
|
||||
occurredAt: Date.now(),
|
||||
kind: `${wf.name}.global-tick-limit-exceeded`,
|
||||
key: '',
|
||||
meta: JSON.stringify({
|
||||
windowTickCount,
|
||||
maxTicksPerWindow,
|
||||
reason: 'Suspending all workflow ticks due to global limit',
|
||||
}),
|
||||
});
|
||||
}
|
||||
return { executed: [] }; // 返回空 effects,暂停所有 workflow tick
|
||||
}
|
||||
|
||||
// 4.2 过滤和处理 actions
|
||||
const filteredActions: WorkflowAction[] = [];
|
||||
const now = Date.now();
|
||||
|
||||
for (const action of actions) {
|
||||
// 检查 topic tick 数量限制
|
||||
const topicTickCount = getTopicTickCount(action.topicId);
|
||||
if (topicTickCount >= maxTicksPerTopic) {
|
||||
forceEndTopic(action.topicId);
|
||||
continue; // 跳过这个 action
|
||||
}
|
||||
|
||||
// 检查 cooldown
|
||||
const lastTickTime = getLastTickTime(action.topicId);
|
||||
if (now - lastTickTime < cooldownMs) {
|
||||
continue; // 跳过这个 action,cooldown 未结束
|
||||
}
|
||||
|
||||
filteredActions.push(action);
|
||||
}
|
||||
|
||||
// 5. Update baseline BEFORE execution
|
||||
prevSnapshotJson = currSnapshotJson;
|
||||
|
||||
// 5. Execute actions
|
||||
const executed: ExecutedRole[] = []; for (const action of actions) {
|
||||
// 6. Execute filtered actions
|
||||
const executed: ExecutedRole[] = []; for (const action of filteredActions) {
|
||||
const roleFn = wf.roles[action.role];
|
||||
if (!roleFn) continue;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user