/*
 * Change summary:
 * - Add discovery config with agentId and optional eventTypes filtering
 * - Add fetchEvents method to OGraphClient for event stream access
 * - Update Watcher to support both projection and event stream modes
 * - Update Scheduler to format event stream messages appropriately
 * - Add comprehensive tests for event stream diff logic and API client
 * - Maintain backward compatibility with existing projection mode
 * - Support OGRAPH_AGENT_ID environment variable override
 *
 * This enables the P0 Task system to discover changes from event streams
 * without requiring Projection fixes (#19). Event stream mode is activated
 * when config.discovery is present; otherwise it falls back to projection mode.
 *
 * File: 257 lines, 8.9 KiB, TypeScript
 */
// Tests for watcher diff logic.
// We test the core diffing behaviour by exercising the internals directly,
// without needing to spin up a real OGraph server.
|
|
|
|
import { describe, it, expect, vi, beforeEach } from 'vitest';
|
|
import type { DispatcherConfig, PendingEntry, OGraphEvent } from '../src/types.js';
|
|
|
|
// ── helpers ──────────────────────────────────────────────────────────────────
|
|
|
|
/**
 * Builds a `DispatcherConfig` fixture for the tests in this file.
 *
 * The OGraph and OC sections use fixed localhost endpoints; only the polling
 * intervals are customisable.
 *
 * @param overrides - Interval values that replace the defaults below. Keys
 *   that are omitted keep their default value.
 * @returns A freshly allocated config object on every call.
 */
function makeConfig(overrides: Partial<DispatcherConfig['intervals']> = {}): DispatcherConfig {
  return {
    ograph: { endpoint: 'http://localhost', token: undefined, projections: [] },
    oc: { statusEndpoint: 'http://localhost', statusToken: 'tok', minAvailable: 2 },
    intervals: {
      watcherIdle: 30_000,
      watcherActive: 5_000,
      schedulerIdle: 60_000,
      schedulerActive: 5_000,
      cooldownAfterPush: 60_000,
      // Spread last so caller-supplied values win over the defaults above.
      ...overrides,
    },
  };
}
|
|
|
|
// Minimal inline diff engine extracted from the watcher logic so we can unit-test
// it without I/O dependencies.
/**
 * Compares the latest projection values against the stored snapshot and
 * queues every detected change in `pending`.
 *
 * Both `snapshot` and `pending` are mutated in place:
 * - A name seen for the first time is only stored in `snapshot`; it does not
 *   count as a change.
 * - A value whose JSON serialisation differs from the snapshot is a change.
 * - A name present in `snapshot` (with a non-`undefined` value) but missing
 *   from `current` is a change to `undefined`; the snapshot keeps the key with
 *   an `undefined` value so the disappearance is reported only once.
 *
 * Several changes to the same name are merged into one pending entry, which
 * keeps the first `previousValue` and `firstDetectedAt` and bumps
 * `changeCount`. This also applies when a projection disappears while an
 * entry for it is still pending.
 * NOTE(review): this helper is a copy of the watcher logic — confirm the real
 * watcher also merges (rather than overwrites) on disappearance.
 *
 * @param snapshot - Last known value per projection name.
 * @param current - Values observed in this run.
 * @param pending - Queue of unprocessed changes, keyed by projection name.
 * @returns `changed` is true when at least one change was recorded.
 */
