feat: add event stream discovery mode for P0 Task system

- Add discovery config with agentId and optional eventTypes filtering
- Add fetchEvents method to OGraphClient for event stream access
- Update Watcher to support both projection and event stream modes
- Update Scheduler to format event stream messages appropriately
- Add comprehensive tests for event stream diff logic and API client
- Maintain backward compatibility with existing projection mode
- Support OGRAPH_AGENT_ID environment variable override

This enables P0 Task system to discover changes from event streams
without requiring Projection fixes (#19). Event stream mode activated
when config.discovery is present, otherwise falls back to projection mode.
This commit is contained in:
小墨 2026-04-13 04:43:59 +00:00
parent cedd46a18d
commit a3aa77eb31
7 changed files with 427 additions and 3 deletions

View File

@ -16,6 +16,7 @@ const DEFAULTS: DispatcherConfig = {
token: undefined,
projections: [],
},
discovery: undefined,
oc: {
statusEndpoint: 'http://localhost:18789/plugins/session-status/status',
statusToken: 'ograph-status-token-2026',
@ -84,5 +85,11 @@ export function loadConfig(): DispatcherConfig {
if (!isNaN(n)) config.oc.minAvailable = n;
}
// ── Discovery configuration overrides ───────────────────────────────────────
if (process.env['OGRAPH_AGENT_ID']) {
config.discovery = config.discovery ?? { agentId: 0 };
config.discovery.agentId = parseInt(process.env['OGRAPH_AGENT_ID'], 10);
}
return config;
}

View File

@ -1,7 +1,7 @@
// OGraph API client (dispatcher-specific, zero external deps)
// Uses node built-in fetch (Node >= 18).
import type { DispatcherConfig } from './types.js';
import type { DispatcherConfig, OGraphEvent } from './types.js';
export interface ProjectionValue {
name: string;
@ -70,4 +70,36 @@ export class OGraphClient {
return { values, errors };
}
/** Fetch events for a specific agent/ref. Returns events in ascending ID order. */
async fetchEvents(ref: number, afterId?: number): Promise<OGraphEvent[]> {
const { endpoint, token } = this.config.ograph;
let url = `${endpoint}/events?ref=${ref}`;
// afterId 参数当前 API 可能不支持,先传着,API 忽略就拉全量
if (afterId !== undefined) url += `&after=${afterId}`;
const headers: Record<string, string> = {
'Content-Type': 'application/json',
};
if (token) headers['Authorization'] = `Bearer ${token}`;
const response = await fetch(url, { headers });
const contentType = response.headers.get('content-type') ?? '';
if (!contentType.includes('application/json')) {
const text = await response.text();
throw new Error(`Non-JSON response for events: ${text.slice(0, 120)}`);
}
const result = await response.json() as { events?: OGraphEvent[]; error?: string; total?: number };
if (!response.ok) {
throw new Error(result.error ?? `HTTP ${response.status} fetching events`);
}
// API 返回格式: { events: [...], total: N }
// 返回顺序: 降序(newest first),需要 reverse()
const events = result.events ?? [];
return events.reverse(); // 按 id 升序排列
}
}

View File

@ -138,6 +138,46 @@ export class OcScheduler {
}
private buildMessage(entries: PendingEntry[]): string {
// 检查是否有事件流模式的条目
const hasEvents = entries.some(entry => entry.name.startsWith('event:'));
if (hasEvents) {
return this.buildEventMessage(entries);
} else {
return this.buildProjectionMessage(entries);
}
}
private buildEventMessage(entries: PendingEntry[]): string {
const lines: string[] = ['📋 OGraph Event Stream Updates\n'];
for (const entry of entries) {
if (entry.events && entry.events.length > 0) {
const event = entry.events[0]; // 只显示第一个事件(每个 entry 对应一个 event)
const age = Math.round((Date.now() - entry.firstDetectedAt) / 1000);
// 根据 event type 格式化消息
if (event.type_name === 'task_created') {
const payload = event.payload as any;
lines.push(`📋 新任务: #${payload.id} ${payload.title} (${payload.priority}) — 由${payload.creator || '系统'}创建`);
} else if (event.type_name === 'task_updated') {
const payload = event.payload as any;
lines.push(`📋 #${payload.id} 状态更新: ${payload.previous_status}${payload.new_status}`);
} else {
// 通用格式
lines.push(`• **${event.type_name}** #${event.id}`);
lines.push(` Payload: ${JSON.stringify(event.payload)}`);
lines.push(` Age: ${age}s`);
}
lines.push('');
}
}
lines.push(`Total: ${entries.length} event(s) detected`);
return lines.join('\n');
}
private buildProjectionMessage(entries: PendingEntry[]): string {
const lines: string[] = ['🔔 OGraph Projection Changes Detected\n'];
for (const entry of entries) {

View File

@ -6,6 +6,10 @@ export interface DispatcherConfig {
token?: string; // OGraph API token
projections: string[]; // projection 名称列表
};
discovery?: {
agentId: number; // 当前 agent 的 object id
eventTypes?: string[]; // 只关注哪些 event type(可选过滤)
};
oc: {
statusEndpoint: string; // session-status plugin URL
statusToken: string; // Bearer token
@ -36,6 +40,20 @@ export interface PendingEntry {
firstDetectedAt: number;
lastDetectedAt: number;
changeCount: number;
events?: OGraphEvent[]; // 新增:关联的事件(事件流模式用)
}
export interface OGraphEvent {
id: number;
type_hash: string;
type_name: string;
payload: Record<string, unknown>;
created_at: number;
}
export interface EventChange {
event: OGraphEvent;
detectedAt: number;
}
/** OC session-status API response */

View File

@ -14,6 +14,8 @@ export class ProjectionWatcher {
private hasChanges = false;
private running = false;
private timer: ReturnType<typeof setTimeout> | null = null;
// 新增:事件流模式状态
private lastSeenEventId = 0; // 上次看到的最大 event id
constructor(
private readonly config: DispatcherConfig,
@ -26,7 +28,11 @@ export class ProjectionWatcher {
start(): void {
if (this.running) return;
this.running = true;
console.log(`[${ts()}] [watcher] started — watching: ${this.config.ograph.projections.join(', ') || '(none)'}`);
const mode = this.config.discovery ? 'events' : 'projections';
const target = this.config.discovery
? `agent:${this.config.discovery.agentId}`
: this.config.ograph.projections.join(', ') || '(none)';
console.log(`[${ts()}] [watcher] started — mode: ${mode}, watching: ${target}`);
void this.poll();
}
@ -49,6 +55,71 @@ export class ProjectionWatcher {
private async poll(): Promise<void> {
if (!this.running) return;
// 根据 config 决定走哪条路
if (this.config.discovery) {
await this.pollEvents(); // 事件流模式
} else {
await this.pollProjections(); // 原有逻辑
}
}
private async pollEvents(): Promise<void> {
if (!this.running) return;
const { watcherIdle, watcherActive } = this.config.intervals;
const { agentId, eventTypes } = this.config.discovery!;
try {
const events = await this.client.fetchEvents(agentId, this.lastSeenEventId);
// 过滤:只取 id > lastSeenEventId 的(client-side,因为 API 可能不支持 after)
const newEvents = events.filter(e => e.id > this.lastSeenEventId);
// 按 eventTypes 过滤(如果配置了)
const filtered = eventTypes?.length
? newEvents.filter(e => eventTypes.includes(e.type_name))
: newEvents;
if (filtered.length > 0) {
this.hasChanges = true;
// 更新 lastSeenEventId
this.lastSeenEventId = Math.max(...filtered.map(e => e.id));
// 合并到 pending queue
// 按 event.type_name 分组,每组一个 PendingEntry
for (const event of filtered) {
const key = `event:${event.type_name}:${event.id}`;
this.pending.set(key, {
name: `event:${event.type_name}`,
previousValue: null,
currentValue: event.payload,
firstDetectedAt: Date.now(),
lastDetectedAt: Date.now(),
changeCount: 1,
events: [event],
});
}
console.log(`[${ts()}] [watcher] ${filtered.length} new event(s) discovered`);
for (const e of filtered) {
console.log(`[${ts()}] [watcher] #${e.id} ${e.type_name}: ${JSON.stringify(e.payload)}`);
}
} else {
this.hasChanges = this.pending.size > 0;
}
} catch (err) {
console.error(`[${ts()}] [watcher] poll error: ${err instanceof Error ? err.message : String(err)}`);
// Graceful degradation: keep previous state, retry at idle interval
this.hasChanges = false;
}
const interval = this.hasChanges ? watcherActive : watcherIdle;
this.scheduleNext(interval);
}
private async pollProjections(): Promise<void> {
if (!this.running) return;
const { watcherIdle, watcherActive } = this.config.intervals;
try {

View File

@ -0,0 +1,155 @@
// Tests for OGraphClient event stream functionality
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
import { OGraphClient } from '../src/ograph-client.js';
import type { DispatcherConfig, OGraphEvent } from '../src/types.js';
// Mock fetch globally
const fetchMock = vi.fn();
global.fetch = fetchMock;
function makeConfig(): DispatcherConfig {
return {
ograph: {
endpoint: 'https://test.example.com',
token: 'test-token',
projections: []
},
oc: {
statusEndpoint: 'http://localhost',
statusToken: 'tok',
minAvailable: 2
},
intervals: {
watcherIdle: 30_000,
watcherActive: 5_000,
schedulerIdle: 60_000,
schedulerActive: 5_000,
cooldownAfterPush: 60_000,
},
};
}
describe('OGraphClient.fetchEvents', () => {
let client: OGraphClient;
let config: DispatcherConfig;
beforeEach(() => {
config = makeConfig();
client = new OGraphClient(config);
fetchMock.mockClear();
});
afterEach(() => {
vi.restoreAllMocks();
});
it('fetches events for a ref and returns them in ascending order', async () => {
const mockResponse = {
events: [
{ id: 3, type_hash: 'h3', type_name: 'task_created', payload: { id: 102 }, created_at: 3000 },
{ id: 2, type_hash: 'h2', type_name: 'task_created', payload: { id: 101 }, created_at: 2000 },
{ id: 1, type_hash: 'h1', type_name: 'task_updated', payload: { id: 100 }, created_at: 1000 },
],
total: 3,
};
fetchMock.mockResolvedValueOnce({
ok: true,
headers: { get: () => 'application/json' },
json: () => Promise.resolve(mockResponse),
});
const result = await client.fetchEvents(123);
expect(fetchMock).toHaveBeenCalledWith(
'https://test.example.com/events?ref=123',
{
headers: {
'Content-Type': 'application/json',
'Authorization': 'Bearer test-token',
},
}
);
expect(result).toHaveLength(3);
// Should be reversed to ascending order by id
expect(result.map(e => e.id)).toEqual([1, 2, 3]);
expect(result[0]).toEqual({
id: 1,
type_hash: 'h1',
type_name: 'task_updated',
payload: { id: 100 },
created_at: 1000
});
});
it('includes afterId parameter when provided', async () => {
fetchMock.mockResolvedValueOnce({
ok: true,
headers: { get: () => 'application/json' },
json: () => Promise.resolve({ events: [], total: 0 }),
});
await client.fetchEvents(456, 10);
expect(fetchMock).toHaveBeenCalledWith(
'https://test.example.com/events?ref=456&after=10',
expect.any(Object)
);
});
it('omits Authorization header when token is not configured', async () => {
config.ograph.token = undefined;
client = new OGraphClient(config);
fetchMock.mockResolvedValueOnce({
ok: true,
headers: { get: () => 'application/json' },
json: () => Promise.resolve({ events: [], total: 0 }),
});
await client.fetchEvents(789);
expect(fetchMock).toHaveBeenCalledWith(
'https://test.example.com/events?ref=789',
{
headers: {
'Content-Type': 'application/json',
},
}
);
});
it('handles empty events array', async () => {
fetchMock.mockResolvedValueOnce({
ok: true,
headers: { get: () => 'application/json' },
json: () => Promise.resolve({ events: [], total: 0 }),
});
const result = await client.fetchEvents(999);
expect(result).toEqual([]);
});
it('throws error on HTTP failure', async () => {
fetchMock.mockResolvedValueOnce({
ok: false,
status: 500,
headers: { get: () => 'application/json' },
json: () => Promise.resolve({ error: 'Internal server error' }),
});
await expect(client.fetchEvents(123)).rejects.toThrow('Internal server error');
});
it('throws error on non-JSON response', async () => {
fetchMock.mockResolvedValueOnce({
ok: true,
headers: { get: () => 'text/plain' },
text: () => Promise.resolve('Not JSON'),
});
await expect(client.fetchEvents(123)).rejects.toThrow('Non-JSON response for events');
});
});

View File

@ -3,7 +3,7 @@
// without needing to spin up a real OGraph server.
import { describe, it, expect, vi, beforeEach } from 'vitest';
import type { DispatcherConfig, PendingEntry } from '../src/types.js';
import type { DispatcherConfig, PendingEntry, OGraphEvent } from '../src/types.js';
// ── helpers ──────────────────────────────────────────────────────────────────
@ -153,3 +153,104 @@ describe('watcher diff logic', () => {
expect(pending.size).toBe(0);
});
});
// ── Event stream diff logic tests ───────────────────────────────────────────────────────────────────
// Helper function for testing event stream diff logic
function runEventDiff(
lastSeenEventId: number,
events: OGraphEvent[],
eventTypes: string[] | undefined,
pending: Map<string, PendingEntry>,
): { newLastSeenEventId: number; filtered: OGraphEvent[] } {
// 过滤:只取 id > lastSeenEventId 的(client-side,因为 API 可能不支持 after)
const newEvents = events.filter(e => e.id > lastSeenEventId);
// 按 eventTypes 过滤(如果配置了)
const filtered = eventTypes?.length
? newEvents.filter(e => eventTypes.includes(e.type_name))
: newEvents;
let newLastSeenEventId = lastSeenEventId;
if (filtered.length > 0) {
// 更新 lastSeenEventId
newLastSeenEventId = Math.max(...filtered.map(e => e.id));
// 合并到 pending queue
for (const event of filtered) {
const key = `event:${event.type_name}:${event.id}`;
pending.set(key, {
name: `event:${event.type_name}`,
previousValue: null,
currentValue: event.payload,
firstDetectedAt: Date.now(),
lastDetectedAt: Date.now(),
changeCount: 1,
events: [event],
});
}
}
return { newLastSeenEventId, filtered };
}
describe('event stream diff logic', () => {
let pending: Map<string, PendingEntry>;
beforeEach(() => {
pending = new Map();
});
it('filters events by lastSeenEventId', () => {
const events: OGraphEvent[] = [
{ id: 1, type_hash: 'h1', type_name: 'task_created', payload: { id: 100 }, created_at: 1000 },
{ id: 2, type_hash: 'h2', type_name: 'task_updated', payload: { id: 101 }, created_at: 2000 },
{ id: 3, type_hash: 'h3', type_name: 'task_created', payload: { id: 102 }, created_at: 3000 },
];
const { newLastSeenEventId, filtered } = runEventDiff(1, events, undefined, pending);
expect(newLastSeenEventId).toBe(3);
expect(filtered).toHaveLength(2);
expect(filtered.map(e => e.id)).toEqual([2, 3]);
expect(pending.size).toBe(2);
});
it('filters events by eventTypes when configured', () => {
const events: OGraphEvent[] = [
{ id: 1, type_hash: 'h1', type_name: 'task_created', payload: { id: 100 }, created_at: 1000 },
{ id: 2, type_hash: 'h2', type_name: 'task_updated', payload: { id: 101 }, created_at: 2000 },
{ id: 3, type_hash: 'h3', type_name: 'user_login', payload: { user: 'alice' }, created_at: 3000 },
];
const { filtered } = runEventDiff(0, events, ['task_created', 'task_updated'], pending);
expect(filtered).toHaveLength(2);
expect(filtered.map(e => e.type_name)).toEqual(['task_created', 'task_updated']);
expect(pending.size).toBe(2);
});
it('handles empty event list gracefully', () => {
const { newLastSeenEventId, filtered } = runEventDiff(5, [], undefined, pending);
expect(newLastSeenEventId).toBe(5); // unchanged
expect(filtered).toHaveLength(0);
expect(pending.size).toBe(0);
});
it('creates pending entries with event metadata', () => {
const events: OGraphEvent[] = [
{ id: 10, type_hash: 'h1', type_name: 'task_created', payload: { id: 200, title: 'Test' }, created_at: 1000 },
];
runEventDiff(0, events, undefined, pending);
const entry = pending.get('event:task_created:10');
expect(entry).toBeDefined();
expect(entry!.name).toBe('event:task_created');
expect(entry!.currentValue).toEqual({ id: 200, title: 'Test' });
expect(entry!.previousValue).toBeNull();
expect(entry!.events).toHaveLength(1);
expect(entry!.events![0].id).toBe(10);
});
});