/** * OGraph Gateway + Engine (unified Worker) * * Route classification: * - PUBLIC: GET /health — no auth required * - EXTERNAL: POST /events — API key (Bearer) or API_TOKEN * - ADMIN: Everything else — API_TOKEN only * - INTERNAL: Reaction handler execution — engine internal only */ import { Hono } from 'hono' import { cors } from 'hono/cors' import { bearerAuth } from './auth' import UI_HTML from './ui.html' import { createObjectDef, listObjectDefs, getObjectDef, createObject, getObject, listObjects, createEventDef, listEventDefs, getEventDef, createEvent, getEvent, findEventsByRef, createProjectionDef, listProjectionDefs, getProjectionDef, getProjection, createReaction, listReactions, getReaction, deleteReaction, listReactionLogs, createApiKey, listApiKeys, deleteApiKey, validateApiKey, getSchema, ValidationError, } from './engine' import type { CreateObjectDefRequest, CreateObjectRequest, CreateEventDefRequest, CreateEventRequest, CreateProjectionDefRequest, CreateReactionRequest, CreateApiKeyRequest, ReactionPayload, } from './types' import { ErrorCode } from './types' type Bindings = { DB: D1Database API_TOKEN: string } type Variables = { apiKeyId: number | null apiKeyName: string | null } const app = new Hono<{ Bindings: Bindings; Variables: Variables }>() function apiError(c: any, status: number, code: string, message: string) { return c.json({ error: { code, message } }, status) } app.use('*', cors()) app.use('*', async (c, next) => { const start = Date.now() await next() const duration = Date.now() - start const path = new URL(c.req.url).pathname if (path === '/health' || path.startsWith('/ui')) return try { const SEVEN_DAYS_MS = 7 * 24 * 60 * 60 * 1000 const cutoff = Date.now() - SEVEN_DAYS_MS c.executionCtx.waitUntil( Promise.all([ c.env.DB.prepare( 'INSERT INTO request_logs (method, path, api_key_id, api_key_name, status_code, error, duration_ms, created_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?)', ) .bind( c.req.method, path, c.get('apiKeyId') || null, c.get('apiKeyName') || null, c.res.status, c.res.status >= 400 ? await c.res .clone() .text() .catch(() => null) : null, duration, Date.now(), ) .run(), // Cleanup logs older than 7 days c.env.DB.prepare('DELETE FROM request_logs WHERE created_at < ?').bind(cutoff).run(), c.env.DB.prepare('DELETE FROM reaction_logs WHERE created_at < ?').bind(cutoff).run(), ]), ) } catch { // executionCtx not available in test, skip } }) // ============================================ // UI (no auth, served before auth middleware) // ============================================ app.get('/favicon.ico', (c) => { return new Response(null, { status: 204 }) }) app.get('/ui', (c) => { return c.html(UI_HTML) }) app.get('/ui/*', (c) => { return c.html(UI_HTML) }) // ============================================ // Health // ============================================ app.get('/health', (c) => { return c.json({ status: 'ok', version: '2.4.0' }) }) // ============================================ // Schema (self-describing, no auth) // ============================================ app.get('/schema', async (c) => { const schema = await getSchema(c.env.DB) return c.json(schema) }) // Auth middleware: API_TOKEN for admin ops, API Key allowed for reads + POST /events // Read-only paths that API Keys can access const API_KEY_READABLE_PREFIXES = ['/events', '/objects', '/projections', '/event-defs', '/projection-defs', '/object-defs'] app.use('*', async (c, next) => { if (c.req.path === '/health' || c.req.path === '/schema' || c.req.path === '/favicon.ico' || c.req.path.startsWith('/ui')) return next() if (c.req.method === 'POST' && c.req.path === '/events') return next() const authHeader = c.req.header('Authorization') const bearerToken = authHeader?.startsWith('Bearer ') ? authHeader.slice(7) : null if (!bearerToken) { return apiError(c, 401, ErrorCode.UNAUTHORIZED, 'Missing or invalid Authorization header') } // Admin token: full access if (bearerToken === c.env.API_TOKEN) return next() // API Key: read-only access to data endpoints if (c.req.method === 'GET') { const isReadable = API_KEY_READABLE_PREFIXES.some(p => c.req.path.startsWith(p)) if (isReadable) { const result = await validateApiKey(c.env.DB, bearerToken) if (result.valid) { if (result.apiKey) { c.set('apiKeyId', result.apiKey.id) c.set('apiKeyName', result.apiKey.name) } return next() } } } // Check if it's a valid API Key trying admin ops → 403 const keyCheck = await validateApiKey(c.env.DB, bearerToken) if (keyCheck.valid) { return apiError(c, 403, ErrorCode.FORBIDDEN, 'API key cannot perform admin operations') } // Invalid token entirely → 401 return apiError(c, 401, ErrorCode.UNAUTHORIZED, 'Invalid or unauthorized token') }) // ============================================ // Object Defs // ============================================ app.post('/object-defs', async (c) => { try { const body = await c.req.json() if (!body.name) return apiError(c, 400, ErrorCode.MISSING_FIELD, 'Missing name') const result = await createObjectDef(c.env.DB, body.name) return c.json(result, 201) } catch (err: any) { if (err?.name === 'ValidationError') return apiError(c, 400, ErrorCode.INVALID_INPUT, err.message); return apiError(c, 500, ErrorCode.INTERNAL_ERROR, err.message || 'Internal error') } }) app.get('/object-defs/:name', async (c) => { const name = c.req.param('name') const def = await getObjectDef(c.env.DB, name) if (!def) return apiError(c, 404, ErrorCode.NOT_FOUND, `Object def '${name}' not found`) return c.json(def) }) app.get('/object-defs', async (c) => { const defs = await listObjectDefs(c.env.DB) return c.json({ object_defs: defs, total: defs.length }) }) // ============================================ // Objects // ============================================ app.post('/objects', async (c) => { try { const body = await c.req.json() if (!body.type) return apiError(c, 400, ErrorCode.MISSING_FIELD, 'Missing type') const obj = await createObject(c.env.DB, body.type) return c.json(obj, 201) } catch (err: any) { if (err?.name === 'ValidationError') return apiError(c, 400, ErrorCode.INVALID_INPUT, err.message); return apiError(c, 500, ErrorCode.INTERNAL_ERROR, err.message || 'Internal error') } }) app.get('/objects/:id', async (c) => { const id = parseInt(c.req.param('id'), 10) if (isNaN(id)) return apiError(c, 400, ErrorCode.INVALID_INPUT, 'Invalid id') const obj = await getObject(c.env.DB, id) if (!obj) return apiError(c, 404, ErrorCode.NOT_FOUND, 'Not found') return c.json(obj) }) app.get('/objects', async (c) => { const type = c.req.query('type') const limitParam = c.req.query('limit') const offsetParam = c.req.query('offset') const limit = Math.min(parseInt(limitParam || '50', 10), 200) const offset = parseInt(offsetParam || '0', 10) const result = await listObjects(c.env.DB, type, limit, offset) return c.json(result) }) // ============================================ // Event Defs // ============================================ app.post('/event-defs', async (c) => { try { const body = await c.req.json() if (!body.name || !body.schema) return apiError(c, 400, ErrorCode.MISSING_FIELD, 'Missing name or schema') const result = await createEventDef(c.env.DB, body.name, body.schema) return c.json(result, 201) } catch (err: any) { if (err?.name === 'ValidationError') return apiError(c, 400, ErrorCode.INVALID_INPUT, err.message); return apiError(c, 500, ErrorCode.INTERNAL_ERROR, err.message || 'Internal error') } }) app.get('/event-defs/:name', async (c) => { const name = c.req.param('name') const def = await getEventDef(c.env.DB, name) if (!def) return apiError(c, 404, ErrorCode.NOT_FOUND, `Event def '${name}' not found`) return c.json(def) }) app.get('/event-defs', async (c) => { const defs = await listEventDefs(c.env.DB) return c.json({ event_defs: defs, total: defs.length }) }) // ============================================ // Events // ============================================ app.post('/events', async (c) => { // Dual auth: check if this request already passed API_TOKEN auth. // If not (i.e., Bearer token is not the API_TOKEN), validate as API key. const authHeader = c.req.header('Authorization') const bearerToken = authHeader?.startsWith('Bearer ') ? authHeader.slice(7) : null if (!bearerToken) { return apiError(c, 401, ErrorCode.UNAUTHORIZED, 'Missing or invalid Authorization header') } let body: CreateEventRequest try { body = await c.req.json() } catch { return apiError(c, 400, ErrorCode.INVALID_JSON, 'Invalid JSON body') } if (!body.type || !body.payload) return apiError(c, 400, ErrorCode.MISSING_FIELD, 'Missing type or payload') // If the token is not API_TOKEN, treat it as an API key if (bearerToken !== c.env.API_TOKEN) { const result = await validateApiKey(c.env.DB, bearerToken, body.type) if (!result.valid) { if (result.error === 'event_not_allowed') { return apiError(c, 403, ErrorCode.FORBIDDEN, 'Event type not allowed for this API key') } return apiError(c, 401, ErrorCode.UNAUTHORIZED, 'Invalid API key') } if (result.apiKey) { c.set('apiKeyId', result.apiKey.id) c.set('apiKeyName', result.apiKey.name) } } try { const { event, reactions_fired, reaction_results } = await createEvent(c.env.DB, body.type, body.payload) // Fire-and-forget webhook POSTs for reactions (if any) if (reaction_results.length > 0) { try { c.executionCtx.waitUntil(fireReactionWebhooks(c.env.DB, reaction_results)) } catch { // executionCtx not available in test environment, skip webhook firing } } return c.json({ event, reactions_fired, reaction_results }, 201) } catch (err: any) { if (err?.name === 'ValidationError') { return apiError(c, 400, ErrorCode.INVALID_INPUT, err.message) } if (err?.name === 'ValidationError') return apiError(c, 400, ErrorCode.INVALID_INPUT, err.message); return apiError(c, 500, ErrorCode.INTERNAL_ERROR, err.message || 'Internal error') } }) app.get('/events/:id', async (c) => { const id = parseInt(c.req.param('id'), 10) if (isNaN(id)) return apiError(c, 400, ErrorCode.INVALID_INPUT, 'Invalid id') const event = await getEvent(c.env.DB, id) if (!event) return apiError(c, 404, ErrorCode.NOT_FOUND, 'Not found') return c.json(event) }) app.get('/events', async (c) => { const refParam = c.req.query('ref') const refId = refParam ? parseInt(refParam, 10) : undefined const afterParam = c.req.query('after') const afterId = afterParam ? parseInt(afterParam, 10) : undefined const limitParam = c.req.query('limit') const offsetParam = c.req.query('offset') const limit = Math.min(parseInt(limitParam || '50', 10), 200) const offset = parseInt(offsetParam || '0', 10) const result = await findEventsByRef(c.env.DB, refId, limit, offset, afterId) return c.json(result) }) // ============================================ // Projection Defs // ============================================ app.post('/projection-defs', async (c) => { try { const body = await c.req.json() if (!body.name || !body.value_schema || body.initial_value === undefined) { return apiError(c, 400, ErrorCode.MISSING_FIELD, 'Missing name, value_schema, or initial_value') } if (!body.sources || !Array.isArray(body.sources) || body.sources.length === 0) { return apiError(c, 400, ErrorCode.MISSING_FIELD, 'Missing or empty sources array') } const result = await createProjectionDef( c.env.DB, body.name, body.sources, body.params, body.value_schema, body.initial_value, ) return c.json(result, 201) } catch (err: any) { if (err?.name === 'ValidationError') return apiError(c, 400, ErrorCode.INVALID_INPUT, err.message); return apiError(c, 500, ErrorCode.INTERNAL_ERROR, err.message || 'Internal error') } }) app.get('/projection-defs/:name', async (c) => { const name = c.req.param('name') const def = await getProjectionDef(c.env.DB, name) if (!def) return apiError(c, 404, ErrorCode.NOT_FOUND, `Projection def '${name}' not found`) return c.json(def) }) app.get('/projection-defs', async (c) => { const defs = await listProjectionDefs(c.env.DB) return c.json({ projection_defs: defs, total: defs.length }) }) // ============================================ // Projections // ============================================ app.get('/projections/:name', async (c) => { try { const name = c.req.param('name') const rawParams = c.req.queries() const params: Record = {} for (const [key, values] of Object.entries(rawParams)) { params[key] = values[0] // take first value } const result = await getProjection(c.env.DB, name, params) const response: Record = { value: result.value } if (result.status === 'errored') { response._status = 'errored' response._error = result.error } return c.json(response) } catch (err: any) { if (err?.name === 'ValidationError') return apiError(c, 400, ErrorCode.INVALID_INPUT, err.message); return apiError(c, 500, ErrorCode.INTERNAL_ERROR, err.message || 'Internal error') } }) // ============================================ // Reactions // ============================================ app.post('/reactions', async (c) => { try { const body = await c.req.json() const action = body.action || 'webhook' if (!body.projection_def || !body.params) { return apiError(c, 400, ErrorCode.MISSING_FIELD, 'Missing projection_def or params') } if (action === 'webhook' && !body.webhook_url) { return apiError(c, 400, ErrorCode.MISSING_FIELD, 'webhook_url is required when action is webhook') } if (action === 'emit_event' && !body.emit_event_type) { return apiError(c, 400, ErrorCode.MISSING_FIELD, 'emit_event_type is required when action is emit_event') } if (action === 'handler' && !body.handler_code) { return apiError(c, 400, ErrorCode.MISSING_FIELD, 'handler_code is required when action is handler') } const reaction = await createReaction(c.env.DB, body.projection_def, body.params, { action, webhook_url: body.webhook_url, emit_event_type: body.emit_event_type, emit_payload_template: body.emit_payload_template, handler_code: body.handler_code, handler_timeout_ms: body.handler_timeout_ms, }) return c.json(reaction, 201) } catch (err: any) { if (err?.name === 'ValidationError') return apiError(c, 400, ErrorCode.INVALID_INPUT, err.message); return apiError(c, 500, ErrorCode.INTERNAL_ERROR, err.message || 'Internal error') } }) app.get('/reactions/:id', async (c) => { const id = parseInt(c.req.param('id'), 10) if (isNaN(id)) return apiError(c, 400, ErrorCode.INVALID_INPUT, 'Invalid id') const reaction = await getReaction(c.env.DB, id) if (!reaction) return apiError(c, 404, ErrorCode.NOT_FOUND, `Reaction ${id} not found`) return c.json(reaction) }) app.get('/reactions', async (c) => { const limitParam = c.req.query('limit') const offsetParam = c.req.query('offset') const limit = Math.min(parseInt(limitParam || '50', 10), 200) const offset = parseInt(offsetParam || '0', 10) const result = await listReactions(c.env.DB, limit, offset) return c.json(result) }) app.delete('/reactions/:id', async (c) => { const id = parseInt(c.req.param('id'), 10) await deleteReaction(c.env.DB, id) return c.json({ deleted: id }) }) // ============================================ // Reaction Logs // ============================================ app.get('/reaction-logs', async (c) => { const limit = parseInt(c.req.query('limit') || '50', 10) const offset = parseInt(c.req.query('offset') || '0', 10) const reactionId = c.req.query('reaction_id') ? parseInt(c.req.query('reaction_id')!, 10) : undefined const result = await listReactionLogs(c.env.DB, limit, offset, reactionId) return c.json(result) }) // ============================================ // Request Logs // ============================================ app.get('/request-logs', async (c) => { const db = c.env.DB const limit = parseInt(c.req.query('limit') || '50', 10) const offset = parseInt(c.req.query('offset') || '0', 10) const apiKeyId = c.req.query('api_key_id') ? parseInt(c.req.query('api_key_id')!, 10) : undefined let query = 'SELECT * FROM request_logs' const binds: any[] = [] if (apiKeyId !== undefined) { query += ' WHERE api_key_id = ?' binds.push(apiKeyId) } query += ' ORDER BY id DESC LIMIT ? OFFSET ?' binds.push(limit, offset) let countQuery = 'SELECT COUNT(*) as total FROM request_logs' if (apiKeyId !== undefined) { countQuery += ' WHERE api_key_id = ?' } const [rows, countRow] = await Promise.all([ db .prepare(query) .bind(...binds) .all(), db .prepare(countQuery) .bind(...(apiKeyId !== undefined ? [apiKeyId] : [])) .first<{ total: number }>(), ]) return c.json({ request_logs: rows.results, total: countRow?.total || 0 }) }) // ============================================ // API Keys // ============================================ app.post('/api-keys', async (c) => { try { const body = await c.req.json() if (!body.name) return apiError(c, 400, ErrorCode.MISSING_FIELD, 'Missing name') // Role is always 'ingest' — admin/readonly not implemented const result = await createApiKey(c.env.DB, body.name, body.allowed_events, body.rate_limit) return c.json(result, 201) } catch (err: any) { if (err?.name === 'ValidationError') return apiError(c, 400, ErrorCode.INVALID_INPUT, err.message); return apiError(c, 500, ErrorCode.INTERNAL_ERROR, err.message || 'Internal error') } }) app.get('/api-keys', async (c) => { const limitParam = c.req.query('limit') const offsetParam = c.req.query('offset') const limit = Math.min(parseInt(limitParam || '50', 10), 200) const offset = parseInt(offsetParam || '0', 10) const result = await listApiKeys(c.env.DB, limit, offset) return c.json(result) }) app.delete('/api-keys/:id', async (c) => { const id = parseInt(c.req.param('id'), 10) await deleteApiKey(c.env.DB, id) return c.json({ deleted: id }) }) // ============================================ // Helper: Fire Reaction Webhooks // ============================================ async function fireReactionWebhooks(db: D1Database, payloads: ReactionPayload[]): Promise { for (const payload of payloads) { try { // Look up webhook URL for this reaction const reaction = await db .prepare('SELECT webhook_url FROM reactions WHERE id = ?') .bind(payload.reaction_id) .first<{ webhook_url: string }>() if (!reaction) continue await fetch(reaction.webhook_url, { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify(payload), }) } catch { // Ignore webhook errors } } } export default app