diff --git a/packages/dispatcher/src/config.ts b/packages/dispatcher/src/config.ts index ff6b4b5..a0ead11 100644 --- a/packages/dispatcher/src/config.ts +++ b/packages/dispatcher/src/config.ts @@ -16,6 +16,7 @@ const DEFAULTS: DispatcherConfig = { token: undefined, projections: [], }, + discovery: undefined, oc: { statusEndpoint: 'http://localhost:18789/plugins/session-status/status', statusToken: 'ograph-status-token-2026', @@ -84,5 +85,11 @@ export function loadConfig(): DispatcherConfig { if (!isNaN(n)) config.oc.minAvailable = n; } + // ── Discovery configuration overrides ─────────────────────────────────────── + if (process.env['OGRAPH_AGENT_ID']) { + config.discovery = config.discovery ?? { agentId: 0 }; + config.discovery.agentId = parseInt(process.env['OGRAPH_AGENT_ID'], 10); + } + return config; } diff --git a/packages/dispatcher/src/ograph-client.ts b/packages/dispatcher/src/ograph-client.ts index 3945911..2a96408 100644 --- a/packages/dispatcher/src/ograph-client.ts +++ b/packages/dispatcher/src/ograph-client.ts @@ -1,7 +1,7 @@ // OGraph API client (dispatcher-specific, zero external deps) // Uses node built-in fetch (Node >= 18). -import type { DispatcherConfig } from './types.js'; +import type { DispatcherConfig, OGraphEvent } from './types.js'; export interface ProjectionValue { name: string; @@ -70,4 +70,36 @@ export class OGraphClient { return { values, errors }; } + + /** Fetch events for a specific agent/ref. Returns events in ascending ID order. */ + async fetchEvents(ref: number, afterId?: number): Promise { + const { endpoint, token } = this.config.ograph; + let url = `${endpoint}/events?ref=${ref}`; + // afterId 参数当前 API 可能不支持,先传着,API 忽略就拉全量 + if (afterId !== undefined) url += `&after=${afterId}`; + + const headers: Record = { + 'Content-Type': 'application/json', + }; + if (token) headers['Authorization'] = `Bearer ${token}`; + + const response = await fetch(url, { headers }); + + const contentType = response.headers.get('content-type') ?? ''; + if (!contentType.includes('application/json')) { + const text = await response.text(); + throw new Error(`Non-JSON response for events: ${text.slice(0, 120)}`); + } + + const result = await response.json() as { events?: OGraphEvent[]; error?: string; total?: number }; + + if (!response.ok) { + throw new Error(result.error ?? `HTTP ${response.status} fetching events`); + } + + // API 返回格式: { events: [...], total: N } + // 返回顺序: 降序(newest first),需要 reverse() + const events = result.events ?? []; + return events.reverse(); // 按 id 升序排列 + } } diff --git a/packages/dispatcher/src/scheduler.ts b/packages/dispatcher/src/scheduler.ts index 88bd761..21e07f0 100644 --- a/packages/dispatcher/src/scheduler.ts +++ b/packages/dispatcher/src/scheduler.ts @@ -138,6 +138,46 @@ export class OcScheduler { } private buildMessage(entries: PendingEntry[]): string { + // 检查是否有事件流模式的条目 + const hasEvents = entries.some(entry => entry.name.startsWith('event:')); + + if (hasEvents) { + return this.buildEventMessage(entries); + } else { + return this.buildProjectionMessage(entries); + } + } + + private buildEventMessage(entries: PendingEntry[]): string { + const lines: string[] = ['📋 OGraph Event Stream Updates\n']; + + for (const entry of entries) { + if (entry.events && entry.events.length > 0) { + const event = entry.events[0]; // 只显示第一个事件(每个 entry 对应一个 event) + const age = Math.round((Date.now() - entry.firstDetectedAt) / 1000); + + // 根据 event type 格式化消息 + if (event.type_name === 'task_created') { + const payload = event.payload as any; + lines.push(`📋 新任务: #${payload.id} ${payload.title} (${payload.priority}) — 由${payload.creator || '系统'}创建`); + } else if (event.type_name === 'task_updated') { + const payload = event.payload as any; + lines.push(`📋 #${payload.id} 状态更新: ${payload.previous_status} → ${payload.new_status}`); + } else { + // 通用格式 + lines.push(`• **${event.type_name}** #${event.id}`); + lines.push(` Payload: ${JSON.stringify(event.payload)}`); + lines.push(` Age: ${age}s`); + } + lines.push(''); + } + } + + lines.push(`Total: ${entries.length} event(s) detected`); + return lines.join('\n'); + } + + private buildProjectionMessage(entries: PendingEntry[]): string { const lines: string[] = ['🔔 OGraph Projection Changes Detected\n']; for (const entry of entries) { diff --git a/packages/dispatcher/src/types.ts b/packages/dispatcher/src/types.ts index 076f60e..3b3a016 100644 --- a/packages/dispatcher/src/types.ts +++ b/packages/dispatcher/src/types.ts @@ -6,6 +6,10 @@ export interface DispatcherConfig { token?: string; // OGraph API token projections: string[]; // projection 名称列表 }; + discovery?: { + agentId: number; // 当前 agent 的 object id + eventTypes?: string[]; // 只关注哪些 event type(可选过滤) + }; oc: { statusEndpoint: string; // session-status plugin URL statusToken: string; // Bearer token @@ -36,6 +40,20 @@ export interface PendingEntry { firstDetectedAt: number; lastDetectedAt: number; changeCount: number; + events?: OGraphEvent[]; // 新增:关联的事件(事件流模式用) +} + +export interface OGraphEvent { + id: number; + type_hash: string; + type_name: string; + payload: Record; + created_at: number; +} + +export interface EventChange { + event: OGraphEvent; + detectedAt: number; } /** OC session-status API response */ diff --git a/packages/dispatcher/src/watcher.ts b/packages/dispatcher/src/watcher.ts index 6284978..5805aff 100644 --- a/packages/dispatcher/src/watcher.ts +++ b/packages/dispatcher/src/watcher.ts @@ -14,6 +14,8 @@ export class ProjectionWatcher { private hasChanges = false; private running = false; private timer: ReturnType | null = null; + // 新增:事件流模式状态 + private lastSeenEventId = 0; // 上次看到的最大 event id constructor( private readonly config: DispatcherConfig, @@ -26,7 +28,11 @@ export class ProjectionWatcher { start(): void { if (this.running) return; this.running = true; - console.log(`[${ts()}] [watcher] started — watching: ${this.config.ograph.projections.join(', ') || '(none)'}`); + const mode = this.config.discovery ? 'events' : 'projections'; + const target = this.config.discovery + ? `agent:${this.config.discovery.agentId}` + : this.config.ograph.projections.join(', ') || '(none)'; + console.log(`[${ts()}] [watcher] started — mode: ${mode}, watching: ${target}`); void this.poll(); } @@ -49,6 +55,71 @@ export class ProjectionWatcher { private async poll(): Promise { if (!this.running) return; + // 根据 config 决定走哪条路 + if (this.config.discovery) { + await this.pollEvents(); // 事件流模式 + } else { + await this.pollProjections(); // 原有逻辑 + } + } + + private async pollEvents(): Promise { + if (!this.running) return; + + const { watcherIdle, watcherActive } = this.config.intervals; + const { agentId, eventTypes } = this.config.discovery!; + + try { + const events = await this.client.fetchEvents(agentId, this.lastSeenEventId); + + // 过滤:只取 id > lastSeenEventId 的(client-side,因为 API 可能不支持 after) + const newEvents = events.filter(e => e.id > this.lastSeenEventId); + + // 按 eventTypes 过滤(如果配置了) + const filtered = eventTypes?.length + ? newEvents.filter(e => eventTypes.includes(e.type_name)) + : newEvents; + + if (filtered.length > 0) { + this.hasChanges = true; + // 更新 lastSeenEventId + this.lastSeenEventId = Math.max(...filtered.map(e => e.id)); + + // 合并到 pending queue + // 按 event.type_name 分组,每组一个 PendingEntry + for (const event of filtered) { + const key = `event:${event.type_name}:${event.id}`; + this.pending.set(key, { + name: `event:${event.type_name}`, + previousValue: null, + currentValue: event.payload, + firstDetectedAt: Date.now(), + lastDetectedAt: Date.now(), + changeCount: 1, + events: [event], + }); + } + + console.log(`[${ts()}] [watcher] ${filtered.length} new event(s) discovered`); + for (const e of filtered) { + console.log(`[${ts()}] [watcher] #${e.id} ${e.type_name}: ${JSON.stringify(e.payload)}`); + } + } else { + this.hasChanges = this.pending.size > 0; + } + } catch (err) { + console.error(`[${ts()}] [watcher] poll error: ${err instanceof Error ? err.message : String(err)}`); + // Graceful degradation: keep previous state, retry at idle interval + this.hasChanges = false; + } + + const interval = this.hasChanges ? watcherActive : watcherIdle; + this.scheduleNext(interval); + } + + private async pollProjections(): Promise { + if (!this.running) return; + const { watcherIdle, watcherActive } = this.config.intervals; try { diff --git a/packages/dispatcher/test/ograph-client.test.ts b/packages/dispatcher/test/ograph-client.test.ts new file mode 100644 index 0000000..22e1352 --- /dev/null +++ b/packages/dispatcher/test/ograph-client.test.ts @@ -0,0 +1,155 @@ +// Tests for OGraphClient event stream functionality +import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; +import { OGraphClient } from '../src/ograph-client.js'; +import type { DispatcherConfig, OGraphEvent } from '../src/types.js'; + +// Mock fetch globally +const fetchMock = vi.fn(); +global.fetch = fetchMock; + +function makeConfig(): DispatcherConfig { + return { + ograph: { + endpoint: 'https://test.example.com', + token: 'test-token', + projections: [] + }, + oc: { + statusEndpoint: 'http://localhost', + statusToken: 'tok', + minAvailable: 2 + }, + intervals: { + watcherIdle: 30_000, + watcherActive: 5_000, + schedulerIdle: 60_000, + schedulerActive: 5_000, + cooldownAfterPush: 60_000, + }, + }; +} + +describe('OGraphClient.fetchEvents', () => { + let client: OGraphClient; + let config: DispatcherConfig; + + beforeEach(() => { + config = makeConfig(); + client = new OGraphClient(config); + fetchMock.mockClear(); + }); + + afterEach(() => { + vi.restoreAllMocks(); + }); + + it('fetches events for a ref and returns them in ascending order', async () => { + const mockResponse = { + events: [ + { id: 3, type_hash: 'h3', type_name: 'task_created', payload: { id: 102 }, created_at: 3000 }, + { id: 2, type_hash: 'h2', type_name: 'task_created', payload: { id: 101 }, created_at: 2000 }, + { id: 1, type_hash: 'h1', type_name: 'task_updated', payload: { id: 100 }, created_at: 1000 }, + ], + total: 3, + }; + + fetchMock.mockResolvedValueOnce({ + ok: true, + headers: { get: () => 'application/json' }, + json: () => Promise.resolve(mockResponse), + }); + + const result = await client.fetchEvents(123); + + expect(fetchMock).toHaveBeenCalledWith( + 'https://test.example.com/events?ref=123', + { + headers: { + 'Content-Type': 'application/json', + 'Authorization': 'Bearer test-token', + }, + } + ); + + expect(result).toHaveLength(3); + // Should be reversed to ascending order by id + expect(result.map(e => e.id)).toEqual([1, 2, 3]); + expect(result[0]).toEqual({ + id: 1, + type_hash: 'h1', + type_name: 'task_updated', + payload: { id: 100 }, + created_at: 1000 + }); + }); + + it('includes afterId parameter when provided', async () => { + fetchMock.mockResolvedValueOnce({ + ok: true, + headers: { get: () => 'application/json' }, + json: () => Promise.resolve({ events: [], total: 0 }), + }); + + await client.fetchEvents(456, 10); + + expect(fetchMock).toHaveBeenCalledWith( + 'https://test.example.com/events?ref=456&after=10', + expect.any(Object) + ); + }); + + it('omits Authorization header when token is not configured', async () => { + config.ograph.token = undefined; + client = new OGraphClient(config); + + fetchMock.mockResolvedValueOnce({ + ok: true, + headers: { get: () => 'application/json' }, + json: () => Promise.resolve({ events: [], total: 0 }), + }); + + await client.fetchEvents(789); + + expect(fetchMock).toHaveBeenCalledWith( + 'https://test.example.com/events?ref=789', + { + headers: { + 'Content-Type': 'application/json', + }, + } + ); + }); + + it('handles empty events array', async () => { + fetchMock.mockResolvedValueOnce({ + ok: true, + headers: { get: () => 'application/json' }, + json: () => Promise.resolve({ events: [], total: 0 }), + }); + + const result = await client.fetchEvents(999); + + expect(result).toEqual([]); + }); + + it('throws error on HTTP failure', async () => { + fetchMock.mockResolvedValueOnce({ + ok: false, + status: 500, + headers: { get: () => 'application/json' }, + json: () => Promise.resolve({ error: 'Internal server error' }), + }); + + await expect(client.fetchEvents(123)).rejects.toThrow('Internal server error'); + }); + + it('throws error on non-JSON response', async () => { + fetchMock.mockResolvedValueOnce({ + ok: true, + headers: { get: () => 'text/plain' }, + text: () => Promise.resolve('Not JSON'), + }); + + await expect(client.fetchEvents(123)).rejects.toThrow('Non-JSON response for events'); + }); +}); \ No newline at end of file diff --git a/packages/dispatcher/test/watcher.test.ts b/packages/dispatcher/test/watcher.test.ts index 2ace13c..cc65621 100644 --- a/packages/dispatcher/test/watcher.test.ts +++ b/packages/dispatcher/test/watcher.test.ts @@ -3,7 +3,7 @@ // without needing to spin up a real OGraph server. import { describe, it, expect, vi, beforeEach } from 'vitest'; -import type { DispatcherConfig, PendingEntry } from '../src/types.js'; +import type { DispatcherConfig, PendingEntry, OGraphEvent } from '../src/types.js'; // ── helpers ────────────────────────────────────────────────────────────────── @@ -153,3 +153,104 @@ describe('watcher diff logic', () => { expect(pending.size).toBe(0); }); }); + +// ── Event stream diff logic tests ─────────────────────────────────────────────────────────────────── + +// Helper function for testing event stream diff logic +function runEventDiff( + lastSeenEventId: number, + events: OGraphEvent[], + eventTypes: string[] | undefined, + pending: Map, +): { newLastSeenEventId: number; filtered: OGraphEvent[] } { + // 过滤:只取 id > lastSeenEventId 的(client-side,因为 API 可能不支持 after) + const newEvents = events.filter(e => e.id > lastSeenEventId); + + // 按 eventTypes 过滤(如果配置了) + const filtered = eventTypes?.length + ? newEvents.filter(e => eventTypes.includes(e.type_name)) + : newEvents; + + let newLastSeenEventId = lastSeenEventId; + if (filtered.length > 0) { + // 更新 lastSeenEventId + newLastSeenEventId = Math.max(...filtered.map(e => e.id)); + + // 合并到 pending queue + for (const event of filtered) { + const key = `event:${event.type_name}:${event.id}`; + pending.set(key, { + name: `event:${event.type_name}`, + previousValue: null, + currentValue: event.payload, + firstDetectedAt: Date.now(), + lastDetectedAt: Date.now(), + changeCount: 1, + events: [event], + }); + } + } + + return { newLastSeenEventId, filtered }; +} + +describe('event stream diff logic', () => { + let pending: Map; + + beforeEach(() => { + pending = new Map(); + }); + + it('filters events by lastSeenEventId', () => { + const events: OGraphEvent[] = [ + { id: 1, type_hash: 'h1', type_name: 'task_created', payload: { id: 100 }, created_at: 1000 }, + { id: 2, type_hash: 'h2', type_name: 'task_updated', payload: { id: 101 }, created_at: 2000 }, + { id: 3, type_hash: 'h3', type_name: 'task_created', payload: { id: 102 }, created_at: 3000 }, + ]; + + const { newLastSeenEventId, filtered } = runEventDiff(1, events, undefined, pending); + + expect(newLastSeenEventId).toBe(3); + expect(filtered).toHaveLength(2); + expect(filtered.map(e => e.id)).toEqual([2, 3]); + expect(pending.size).toBe(2); + }); + + it('filters events by eventTypes when configured', () => { + const events: OGraphEvent[] = [ + { id: 1, type_hash: 'h1', type_name: 'task_created', payload: { id: 100 }, created_at: 1000 }, + { id: 2, type_hash: 'h2', type_name: 'task_updated', payload: { id: 101 }, created_at: 2000 }, + { id: 3, type_hash: 'h3', type_name: 'user_login', payload: { user: 'alice' }, created_at: 3000 }, + ]; + + const { filtered } = runEventDiff(0, events, ['task_created', 'task_updated'], pending); + + expect(filtered).toHaveLength(2); + expect(filtered.map(e => e.type_name)).toEqual(['task_created', 'task_updated']); + expect(pending.size).toBe(2); + }); + + it('handles empty event list gracefully', () => { + const { newLastSeenEventId, filtered } = runEventDiff(5, [], undefined, pending); + + expect(newLastSeenEventId).toBe(5); // unchanged + expect(filtered).toHaveLength(0); + expect(pending.size).toBe(0); + }); + + it('creates pending entries with event metadata', () => { + const events: OGraphEvent[] = [ + { id: 10, type_hash: 'h1', type_name: 'task_created', payload: { id: 200, title: 'Test' }, created_at: 1000 }, + ]; + + runEventDiff(0, events, undefined, pending); + + const entry = pending.get('event:task_created:10'); + expect(entry).toBeDefined(); + expect(entry!.name).toBe('event:task_created'); + expect(entry!.currentValue).toEqual({ id: 200, title: 'Test' }); + expect(entry!.previousValue).toBeNull(); + expect(entry!.events).toHaveLength(1); + expect(entry!.events![0].id).toBe(10); + }); +});