ograph/packages/dispatcher/test/watcher.test.ts
小墨 a3aa77eb31 feat: add event stream discovery mode for P0 Task system
- Add discovery config with agentId and optional eventTypes filtering
- Add fetchEvents method to OGraphClient for event stream access
- Update Watcher to support both projection and event stream modes
- Update Scheduler to format event stream messages appropriately
- Add comprehensive tests for event stream diff logic and API client
- Maintain backward compatibility with existing projection mode
- Support OGRAPH_AGENT_ID environment variable override

This enables the P0 Task system to discover changes from event streams
without requiring Projection fixes (#19). Event stream mode is activated
when config.discovery is present; otherwise the watcher falls back to projection mode.
2026-04-13 04:52:07 +00:00

257 lines
8.9 KiB
TypeScript

// Tests for watcher diff logic
// We test the core diffing behaviour by exercising the internals directly
// without needing to spin up a real OGraph server.
import { describe, it, expect, vi, beforeEach } from 'vitest';
import type { DispatcherConfig, PendingEntry, OGraphEvent } from '../src/types.js';
// ── helpers ──────────────────────────────────────────────────────────────────
/**
 * Builds a DispatcherConfig fixture pointing at localhost with default
 * polling intervals.
 *
 * @param overrides - Interval fields to replace; any field left out keeps the
 *   default listed below.
 * @returns A newly created config object on every call.
 */
function makeConfig(overrides: Partial<DispatcherConfig['intervals']> = {}): DispatcherConfig {
  // Baseline intervals (milliseconds, judging by the 30_000 / 60_000 magnitudes — confirm against types.ts).
  const defaultIntervals = {
    watcherIdle: 30_000,
    watcherActive: 5_000,
    schedulerIdle: 60_000,
    schedulerActive: 5_000,
    cooldownAfterPush: 60_000,
  };

  // Later spread wins, so caller-supplied overrides replace the defaults.
  const intervals = { ...defaultIntervals, ...overrides };

  return {
    ograph: { endpoint: 'http://localhost', token: undefined, projections: [] },
    oc: { statusEndpoint: 'http://localhost', statusToken: 'tok', minAvailable: 2 },
    intervals,
  };
}
// Minimal inline diff engine extracted from the watcher logic so we can unit-test
// it without I/O dependencies.
/**
 * Compares the latest projection values against the stored snapshot and queues
 * every detected change in `pending`.
 *
 * Both `snapshot` and `pending` are mutated in place:
 * - A name seen for the first time is only stored in the snapshot; it is not
 *   reported as a change.
 * - A name whose serialized value differs from the snapshot is queued and the
 *   snapshot is updated.
 * - A name present in the snapshot but missing from `current` is queued once
 *   with `currentValue: undefined`, and its snapshot slot is set to `undefined`
 *   so that it is not reported again while it stays absent.
 *
 * Repeated changes to the same name are merged into a single pending entry
 * (first `previousValue` and `firstDetectedAt` are kept, `changeCount` grows).
 * This applies to disappearances as well, so a value change followed by a
 * disappearance does not discard the history already queued for that name.
 *
 * @param snapshot - Last known value per projection name.
 * @param current - Values observed in this pass.
 * @param pending - Queue of not-yet-handled changes, keyed by projection name.
 * @returns `changed` is true when at least one change was queued in this pass.
 */
function runDiff(
  snapshot: Map<string, unknown>,
  current: Map<string, unknown>,
  pending: Map<string, PendingEntry>,
): { changed: boolean } {
  let changed = false;
  // One timestamp per pass so every entry touched here shares the same detection time.
  const now = Date.now();

  for (const [name, value] of current) {
    if (!snapshot.has(name)) {
      // First sighting: seed the snapshot without reporting a change.
      snapshot.set(name, value);
      continue;
    }

    const prev = snapshot.get(name);
    // Values are compared by their JSON serialization, i.e. by content rather
    // than identity (note that property order affects the result).
    if (JSON.stringify(prev) === JSON.stringify(value)) {
      continue;
    }

    changed = true;
    recordPendingChange(pending, name, prev, value, now);
    snapshot.set(name, value);
  }

  // Projections that disappeared since the previous pass.
  for (const [name, prev] of snapshot) {
    // An `undefined` slot marks a disappearance that was already reported.
    if (current.has(name) || prev === undefined) {
      continue;
    }

    changed = true;
    snapshot.set(name, undefined);
    recordPendingChange(pending, name, prev, undefined, now);
  }

  return { changed };
}

/**
 * Queues one change for `name`, merging into an existing pending entry when
 * there is one so the earliest `previousValue` / `firstDetectedAt` survive.
 */
function recordPendingChange(
  pending: Map<string, PendingEntry>,
  name: string,
  previousValue: unknown,
  currentValue: unknown,
  now: number,
): void {
  const existing = pending.get(name);
  if (existing) {
    existing.currentValue = currentValue;
    existing.lastDetectedAt = now;
    existing.changeCount += 1;
    return;
  }

  pending.set(name, {
    name,
    previousValue,
    currentValue,
    firstDetectedAt: now,
    lastDetectedAt: now,
    changeCount: 1,
  });
}
// ── tests ─────────────────────────────────────────────────────────────────────
describe('watcher diff logic', () => {
  // runDiff mutates both maps in place, so each test starts from empty ones.
  let snapshot: Map<string, unknown>;
  let pending: Map<string, PendingEntry>;

  beforeEach(() => {
    snapshot = new Map();
    pending = new Map();
  });

  it('first run: initialises snapshot without reporting changes', () => {
    const firstPoll = new Map([['proj-a', { count: 1 }]]);

    const outcome = runDiff(snapshot, firstPoll, pending);

    expect(outcome.changed).toBe(false);
    expect(pending.size).toBe(0);
    expect(snapshot.get('proj-a')).toEqual({ count: 1 });
  });

  it('second run: no diff when value unchanged', () => {
    const samePoll = new Map([['proj-a', { count: 1 }]]);

    // The first pass only seeds the snapshot.
    runDiff(snapshot, samePoll, pending);
    // The second pass sees the identical value.
    const outcome = runDiff(snapshot, samePoll, pending);

    expect(outcome.changed).toBe(false);
    expect(pending.size).toBe(0);
  });

  it('detects a value change between runs', () => {
    const before = new Map([['proj-a', { count: 1 }]]);
    const after = new Map([['proj-a', { count: 2 }]]);

    runDiff(snapshot, before, pending); // seed
    const outcome = runDiff(snapshot, after, pending);

    expect(outcome.changed).toBe(true);
    expect(pending.has('proj-a')).toBe(true);
    const entry = pending.get('proj-a')!;
    expect(entry.previousValue).toEqual({ count: 1 });
    expect(entry.currentValue).toEqual({ count: 2 });
  });

  it('merges multiple changes into one pending entry', () => {
    // 1 seeds the snapshot; 2 and 3 are the two changes that get merged.
    for (const value of [1, 2, 3]) {
      runDiff(snapshot, new Map([['proj-a', value]]), pending);
    }

    const entry = pending.get('proj-a')!;
    expect(entry.changeCount).toBe(2);
    expect(entry.currentValue).toBe(3);
    // previousValue stays at what it was before the first change.
    expect(entry.previousValue).toBe(1);
  });

  it('detects a disappeared projection', () => {
    runDiff(snapshot, new Map([['proj-a', 42]]), pending); // seed

    const outcome = runDiff(snapshot, new Map(), pending); // proj-a is gone

    expect(outcome.changed).toBe(true);
    expect(pending.get('proj-a')!.currentValue).toBeUndefined();
  });

  it('does not re-fire when a disappeared projection stays absent', () => {
    runDiff(snapshot, new Map([['proj-a', 42]]), pending); // seed
    runDiff(snapshot, new Map(), pending); // gone: reported once
    pending.clear();

    const outcome = runDiff(snapshot, new Map(), pending); // still gone: nothing new

    expect(outcome.changed).toBe(false);
    expect(pending.size).toBe(0);
  });
});
// ── Event stream diff logic tests ───────────────────────────────────────────────────────────────────
// Helper function for testing event stream diff logic
/**
 * Selects the events that are new since `lastSeenEventId`, optionally
 * restricts them to an allow-list of type names, and queues one pending entry
 * per selected event.
 *
 * `pending` is mutated in place. Each selected event is stored under the key
 * `event:<type_name>:<id>`, so an event with the same type and id replaces any
 * entry already stored under that key.
 *
 * NOTE(review): the returned cursor only advances over events that pass the
 * type filter; newer events of other types do not move it. Confirm this
 * matches the behaviour wanted from the real watcher.
 *
 * @param lastSeenEventId - Cursor: events with `id` less than or equal to this are ignored.
 * @param events - Events returned by the fetch, in any order.
 * @param eventTypes - Allowed `type_name` values; `undefined` or an empty list accepts every type.
 * @param pending - Queue of not-yet-handled changes.
 * @returns The updated cursor (unchanged when nothing was selected) and the selected events.
 */
function runEventDiff(
  lastSeenEventId: number,
  events: OGraphEvent[],
  eventTypes: string[] | undefined,
  pending: Map<string, PendingEntry>,
): { newLastSeenEventId: number; filtered: OGraphEvent[] } {
  // Keep only events newer than the cursor. This is done client-side because
  // the API may not support an `after` parameter.
  const newEvents = events.filter(e => e.id > lastSeenEventId);

  // Apply the event-type allow-list when one is configured.
  const allowedTypes = eventTypes?.length ? new Set(eventTypes) : undefined;
  const filtered = allowedTypes
    ? newEvents.filter(e => allowedTypes.has(e.type_name))
    : newEvents;

  // One timestamp per pass, so a new entry has identical first/last detection times.
  const now = Date.now();
  let newLastSeenEventId = lastSeenEventId;

  for (const event of filtered) {
    // Track the highest id with a running maximum rather than spreading the
    // whole batch into Math.max, which can exceed the engine's argument limit.
    if (event.id > newLastSeenEventId) {
      newLastSeenEventId = event.id;
    }

    // Merge into the pending queue, one entry per event.
    pending.set(`event:${event.type_name}:${event.id}`, {
      name: `event:${event.type_name}`,
      previousValue: null,
      currentValue: event.payload,
      firstDetectedAt: now,
      lastDetectedAt: now,
      changeCount: 1,
      events: [event],
    });
  }

  return { newLastSeenEventId, filtered };
}
describe('event stream diff logic', () => {
  // runEventDiff writes into this map, so each test gets a fresh one.
  let pending: Map<string, PendingEntry>;

  beforeEach(() => {
    pending = new Map();
  });

  it('filters events by lastSeenEventId', () => {
    const batch: OGraphEvent[] = [
      { id: 1, type_hash: 'h1', type_name: 'task_created', payload: { id: 100 }, created_at: 1000 },
      { id: 2, type_hash: 'h2', type_name: 'task_updated', payload: { id: 101 }, created_at: 2000 },
      { id: 3, type_hash: 'h3', type_name: 'task_created', payload: { id: 102 }, created_at: 3000 },
    ];

    // Cursor at 1: only ids 2 and 3 count as new.
    const result = runEventDiff(1, batch, undefined, pending);

    expect(result.newLastSeenEventId).toBe(3);
    expect(result.filtered).toHaveLength(2);
    expect(result.filtered.map(e => e.id)).toEqual([2, 3]);
    expect(pending.size).toBe(2);
  });

  it('filters events by eventTypes when configured', () => {
    const batch: OGraphEvent[] = [
      { id: 1, type_hash: 'h1', type_name: 'task_created', payload: { id: 100 }, created_at: 1000 },
      { id: 2, type_hash: 'h2', type_name: 'task_updated', payload: { id: 101 }, created_at: 2000 },
      { id: 3, type_hash: 'h3', type_name: 'user_login', payload: { user: 'alice' }, created_at: 3000 },
    ];
    const wantedTypes = ['task_created', 'task_updated'];

    const result = runEventDiff(0, batch, wantedTypes, pending);

    expect(result.filtered).toHaveLength(2);
    expect(result.filtered.map(e => e.type_name)).toEqual(['task_created', 'task_updated']);
    expect(pending.size).toBe(2);
  });

  it('handles empty event list gracefully', () => {
    const result = runEventDiff(5, [], undefined, pending);

    // With nothing new the cursor stays where it was.
    expect(result.newLastSeenEventId).toBe(5); // unchanged
    expect(result.filtered).toHaveLength(0);
    expect(pending.size).toBe(0);
  });

  it('creates pending entries with event metadata', () => {
    const batch: OGraphEvent[] = [
      {
        id: 10,
        type_hash: 'h1',
        type_name: 'task_created',
        payload: { id: 200, title: 'Test' },
        created_at: 1000,
      },
    ];

    runEventDiff(0, batch, undefined, pending);

    // The map key carries the event id; the entry name carries only the type.
    const queued = pending.get('event:task_created:10');
    expect(queued).toBeDefined();
    expect(queued!.name).toBe('event:task_created');
    expect(queued!.currentValue).toEqual({ id: 200, title: 'Test' });
    expect(queued!.previousValue).toBeNull();
    expect(queued!.events).toHaveLength(1);
    expect(queued!.events![0].id).toBe(10);
  });
});