function runDiff(
  snapshot: Map<string, unknown>,
  current: Map<string, unknown>,
  pending: Map<string, PendingEntry>,
): { changed: boolean } {
  // One timestamp for the whole run so all entries from it agree.
  const now = Date.now();
  let changed = false;

  // Records a single change: merge into the pending entry if one exists,
  // otherwise create it, then bring the snapshot up to date.
  const recordChange = (name: string, previousValue: unknown, currentValue: unknown): void => {
    changed = true;
    const existing = pending.get(name);
    if (existing) {
      existing.currentValue = currentValue;
      existing.lastDetectedAt = now;
      existing.changeCount += 1;
    } else {
      pending.set(name, {
        name,
        previousValue,
        currentValue,
        firstDetectedAt: now,
        lastDetectedAt: now,
        changeCount: 1,
      });
    }
    snapshot.set(name, currentValue);
  };

  for (const [name, value] of current) {
    if (!snapshot.has(name)) {
      // First sighting: initialise the snapshot, report nothing.
      snapshot.set(name, value);
      continue;
    }

    const prev = snapshot.get(name);
    if (JSON.stringify(prev) !== JSON.stringify(value)) {
      recordChange(name, prev, value);
    }
  }

  // Projections that disappeared. Only existing keys are rewritten here, so
  // mutating the map during iteration does not add new iteration steps.
  for (const [name, prev] of snapshot) {
    if (!current.has(name) && prev !== undefined) {
      recordChange(name, prev, undefined);
    }
  }

  return { changed };
}
|
|
|
|
// ── tests ─────────────────────────────────────────────────────────────────────
|
|
|
|
// Exercises runDiff across consecutive runs that share one snapshot and one
// pending queue, mimicking repeated watcher polls.
describe('watcher diff logic', () => {
  let snapshot: Map<string, unknown>;
  let pending: Map<string, PendingEntry>;

  // Fresh state per test so no snapshot or pending entry leaks between cases.
  beforeEach(() => {
    snapshot = new Map();
    pending = new Map();
  });

  it('first run: initialises snapshot without reporting changes', () => {
    const current = new Map([['proj-a', { count: 1 }]]);
    const { changed } = runDiff(snapshot, current, pending);

    expect(changed).toBe(false);
    expect(pending.size).toBe(0);
    expect(snapshot.get('proj-a')).toEqual({ count: 1 });
  });

  it('second run: no diff when value unchanged', () => {
    const current = new Map([['proj-a', { count: 1 }]]);
    runDiff(snapshot, current, pending); // first run — init
    const { changed } = runDiff(snapshot, current, pending); // second run — same value

    expect(changed).toBe(false);
    expect(pending.size).toBe(0);
  });

  it('detects a value change between runs', () => {
    const v1 = new Map([['proj-a', { count: 1 }]]);
    const v2 = new Map([['proj-a', { count: 2 }]]);

    runDiff(snapshot, v1, pending); // init
    const { changed } = runDiff(snapshot, v2, pending);

    expect(changed).toBe(true);
    expect(pending.has('proj-a')).toBe(true);
    const entry = pending.get('proj-a');
    expect(entry?.previousValue).toEqual({ count: 1 });
    expect(entry?.currentValue).toEqual({ count: 2 });
  });

  it('merges multiple changes into one pending entry', () => {
    runDiff(snapshot, new Map([['proj-a', 1]]), pending); // init
    runDiff(snapshot, new Map([['proj-a', 2]]), pending); // change 1
    runDiff(snapshot, new Map([['proj-a', 3]]), pending); // change 2

    const entry = pending.get('proj-a');
    expect(entry).toBeDefined();
    expect(entry?.changeCount).toBe(2);
    expect(entry?.currentValue).toBe(3);
    expect(entry?.previousValue).toBe(1); // kept from first change
  });

  it('detects a disappeared projection', () => {
    runDiff(snapshot, new Map([['proj-a', 42]]), pending); // init
    const { changed } = runDiff(snapshot, new Map(), pending); // proj-a gone

    expect(changed).toBe(true);
    // Assert the entry exists first: an optional-chained read of a missing
    // entry would also be undefined and would hide a regression.
    const entry = pending.get('proj-a');
    expect(entry).toBeDefined();
    expect(entry?.currentValue).toBeUndefined();
  });

  it('does not re-fire when a disappeared projection stays absent', () => {
    runDiff(snapshot, new Map([['proj-a', 42]]), pending); // init
    runDiff(snapshot, new Map(), pending); // gone → change
    pending.clear();
    const { changed } = runDiff(snapshot, new Map(), pending); // still gone → no change

    expect(changed).toBe(false);
    expect(pending.size).toBe(0);
  });
});
|
|
|
|
// ── Event stream diff logic tests ───────────────────────────────────────────────────────────────────
|
|
|
|
// Helper function for testing event stream diff logic
/**
 * Selects the events that are new since `lastSeenEventId`, optionally
 * restricts them to the configured event types, and queues each remaining
 * event in `pending`.
 *
 * `pending` is mutated in place. Each event gets its own entry under the key
 * `event:<type_name>:<id>`; the entry's `name` is `event:<type_name>`.
 *
 * @param lastSeenEventId - Highest event id already processed; only events
 *   with a strictly greater id are considered.
 * @param events - Events to examine.
 * @param eventTypes - Allowed `type_name` values. `undefined` or an empty
 *   array disables type filtering.
 * @param pending - Queue of unprocessed changes to append to.
 * @returns `filtered` holds the events that were queued. `newLastSeenEventId`
 *   is the largest id among them, or `lastSeenEventId` unchanged when nothing
 *   was queued.
 */
