ograph/packages/dispatcher/test/watcher.test.ts
小橘 🍊 e82fe8eaba
feat: OGraph Dispatcher — dual-loop actor for task notification (#4 P0) (#17)
* feat: add packages/dispatcher — dual-loop OGraph projection watcher + OC scheduler

Adds a new Node.js daemon that:
- Loop A (ProjectionWatcher): polls OGraph projections, diffs against
  snapshot, merges changes into a pending queue.
  - Idle: 30s poll interval; active (changes detected): 5s
- Loop B (OcScheduler): polls OC session-status, pushes pending queue
  when OC has available slots (>= minAvailable).
  - Idle (no pending): 60s; active (pending): 5s
  - Cooldown of 60s after each push to avoid spam

Tech:
- TypeScript + esbuild (zero runtime external deps)
- Graceful error handling: each poll is independent try-catch, errors
  logged but never crash the process
- Config from ~/.config/ograph/dispatcher.json + env-var overrides
- OGRAPH_CONFIG_FILE env var for config path override
- Push via /tmp/ograph-dispatch.json + openclaw message send (best-effort)

Build: npm run build → dist/index.js
Run:   node dist/index.js

* fix: address PR #17 review — package name, tests, shell safety, first-run

---------

Co-authored-by: 小墨 <xiaomooo@shazhou.work>
2026-04-13 10:01:48 +08:00

156 lines
5.2 KiB
TypeScript

// Tests for watcher diff logic
// We test the core diffing behaviour against an inline copy of the watcher's
// diff logic (see runDiff below), without needing to spin up a real OGraph server.
import { describe, it, expect, vi, beforeEach } from 'vitest';
import type { DispatcherConfig, PendingEntry } from '../src/types.js';
// ── helpers ──────────────────────────────────────────────────────────────────
/**
 * Builds a DispatcherConfig fixture whose endpoints point at http://localhost.
 *
 * @param overrides Interval values that replace the fixture defaults; any
 *   interval not listed keeps its default.
 * @returns A fresh config object on every call.
 */
function makeConfig(overrides: Partial<DispatcherConfig['intervals']> = {}): DispatcherConfig {
  const ograph: DispatcherConfig['ograph'] = {
    endpoint: 'http://localhost',
    token: undefined,
    projections: [],
  };
  const oc: DispatcherConfig['oc'] = {
    statusEndpoint: 'http://localhost',
    statusToken: 'tok',
    minAvailable: 2,
  };
  const defaultIntervals: DispatcherConfig['intervals'] = {
    watcherIdle: 30_000,
    watcherActive: 5_000,
    schedulerIdle: 60_000,
    schedulerActive: 5_000,
    cooldownAfterPush: 60_000,
  };
  // Caller-supplied values are spread last so they win over the defaults.
  const intervals: DispatcherConfig['intervals'] = { ...defaultIntervals, ...overrides };
  return { ograph, oc, intervals };
}
// Minimal inline diff engine extracted from the watcher logic so we can unit-test
// it without I/O dependencies.
/**
 * Test-local diff step: compares `current` projection values against
 * `snapshot`, updates `snapshot` in place, and records every detected change
 * in `pending`.
 *
 * Rules:
 * - A name not yet present in `snapshot` is only initialised (no change reported).
 * - Values are compared via `JSON.stringify`, so two objects that differ only
 *   in key order are treated as different.
 * - A name present in `snapshot` but missing from `current` is reported once
 *   as a change to `undefined`; the snapshot keeps the key with an `undefined`
 *   value so the disappearance does not fire again on later runs.
 * - All changes for the same name (including a disappearance) are merged into
 *   one pending entry: `previousValue` and `firstDetectedAt` stay as first
 *   recorded, while `currentValue`, `lastDetectedAt` and `changeCount` are updated.
 *
 * @param snapshot Last known value per projection name; mutated in place.
 * @param current Freshly polled value per projection name; not mutated.
 * @param pending Queue of not-yet-delivered changes keyed by name; mutated in place.
 * @returns `changed` is true when at least one change was recorded in this run.
 */
function runDiff(
  snapshot: Map<string, unknown>,
  current: Map<string, unknown>,
  pending: Map<string, PendingEntry>,
): { changed: boolean } {
  let changed = false;
  // One timestamp per run so every entry touched by this run agrees.
  const now = Date.now();

  // Records a single change, merging into an existing pending entry if any.
  const recordChange = (name: string, previousValue: unknown, currentValue: unknown): void => {
    changed = true;
    snapshot.set(name, currentValue);
    const existing = pending.get(name);
    if (existing) {
      // Keep previousValue/firstDetectedAt from the first unflushed change.
      existing.currentValue = currentValue;
      existing.lastDetectedAt = now;
      existing.changeCount += 1;
      return;
    }
    pending.set(name, {
      name,
      previousValue,
      currentValue,
      firstDetectedAt: now,
      lastDetectedAt: now,
      changeCount: 1,
    });
  };

  for (const [name, value] of current) {
    if (!snapshot.has(name)) {
      // First sighting: initialise the snapshot, do not report a change.
      snapshot.set(name, value);
      continue;
    }
    const prev = snapshot.get(name);
    if (JSON.stringify(prev) !== JSON.stringify(value)) {
      recordChange(name, prev, value);
    }
  }

  // Projections that disappeared. An `undefined` snapshot value marks a
  // disappearance that has already been reported, so it is skipped.
  for (const [name, prev] of snapshot) {
    if (!current.has(name) && prev !== undefined) {
      recordChange(name, prev, undefined);
    }
  }

  return { changed };
}
// ── tests ─────────────────────────────────────────────────────────────────────
describe('watcher diff logic', () => {
  let snapshot: Map<string, unknown>;
  let pending: Map<string, PendingEntry>;

  // Every case starts from an empty snapshot and an empty pending queue.
  beforeEach(() => {
    snapshot = new Map();
    pending = new Map();
  });

  it('first run: initialises snapshot without reporting changes', () => {
    const result = runDiff(snapshot, new Map([['proj-a', { count: 1 }]]), pending);

    expect(result.changed).toBe(false);
    expect(pending.size).toBe(0);
    expect(snapshot.get('proj-a')).toEqual({ count: 1 });
  });

  it('second run: no diff when value unchanged', () => {
    const polled = new Map([['proj-a', { count: 1 }]]);

    // The first call seeds the snapshot; the second sees the same value again.
    runDiff(snapshot, polled, pending);
    const result = runDiff(snapshot, polled, pending);

    expect(result.changed).toBe(false);
    expect(pending.size).toBe(0);
  });

  it('detects a value change between runs', () => {
    const before = new Map([['proj-a', { count: 1 }]]);
    const after = new Map([['proj-a', { count: 2 }]]);

    runDiff(snapshot, before, pending); // seed
    const result = runDiff(snapshot, after, pending);

    expect(result.changed).toBe(true);
    expect(pending.has('proj-a')).toBe(true);
    const entry = pending.get('proj-a')!;
    expect(entry.previousValue).toEqual({ count: 1 });
    expect(entry.currentValue).toEqual({ count: 2 });
  });

  it('merges multiple changes into one pending entry', () => {
    // 1 seeds the snapshot; 2 and 3 are two successive changes.
    for (const value of [1, 2, 3]) {
      runDiff(snapshot, new Map([['proj-a', value]]), pending);
    }

    const entry = pending.get('proj-a')!;
    expect(entry.changeCount).toBe(2);
    expect(entry.currentValue).toBe(3);
    expect(entry.previousValue).toBe(1); // retained from the first change
  });

  it('detects a disappeared projection', () => {
    runDiff(snapshot, new Map([['proj-a', 42]]), pending); // seed
    const result = runDiff(snapshot, new Map(), pending); // proj-a is missing now

    expect(result.changed).toBe(true);
    expect(pending.get('proj-a')!.currentValue).toBeUndefined();
  });

  it('does not re-fire when a disappeared projection stays absent', () => {
    runDiff(snapshot, new Map([['proj-a', 42]]), pending); // seed
    runDiff(snapshot, new Map(), pending); // disappearance is reported here
    pending.clear();

    const result = runDiff(snapshot, new Map(), pending); // still missing

    expect(result.changed).toBe(false);
    expect(pending.size).toBe(0);
  });
});