ograph/packages/dispatcher/test/ograph-client.test.ts
小墨 a3aa77eb31 feat: add event stream discovery mode for P0 Task system
- Add discovery config with agentId and optional eventTypes filtering
- Add fetchEvents method to OGraphClient for event stream access
- Update Watcher to support both projection and event stream modes
- Update Scheduler to format event stream messages appropriately
- Add comprehensive tests for event stream diff logic and API client
- Maintain backward compatibility with existing projection mode
- Support OGRAPH_AGENT_ID environment variable override

This enables P0 Task system to discover changes from event streams
without requiring Projection fixes (#19). Event stream mode activated
when config.discovery is present, otherwise falls back to projection mode.
2026-04-13 04:52:07 +00:00

155 lines
4.2 KiB
TypeScript

// Tests for OGraphClient event stream functionality
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
import { OGraphClient } from '../src/ograph-client.js';
import type { DispatcherConfig, OGraphEvent } from '../src/types.js';
// Mock fetch globally
const fetchMock = vi.fn();
global.fetch = fetchMock;
function makeConfig(): DispatcherConfig {
return {
ograph: {
endpoint: 'https://test.example.com',
token: 'test-token',
projections: []
},
oc: {
statusEndpoint: 'http://localhost',
statusToken: 'tok',
minAvailable: 2
},
intervals: {
watcherIdle: 30_000,
watcherActive: 5_000,
schedulerIdle: 60_000,
schedulerActive: 5_000,
cooldownAfterPush: 60_000,
},
};
}
describe('OGraphClient.fetchEvents', () => {
let client: OGraphClient;
let config: DispatcherConfig;
beforeEach(() => {
config = makeConfig();
client = new OGraphClient(config);
fetchMock.mockClear();
});
afterEach(() => {
vi.restoreAllMocks();
});
it('fetches events for a ref and returns them in ascending order', async () => {
const mockResponse = {
events: [
{ id: 3, type_hash: 'h3', type_name: 'task_created', payload: { id: 102 }, created_at: 3000 },
{ id: 2, type_hash: 'h2', type_name: 'task_created', payload: { id: 101 }, created_at: 2000 },
{ id: 1, type_hash: 'h1', type_name: 'task_updated', payload: { id: 100 }, created_at: 1000 },
],
total: 3,
};
fetchMock.mockResolvedValueOnce({
ok: true,
headers: { get: () => 'application/json' },
json: () => Promise.resolve(mockResponse),
});
const result = await client.fetchEvents(123);
expect(fetchMock).toHaveBeenCalledWith(
'https://test.example.com/events?ref=123',
{
headers: {
'Content-Type': 'application/json',
'Authorization': 'Bearer test-token',
},
}
);
expect(result).toHaveLength(3);
// Should be reversed to ascending order by id
expect(result.map(e => e.id)).toEqual([1, 2, 3]);
expect(result[0]).toEqual({
id: 1,
type_hash: 'h1',
type_name: 'task_updated',
payload: { id: 100 },
created_at: 1000
});
});
it('includes afterId parameter when provided', async () => {
fetchMock.mockResolvedValueOnce({
ok: true,
headers: { get: () => 'application/json' },
json: () => Promise.resolve({ events: [], total: 0 }),
});
await client.fetchEvents(456, 10);
expect(fetchMock).toHaveBeenCalledWith(
'https://test.example.com/events?ref=456&after=10',
expect.any(Object)
);
});
it('omits Authorization header when token is not configured', async () => {
config.ograph.token = undefined;
client = new OGraphClient(config);
fetchMock.mockResolvedValueOnce({
ok: true,
headers: { get: () => 'application/json' },
json: () => Promise.resolve({ events: [], total: 0 }),
});
await client.fetchEvents(789);
expect(fetchMock).toHaveBeenCalledWith(
'https://test.example.com/events?ref=789',
{
headers: {
'Content-Type': 'application/json',
},
}
);
});
it('handles empty events array', async () => {
fetchMock.mockResolvedValueOnce({
ok: true,
headers: { get: () => 'application/json' },
json: () => Promise.resolve({ events: [], total: 0 }),
});
const result = await client.fetchEvents(999);
expect(result).toEqual([]);
});
it('throws error on HTTP failure', async () => {
fetchMock.mockResolvedValueOnce({
ok: false,
status: 500,
headers: { get: () => 'application/json' },
json: () => Promise.resolve({ error: 'Internal server error' }),
});
await expect(client.fetchEvents(123)).rejects.toThrow('Internal server error');
});
it('throws error on non-JSON response', async () => {
fetchMock.mockResolvedValueOnce({
ok: true,
headers: { get: () => 'text/plain' },
text: () => Promise.resolve('Not JSON'),
});
await expect(client.fetchEvents(123)).rejects.toThrow('Non-JSON response for events');
});
});