// Tests for watcher diff logic // We test the core diffing behaviour by exercising the internals directly // without needing to spin up a real OGraph server. import { describe, it, expect, vi, beforeEach } from 'vitest'; import type { DispatcherConfig, PendingEntry, OGraphEvent } from '../src/types.js'; // ── helpers ────────────────────────────────────────────────────────────────── function makeConfig(overrides: Partial = {}): DispatcherConfig { return { ograph: { endpoint: 'http://localhost', token: undefined, projections: [] }, oc: { statusEndpoint: 'http://localhost', statusToken: 'tok', minAvailable: 2 }, intervals: { watcherIdle: 30_000, watcherActive: 5_000, schedulerIdle: 60_000, schedulerActive: 5_000, cooldownAfterPush: 60_000, ...overrides, }, }; } // Minimal inline diff engine extracted from the watcher logic so we can unit-test // it without I/O dependencies. function runDiff( snapshot: Map, current: Map, pending: Map, ): { changed: boolean } { let changed = false; const now = Date.now(); for (const [name, value] of current.entries()) { if (!snapshot.has(name)) { // first-run: initialise snapshot, no change snapshot.set(name, value); continue; } const prev = snapshot.get(name); if (JSON.stringify(prev) !== JSON.stringify(value)) { changed = true; const existing = pending.get(name); if (existing) { existing.currentValue = value; existing.lastDetectedAt = now; existing.changeCount += 1; } else { pending.set(name, { name, previousValue: prev, currentValue: value, firstDetectedAt: now, lastDetectedAt: now, changeCount: 1, }); } snapshot.set(name, value); } } // projections that disappeared for (const name of snapshot.keys()) { if (!current.has(name)) { const prev = snapshot.get(name); if (prev !== undefined) { changed = true; snapshot.set(name, undefined); pending.set(name, { name, previousValue: prev, currentValue: undefined, firstDetectedAt: now, lastDetectedAt: now, changeCount: 1, }); } } } return { changed }; } // ── tests ───────────────────────────────────────────────────────────────────── describe('watcher diff logic', () => { let snapshot: Map; let pending: Map; beforeEach(() => { snapshot = new Map(); pending = new Map(); }); it('first run: initialises snapshot without reporting changes', () => { const current = new Map([['proj-a', { count: 1 }]]); const { changed } = runDiff(snapshot, current, pending); expect(changed).toBe(false); expect(pending.size).toBe(0); expect(snapshot.get('proj-a')).toEqual({ count: 1 }); }); it('second run: no diff when value unchanged', () => { const current = new Map([['proj-a', { count: 1 }]]); runDiff(snapshot, current, pending); // first run — init const { changed } = runDiff(snapshot, current, pending); // second run — same value expect(changed).toBe(false); expect(pending.size).toBe(0); }); it('detects a value change between runs', () => { const v1 = new Map([['proj-a', { count: 1 }]]); const v2 = new Map([['proj-a', { count: 2 }]]); runDiff(snapshot, v1, pending); // init const { changed } = runDiff(snapshot, v2, pending); expect(changed).toBe(true); expect(pending.has('proj-a')).toBe(true); expect(pending.get('proj-a')!.previousValue).toEqual({ count: 1 }); expect(pending.get('proj-a')!.currentValue).toEqual({ count: 2 }); }); it('merges multiple changes into one pending entry', () => { runDiff(snapshot, new Map([['proj-a', 1]]), pending); // init runDiff(snapshot, new Map([['proj-a', 2]]), pending); // change 1 runDiff(snapshot, new Map([['proj-a', 3]]), pending); // change 2 expect(pending.get('proj-a')!.changeCount).toBe(2); expect(pending.get('proj-a')!.currentValue).toBe(3); expect(pending.get('proj-a')!.previousValue).toBe(1); // kept from first change }); it('detects a disappeared projection', () => { runDiff(snapshot, new Map([['proj-a', 42]]), pending); // init const { changed } = runDiff(snapshot, new Map(), pending); // proj-a gone expect(changed).toBe(true); expect(pending.get('proj-a')!.currentValue).toBeUndefined(); }); it('does not re-fire when a disappeared projection stays absent', () => { runDiff(snapshot, new Map([['proj-a', 42]]), pending); // init runDiff(snapshot, new Map(), pending); // gone → change pending.clear(); const { changed } = runDiff(snapshot, new Map(), pending); // still gone → no change expect(changed).toBe(false); expect(pending.size).toBe(0); }); }); // ── Event stream diff logic tests ─────────────────────────────────────────────────────────────────── // Helper function for testing event stream diff logic function runEventDiff( lastSeenEventId: number, events: OGraphEvent[], eventTypes: string[] | undefined, pending: Map, ): { newLastSeenEventId: number; filtered: OGraphEvent[] } { // 过滤:只取 id > lastSeenEventId 的(client-side,因为 API 可能不支持 after) const newEvents = events.filter(e => e.id > lastSeenEventId); // 按 eventTypes 过滤(如果配置了) const filtered = eventTypes?.length ? newEvents.filter(e => eventTypes.includes(e.type_name)) : newEvents; let newLastSeenEventId = lastSeenEventId; if (filtered.length > 0) { // 更新 lastSeenEventId newLastSeenEventId = Math.max(...filtered.map(e => e.id)); // 合并到 pending queue for (const event of filtered) { const key = `event:${event.type_name}:${event.id}`; pending.set(key, { name: `event:${event.type_name}`, previousValue: null, currentValue: event.payload, firstDetectedAt: Date.now(), lastDetectedAt: Date.now(), changeCount: 1, events: [event], }); } } return { newLastSeenEventId, filtered }; } describe('event stream diff logic', () => { let pending: Map; beforeEach(() => { pending = new Map(); }); it('filters events by lastSeenEventId', () => { const events: OGraphEvent[] = [ { id: 1, type_hash: 'h1', type_name: 'task_created', payload: { id: 100 }, created_at: 1000 }, { id: 2, type_hash: 'h2', type_name: 'task_updated', payload: { id: 101 }, created_at: 2000 }, { id: 3, type_hash: 'h3', type_name: 'task_created', payload: { id: 102 }, created_at: 3000 }, ]; const { newLastSeenEventId, filtered } = runEventDiff(1, events, undefined, pending); expect(newLastSeenEventId).toBe(3); expect(filtered).toHaveLength(2); expect(filtered.map(e => e.id)).toEqual([2, 3]); expect(pending.size).toBe(2); }); it('filters events by eventTypes when configured', () => { const events: OGraphEvent[] = [ { id: 1, type_hash: 'h1', type_name: 'task_created', payload: { id: 100 }, created_at: 1000 }, { id: 2, type_hash: 'h2', type_name: 'task_updated', payload: { id: 101 }, created_at: 2000 }, { id: 3, type_hash: 'h3', type_name: 'user_login', payload: { user: 'alice' }, created_at: 3000 }, ]; const { filtered } = runEventDiff(0, events, ['task_created', 'task_updated'], pending); expect(filtered).toHaveLength(2); expect(filtered.map(e => e.type_name)).toEqual(['task_created', 'task_updated']); expect(pending.size).toBe(2); }); it('handles empty event list gracefully', () => { const { newLastSeenEventId, filtered } = runEventDiff(5, [], undefined, pending); expect(newLastSeenEventId).toBe(5); // unchanged expect(filtered).toHaveLength(0); expect(pending.size).toBe(0); }); it('creates pending entries with event metadata', () => { const events: OGraphEvent[] = [ { id: 10, type_hash: 'h1', type_name: 'task_created', payload: { id: 200, title: 'Test' }, created_at: 1000 }, ]; runEventDiff(0, events, undefined, pending); const entry = pending.get('event:task_created:10'); expect(entry).toBeDefined(); expect(entry!.name).toBe('event:task_created'); expect(entry!.currentValue).toEqual({ id: 200, title: 'Test' }); expect(entry!.previousValue).toBeNull(); expect(entry!.events).toHaveLength(1); expect(entry!.events![0].id).toBe(10); }); });