diff --git a/packages/cli/src/client.ts b/packages/cli/src/client.ts index 789a936..a0710a6 100644 --- a/packages/cli/src/client.ts +++ b/packages/cli/src/client.ts @@ -233,8 +233,12 @@ export class OGraphClient { return this.request(`/events/${id}`) } - async findEventsByRef(ref: number): Promise { - const res = await this.request<{ events: OEvent[] }>(`/events?ref=${ref}`) + async findEventsByRef(ref?: number, after?: number): Promise { + const params = new URLSearchParams() + if (ref !== undefined) params.set('ref', String(ref)) + if (after !== undefined) params.set('after', String(after)) + const qs = params.toString() + const res = await this.request<{ events: OEvent[] }>(`/events${qs ? '?' + qs : ''}`) return res.events } diff --git a/packages/cli/src/commands/events.ts b/packages/cli/src/commands/events.ts index a4b6a98..77dc74e 100644 --- a/packages/cli/src/commands/events.ts +++ b/packages/cli/src/commands/events.ts @@ -77,13 +77,21 @@ export function createEventsCommand(): Command { // find const find = new Command('find') find.description('Find events by object reference') - find.requiredOption('--ref ', 'Object ID to find related events') + find.option('--ref ', 'Object ID to find related events') + find.option('--after ', 'Only return events with id > this value (incremental query)') find.option('--json', 'output raw JSON') - find.action(async (opts: { ref: string; json?: boolean }) => { + find.action(async (opts: { ref?: string; after?: string; json?: boolean }) => { const client = new OGraphClient() try { await client.init() - const events = await client.findEventsByRef(parseInt(opts.ref, 10)) + const ref = opts.ref ? parseInt(opts.ref, 10) : undefined + const after = opts.after ? parseInt(opts.after, 10) : undefined + if (!ref && !after) { + fail('At least one of --ref or --after is required') + process.exit(1) + return + } + const events = await client.findEventsByRef(ref, after) if (opts.json) { console.log(JSON.stringify(events, null, 2)) return diff --git a/packages/engine/src/engine.ts b/packages/engine/src/engine.ts index 490a10d..aaf5a19 100644 --- a/packages/engine/src/engine.ts +++ b/packages/engine/src/engine.ts @@ -340,9 +340,29 @@ export async function findEventsByRef( refId?: number, limit = 50, offset = 0, + afterId?: number, ): Promise<{ events: Event[]; total: number }> { let countQuery, dataQuery - if (refId !== undefined) { + if (refId !== undefined && afterId !== undefined) { + countQuery = db + .prepare( + `SELECT COUNT(DISTINCT e.id) as count + FROM events e + JOIN event_refs er ON e.id = er.event_id + WHERE er.ref_id = ? AND e.id > ?`, + ) + .bind(refId, afterId) + dataQuery = db + .prepare( + `SELECT DISTINCT e.id, e.type_hash, e.payload, e.created_at + FROM events e + JOIN event_refs er ON e.id = er.event_id + WHERE er.ref_id = ? AND e.id > ? + ORDER BY e.id ASC + LIMIT ? OFFSET ?`, + ) + .bind(refId, afterId, limit, offset) + } else if (refId !== undefined) { countQuery = db .prepare( `SELECT COUNT(DISTINCT e.id) as count @@ -361,6 +381,13 @@ export async function findEventsByRef( LIMIT ? OFFSET ?`, ) .bind(refId, limit, offset) + } else if (afterId !== undefined) { + countQuery = db + .prepare('SELECT COUNT(*) as count FROM events WHERE id > ?') + .bind(afterId) + dataQuery = db + .prepare('SELECT id, type_hash, payload, created_at FROM events WHERE id > ? ORDER BY id ASC LIMIT ? OFFSET ?') + .bind(afterId, limit, offset) } else { countQuery = db.prepare('SELECT COUNT(*) as count FROM events') dataQuery = db diff --git a/packages/engine/src/index.test.ts b/packages/engine/src/index.test.ts index d4621ad..ca67b82 100644 --- a/packages/engine/src/index.test.ts +++ b/packages/engine/src/index.test.ts @@ -1440,6 +1440,64 @@ describe('Error Cases: Events', () => { }) }) +describe('Incremental Query: GET /events?after=N', () => { + beforeEach(async () => { + await app.fetch(req('POST', '/object-defs', { name: 'item', schema: {} }), { DB: db, API_TOKEN: API_TOKEN }) + const schema = { properties: { subject: { type: 'ref' as const, object_type: 'item' }, note: { type: 'string' as const } } } + await app.fetch(req('POST', '/event-defs', { name: 'item_noted', schema }), { DB: db, API_TOKEN: API_TOKEN }) + }) + + it('GET /events?after=N returns only events with id > N', async () => { + const objRes = await app.fetch(req('POST', '/objects', { type: 'item' }), { DB: db, API_TOKEN: API_TOKEN }) + const itemId = ((await objRes.json()) as any).id + + // Emit 3 events + const eventIds: number[] = [] + for (let i = 0; i < 3; i++) { + const r = await app.fetch( + req('POST', '/events', { type: 'item_noted', payload: { subject: itemId, note: `note${i}` } }), + { DB: db, API_TOKEN: API_TOKEN }, + ) + const j = (await r.json()) as any + eventIds.push(j.event.id) + } + + // Query after first event + const res = await app.fetch(req('GET', `/events?after=${eventIds[0]}`), { DB: db, API_TOKEN: API_TOKEN }) + expect(res.status).toBe(200) + const json = (await res.json()) as any + expect(json.total).toBe(2) + expect(json.events.length).toBe(2) + // Should be ASC order + expect(json.events[0].id).toBe(eventIds[1]) + expect(json.events[1].id).toBe(eventIds[2]) + }) + + it('GET /events?ref=X&after=N returns only ref events with id > N', async () => { + const objRes = await app.fetch(req('POST', '/objects', { type: 'item' }), { DB: db, API_TOKEN: API_TOKEN }) + const itemId = ((await objRes.json()) as any).id + + // Emit 3 events for this item + const eventIds: number[] = [] + for (let i = 0; i < 3; i++) { + const r = await app.fetch( + req('POST', '/events', { type: 'item_noted', payload: { subject: itemId, note: `v${i}` } }), + { DB: db, API_TOKEN: API_TOKEN }, + ) + const j = (await r.json()) as any + eventIds.push(j.event.id) + } + + // Query ref + after + const res = await app.fetch(req('GET', `/events?ref=${itemId}&after=${eventIds[0]}`), { DB: db, API_TOKEN: API_TOKEN }) + expect(res.status).toBe(200) + const json = (await res.json()) as any + expect(json.total).toBe(2) + expect(json.events.length).toBe(2) + expect(json.events[0].id).toBe(eventIds[1]) + }) +}) + describe('Error Cases: Projection Defs', () => { beforeEach(async () => { const schema = { properties: { field: { type: 'string' as const } } } diff --git a/packages/engine/src/index.ts b/packages/engine/src/index.ts index 8afc369..63cdef6 100644 --- a/packages/engine/src/index.ts +++ b/packages/engine/src/index.ts @@ -296,13 +296,15 @@ app.get('/events/:id', async (c) => { app.get('/events', async (c) => { const refParam = c.req.query('ref') const refId = refParam ? parseInt(refParam, 10) : undefined + const afterParam = c.req.query('after') + const afterId = afterParam ? parseInt(afterParam, 10) : undefined const limitParam = c.req.query('limit') const offsetParam = c.req.query('offset') const limit = Math.min(parseInt(limitParam || '50', 10), 200) const offset = parseInt(offsetParam || '0', 10) - const result = await findEventsByRef(c.env.DB, refId, limit, offset) + const result = await findEventsByRef(c.env.DB, refId, limit, offset, afterId) return c.json(result) })