function runEventDiff(
  lastSeenEventId: number,
  events: OGraphEvent[],
  eventTypes: string[] | undefined,
  pending: Map<string, PendingEntry>,
): { newLastSeenEventId: number; filtered: OGraphEvent[] } {
  // Filter: keep only id > lastSeenEventId (client-side, because the API may
  // not support an `after` parameter).
  const newEvents = events.filter((e) => e.id > lastSeenEventId);

  // Filter by eventTypes (if configured).
  const filtered = eventTypes?.length
    ? newEvents.filter((e) => eventTypes.includes(e.type_name))
    : newEvents;

  // One timestamp per call so firstDetectedAt and lastDetectedAt of a new
  // entry are always equal.
  const now = Date.now();
  let newLastSeenEventId = lastSeenEventId;

  for (const event of filtered) {
    // Track the maximum with a running comparison; spreading a large array
    // into Math.max can exceed the engine's argument limit.
    if (event.id > newLastSeenEventId) {
      newLastSeenEventId = event.id;
    }

    // Merge into the pending queue.
    pending.set(`event:${event.type_name}:${event.id}`, {
      name: `event:${event.type_name}`,
      previousValue: null,
      currentValue: event.payload,
      firstDetectedAt: now,
      lastDetectedAt: now,
      changeCount: 1,
      events: [event],
    });
  }

  return { newLastSeenEventId, filtered };
}
|
|
|
|
// Exercises runEventDiff: id cursor filtering, optional type filtering, and
// the shape of the pending entries it creates.
describe('event stream diff logic', () => {
  let pending: Map<string, PendingEntry>;

  // Fresh queue per test so entries cannot leak between cases.
  beforeEach(() => {
    pending = new Map();
  });

  it('filters events by lastSeenEventId', () => {
    const events: OGraphEvent[] = [
      { id: 1, type_hash: 'h1', type_name: 'task_created', payload: { id: 100 }, created_at: 1000 },
      { id: 2, type_hash: 'h2', type_name: 'task_updated', payload: { id: 101 }, created_at: 2000 },
      { id: 3, type_hash: 'h3', type_name: 'task_created', payload: { id: 102 }, created_at: 3000 },
    ];

    const { newLastSeenEventId, filtered } = runEventDiff(1, events, undefined, pending);

    expect(newLastSeenEventId).toBe(3);
    expect(filtered).toHaveLength(2);
    expect(filtered.map((e) => e.id)).toEqual([2, 3]);
    expect(pending.size).toBe(2);
  });

  it('filters events by eventTypes when configured', () => {
    const events: OGraphEvent[] = [
      { id: 1, type_hash: 'h1', type_name: 'task_created', payload: { id: 100 }, created_at: 1000 },
      { id: 2, type_hash: 'h2', type_name: 'task_updated', payload: { id: 101 }, created_at: 2000 },
      { id: 3, type_hash: 'h3', type_name: 'user_login', payload: { user: 'alice' }, created_at: 3000 },
    ];

    const { filtered } = runEventDiff(0, events, ['task_created', 'task_updated'], pending);

    expect(filtered).toHaveLength(2);
    expect(filtered.map((e) => e.type_name)).toEqual(['task_created', 'task_updated']);
    expect(pending.size).toBe(2);
  });

  it('handles empty event list gracefully', () => {
    const { newLastSeenEventId, filtered } = runEventDiff(5, [], undefined, pending);

    expect(newLastSeenEventId).toBe(5); // unchanged
    expect(filtered).toHaveLength(0);
    expect(pending.size).toBe(0);
  });

  it('creates pending entries with event metadata', () => {
    const events: OGraphEvent[] = [
      { id: 10, type_hash: 'h1', type_name: 'task_created', payload: { id: 200, title: 'Test' }, created_at: 1000 },
    ];

    runEventDiff(0, events, undefined, pending);

    // The existence check comes first; the optional-chained reads below
    // would otherwise compare against undefined if the entry were missing.
    const entry = pending.get('event:task_created:10');
    expect(entry).toBeDefined();
    expect(entry?.name).toBe('event:task_created');
    expect(entry?.currentValue).toEqual({ id: 200, title: 'Test' });
    expect(entry?.previousValue).toBeNull();
    expect(entry?.events).toHaveLength(1);
    expect(entry?.events?.[0]?.id).toBe(10);
  });
});
|