feat(dispatcher): use server-side incremental query (after=) from #26

- fetchEvents no longer reverses results (after= returns ASC)
- Watcher removes client-side id filtering (server handles it)
- Updated test mock to match ASC order
This commit is contained in:
小墨 2026-04-13 08:12:39 +00:00
parent ce5e1005f7
commit c6d880fd71
3 changed files with 8 additions and 13 deletions

View File

@ -75,7 +75,6 @@ export class OGraphClient {
async fetchEvents(ref: number, afterId?: number): Promise<OGraphEvent[]> { async fetchEvents(ref: number, afterId?: number): Promise<OGraphEvent[]> {
const { endpoint, token } = this.config.ograph; const { endpoint, token } = this.config.ograph;
let url = `${endpoint}/events?ref=${ref}`; let url = `${endpoint}/events?ref=${ref}`;
// afterId 参数当前 API 可能不支持,先传着,API 忽略就拉全量
if (afterId !== undefined) url += `&after=${afterId}`; if (afterId !== undefined) url += `&after=${afterId}`;
const headers: Record<string, string> = { const headers: Record<string, string> = {
@ -97,9 +96,7 @@ export class OGraphClient {
throw new Error(result.error ?? `HTTP ${response.status} fetching events`); throw new Error(result.error ?? `HTTP ${response.status} fetching events`);
} }
// API 返回格式: { events: [...], total: N } // after= mode returns ASC order (oldest first), ideal for Dispatcher polling
// 返回顺序: 降序(newest first),需要 reverse() return result.events ?? [];
const events = result.events ?? [];
return events.reverse(); // 按 id 升序排列
} }
} }

View File

@ -70,15 +70,13 @@ export class ProjectionWatcher {
const { agentId, eventTypes } = this.config.discovery!; const { agentId, eventTypes } = this.config.discovery!;
try { try {
// Server-side incremental query: only returns events with id > lastSeenEventId
const events = await this.client.fetchEvents(agentId, this.lastSeenEventId); const events = await this.client.fetchEvents(agentId, this.lastSeenEventId);
// 过滤:只取 id > lastSeenEventId 的(client-side,因为 API 可能不支持 after) // Filter by eventTypes (if configured)
const newEvents = events.filter(e => e.id > this.lastSeenEventId);
// 按 eventTypes 过滤(如果配置了)
const filtered = eventTypes?.length const filtered = eventTypes?.length
? newEvents.filter(e => eventTypes.includes(e.type_name)) ? events.filter(e => eventTypes.includes(e.type_name))
: newEvents; : events;
if (filtered.length > 0) { if (filtered.length > 0) {
this.hasChanges = true; this.hasChanges = true;

View File

@ -46,9 +46,9 @@ describe('OGraphClient.fetchEvents', () => {
it('fetches events for a ref and returns them in ascending order', async () => { it('fetches events for a ref and returns them in ascending order', async () => {
const mockResponse = { const mockResponse = {
events: [ events: [
{ id: 3, type_hash: 'h3', type_name: 'task_created', payload: { id: 102 }, created_at: 3000 },
{ id: 2, type_hash: 'h2', type_name: 'task_created', payload: { id: 101 }, created_at: 2000 },
{ id: 1, type_hash: 'h1', type_name: 'task_updated', payload: { id: 100 }, created_at: 1000 }, { id: 1, type_hash: 'h1', type_name: 'task_updated', payload: { id: 100 }, created_at: 1000 },
{ id: 2, type_hash: 'h2', type_name: 'task_created', payload: { id: 101 }, created_at: 2000 },
{ id: 3, type_hash: 'h3', type_name: 'task_created', payload: { id: 102 }, created_at: 3000 },
], ],
total: 3, total: 3,
}; };