feat: GET /events?after=N incremental query support (#26)

- Engine: findEventsByRef accepts optional afterId parameter
- SQL: WHERE id > ? with ASC ordering for after queries
- Route: GET /events?after=N, combinable with ?ref=X
- CLI: events find --after <id> option
- Tests: 2 new tests (engine 105 total)

Closes #26
小橘 🍊(NEKO Team)
This commit is contained in:
小橘 2026-04-13 08:09:09 +00:00
parent 752f765132
commit a19b144de7
5 changed files with 106 additions and 7 deletions

View File

@ -233,8 +233,12 @@ export class OGraphClient {
return this.request<OEvent>(`/events/${id}`)
}
async findEventsByRef(ref: number): Promise<OEvent[]> {
const res = await this.request<{ events: OEvent[] }>(`/events?ref=${ref}`)
async findEventsByRef(ref?: number, after?: number): Promise<OEvent[]> {
const params = new URLSearchParams()
if (ref !== undefined) params.set('ref', String(ref))
if (after !== undefined) params.set('after', String(after))
const qs = params.toString()
const res = await this.request<{ events: OEvent[] }>(`/events${qs ? '?' + qs : ''}`)
return res.events
}

View File

@ -77,13 +77,21 @@ export function createEventsCommand(): Command {
// find
const find = new Command('find')
find.description('Find events by object reference')
find.requiredOption('--ref <object-id>', 'Object ID to find related events')
find.option('--ref <object-id>', 'Object ID to find related events')
find.option('--after <event-id>', 'Only return events with id > this value (incremental query)')
find.option('--json', 'output raw JSON')
find.action(async (opts: { ref: string; json?: boolean }) => {
find.action(async (opts: { ref?: string; after?: string; json?: boolean }) => {
const client = new OGraphClient()
try {
await client.init()
const events = await client.findEventsByRef(parseInt(opts.ref, 10))
const ref = opts.ref ? parseInt(opts.ref, 10) : undefined
const after = opts.after ? parseInt(opts.after, 10) : undefined
if (!ref && !after) {
fail('At least one of --ref or --after is required')
process.exit(1)
return
}
const events = await client.findEventsByRef(ref, after)
if (opts.json) {
console.log(JSON.stringify(events, null, 2))
return

View File

@ -340,9 +340,29 @@ export async function findEventsByRef(
refId?: number,
limit = 50,
offset = 0,
afterId?: number,
): Promise<{ events: Event[]; total: number }> {
let countQuery, dataQuery
if (refId !== undefined) {
if (refId !== undefined && afterId !== undefined) {
countQuery = db
.prepare(
`SELECT COUNT(DISTINCT e.id) as count
FROM events e
JOIN event_refs er ON e.id = er.event_id
WHERE er.ref_id = ? AND e.id > ?`,
)
.bind(refId, afterId)
dataQuery = db
.prepare(
`SELECT DISTINCT e.id, e.type_hash, e.payload, e.created_at
FROM events e
JOIN event_refs er ON e.id = er.event_id
WHERE er.ref_id = ? AND e.id > ?
ORDER BY e.id ASC
LIMIT ? OFFSET ?`,
)
.bind(refId, afterId, limit, offset)
} else if (refId !== undefined) {
countQuery = db
.prepare(
`SELECT COUNT(DISTINCT e.id) as count
@ -361,6 +381,13 @@ export async function findEventsByRef(
LIMIT ? OFFSET ?`,
)
.bind(refId, limit, offset)
} else if (afterId !== undefined) {
countQuery = db
.prepare('SELECT COUNT(*) as count FROM events WHERE id > ?')
.bind(afterId)
dataQuery = db
.prepare('SELECT id, type_hash, payload, created_at FROM events WHERE id > ? ORDER BY id ASC LIMIT ? OFFSET ?')
.bind(afterId, limit, offset)
} else {
countQuery = db.prepare('SELECT COUNT(*) as count FROM events')
dataQuery = db

View File

@ -1440,6 +1440,64 @@ describe('Error Cases: Events', () => {
})
})
describe('Incremental Query: GET /events?after=N', () => {
beforeEach(async () => {
await app.fetch(req('POST', '/object-defs', { name: 'item', schema: {} }), { DB: db, API_TOKEN: API_TOKEN })
const schema = { properties: { subject: { type: 'ref' as const, object_type: 'item' }, note: { type: 'string' as const } } }
await app.fetch(req('POST', '/event-defs', { name: 'item_noted', schema }), { DB: db, API_TOKEN: API_TOKEN })
})
it('GET /events?after=N returns only events with id > N', async () => {
const objRes = await app.fetch(req('POST', '/objects', { type: 'item' }), { DB: db, API_TOKEN: API_TOKEN })
const itemId = ((await objRes.json()) as any).id
// Emit 3 events
const eventIds: number[] = []
for (let i = 0; i < 3; i++) {
const r = await app.fetch(
req('POST', '/events', { type: 'item_noted', payload: { subject: itemId, note: `note${i}` } }),
{ DB: db, API_TOKEN: API_TOKEN },
)
const j = (await r.json()) as any
eventIds.push(j.event.id)
}
// Query after first event
const res = await app.fetch(req('GET', `/events?after=${eventIds[0]}`), { DB: db, API_TOKEN: API_TOKEN })
expect(res.status).toBe(200)
const json = (await res.json()) as any
expect(json.total).toBe(2)
expect(json.events.length).toBe(2)
// Should be ASC order
expect(json.events[0].id).toBe(eventIds[1])
expect(json.events[1].id).toBe(eventIds[2])
})
it('GET /events?ref=X&after=N returns only ref events with id > N', async () => {
const objRes = await app.fetch(req('POST', '/objects', { type: 'item' }), { DB: db, API_TOKEN: API_TOKEN })
const itemId = ((await objRes.json()) as any).id
// Emit 3 events for this item
const eventIds: number[] = []
for (let i = 0; i < 3; i++) {
const r = await app.fetch(
req('POST', '/events', { type: 'item_noted', payload: { subject: itemId, note: `v${i}` } }),
{ DB: db, API_TOKEN: API_TOKEN },
)
const j = (await r.json()) as any
eventIds.push(j.event.id)
}
// Query ref + after
const res = await app.fetch(req('GET', `/events?ref=${itemId}&after=${eventIds[0]}`), { DB: db, API_TOKEN: API_TOKEN })
expect(res.status).toBe(200)
const json = (await res.json()) as any
expect(json.total).toBe(2)
expect(json.events.length).toBe(2)
expect(json.events[0].id).toBe(eventIds[1])
})
})
describe('Error Cases: Projection Defs', () => {
beforeEach(async () => {
const schema = { properties: { field: { type: 'string' as const } } }

View File

@ -296,13 +296,15 @@ app.get('/events/:id', async (c) => {
app.get('/events', async (c) => {
const refParam = c.req.query('ref')
const refId = refParam ? parseInt(refParam, 10) : undefined
const afterParam = c.req.query('after')
const afterId = afterParam ? parseInt(afterParam, 10) : undefined
const limitParam = c.req.query('limit')
const offsetParam = c.req.query('offset')
const limit = Math.min(parseInt(limitParam || '50', 10), 200)
const offset = parseInt(offsetParam || '0', 10)
const result = await findEventsByRef(c.env.DB, refId, limit, offset)
const result = await findEventsByRef(c.env.DB, refId, limit, offset, afterId)
return c.json(result)
})