From 98bee0aeddca5f73ecf08dd79162af0f3b406785 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B0=8F=E6=A9=98?= Date: Mon, 13 Apr 2026 02:59:01 +0000 Subject: [PATCH] =?UTF-8?q?feat(engine):=20projection=20health=20status=20?= =?UTF-8?q?=E2=80=94=20errored=20projections=20stop=20updating=20(#20)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../migrations/0022_projection_health.sql | 3 + packages/engine/src/engine.ts | 122 +++++++-- packages/engine/src/index.test.ts | 237 ++++++++++++++++++ packages/engine/src/index.ts | 9 +- 4 files changed, 346 insertions(+), 25 deletions(-) create mode 100644 packages/engine/migrations/0022_projection_health.sql diff --git a/packages/engine/migrations/0022_projection_health.sql b/packages/engine/migrations/0022_projection_health.sql new file mode 100644 index 0000000..f84e343 --- /dev/null +++ b/packages/engine/migrations/0022_projection_health.sql @@ -0,0 +1,3 @@ +ALTER TABLE projections ADD COLUMN status TEXT NOT NULL DEFAULT 'healthy'; +ALTER TABLE projections ADD COLUMN error_message TEXT; +ALTER TABLE projections ADD COLUMN error_at INTEGER; diff --git a/packages/engine/src/engine.ts b/packages/engine/src/engine.ts index c66993d..490a10d 100644 --- a/packages/engine/src/engine.ts +++ b/packages/engine/src/engine.ts @@ -671,7 +671,13 @@ function buildEventsQuery( return { sql, binds } } -export async function getProjection(db: D1Database, defName: string, params: Record): Promise { +export interface ProjectionResult { + value: any + status?: 'healthy' | 'errored' + error?: string +} + +export async function getProjection(db: D1Database, defName: string, params: Record): Promise { const defHash = await resolveProjectionDefName(db, defName) if (!defHash) throw new Error(`Projection def ${defName} not found`) @@ -698,16 +704,20 @@ export async function getProjection(db: D1Database, defName: string, params: Rec })) const cached = await db - .prepare('SELECT value, last_event_id FROM projections WHERE def_hash = ? AND params_hash = ?') + .prepare('SELECT value, last_event_id, status, error_message FROM projections WHERE def_hash = ? AND params_hash = ?') .bind(defHash, paramsHash) - .first<{ value: string; last_event_id: number }>() + .first<{ value: string; last_event_id: number; status: string; error_message: string | null }>() let baseState: any let sinceEventId: number + let currentStatus: string = 'healthy' + let currentError: string | null = null if (cached) { baseState = JSON.parse(cached.value) sinceEventId = cached.last_event_id + currentStatus = cached.status || 'healthy' + currentError = cached.error_message || null } else { baseState = initialValue sinceEventId = 0 @@ -736,14 +746,19 @@ export async function getProjection(db: D1Database, defName: string, params: Rec const now = Date.now() await db .prepare( - `INSERT INTO projections (def_hash, params_hash, params, value, last_event_id, created_at) - VALUES (?, ?, ?, ?, ?, ?) + `INSERT INTO projections (def_hash, params_hash, params, value, last_event_id, status, created_at) + VALUES (?, ?, ?, ?, ?, ?, ?) ON CONFLICT(def_hash, params_hash) DO UPDATE SET value = excluded.value, last_event_id = excluded.last_event_id`, ) - .bind(defHash, paramsHash, JSON.stringify(params), JSON.stringify(baseState), 0, now) + .bind(defHash, paramsHash, JSON.stringify(params), JSON.stringify(baseState), 0, 'healthy', now) .run() } - return baseState + const result: ProjectionResult = { value: baseState } + if (currentStatus === 'errored') { + result.status = 'errored' + result.error = currentError || undefined + } + return result } let state = baseState @@ -752,26 +767,74 @@ export async function getProjection(db: D1Database, defName: string, params: Rec if (!source) continue const eventContext = buildEventContext(event) - const expr = jsonata(source.expression) + let evalResult: any + let evalError: string | null = null + try { - state = await expr.evaluate({ state, event: eventContext, params }) - } catch (err) { - throw new Error(`Projection expression eval failed: ${err}`) + const expr = jsonata(source.expression) + evalResult = await expr.evaluate({ state, event: eventContext, params }) + } catch (err: any) { + evalError = err?.message || String(err) } + + if (evalError !== null || evalResult === undefined || evalResult === null) { + // Mark projection as errored — return baseState, do NOT update last_event_id + const errorMsg = evalError || `Expression returned ${evalResult === undefined ? 'undefined' : 'null'}` + const now = Date.now() + await db + .prepare( + `INSERT INTO projections (def_hash, params_hash, params, value, last_event_id, status, error_message, error_at, created_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(def_hash, params_hash) DO UPDATE SET status = excluded.status, error_message = excluded.error_message, error_at = excluded.error_at`, + ) + .bind(defHash, paramsHash, JSON.stringify(params), JSON.stringify(baseState), sinceEventId, 'errored', errorMsg, now, now) + .run() + return { value: baseState, status: 'errored', error: errorMsg } + } + + state = evalResult + } + + // Guard: if state ended up undefined/null after loop, treat as error + if (state === undefined || state === null) { + const errorMsg = `Final state is ${state === undefined ? 'undefined' : 'null'}` + const now = Date.now() + await db + .prepare( + `INSERT INTO projections (def_hash, params_hash, params, value, last_event_id, status, error_message, error_at, created_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(def_hash, params_hash) DO UPDATE SET status = excluded.status, error_message = excluded.error_message, error_at = excluded.error_at`, + ) + .bind(defHash, paramsHash, JSON.stringify(params), JSON.stringify(baseState), sinceEventId, 'errored', errorMsg, now, now) + .run() + return { value: baseState, status: 'errored', error: errorMsg } } const lastEventId = allEvents[allEvents.length - 1].id const now = Date.now() - await db - .prepare( - `INSERT INTO projections (def_hash, params_hash, params, value, last_event_id, created_at) - VALUES (?, ?, ?, ?, ?, ?) - ON CONFLICT(def_hash, params_hash) DO UPDATE SET value = excluded.value, last_event_id = excluded.last_event_id`, - ) - .bind(defHash, paramsHash, JSON.stringify(params), JSON.stringify(state), lastEventId, now) - .run() - return state + // If previously errored and now succeeds, recover to healthy + if (currentStatus === 'errored') { + await db + .prepare( + `INSERT INTO projections (def_hash, params_hash, params, value, last_event_id, status, error_message, error_at, created_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(def_hash, params_hash) DO UPDATE SET value = excluded.value, last_event_id = excluded.last_event_id, status = excluded.status, error_message = excluded.error_message, error_at = excluded.error_at`, + ) + .bind(defHash, paramsHash, JSON.stringify(params), JSON.stringify(state), lastEventId, 'healthy', null, null, now) + .run() + } else { + await db + .prepare( + `INSERT INTO projections (def_hash, params_hash, params, value, last_event_id, status, created_at) + VALUES (?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(def_hash, params_hash) DO UPDATE SET value = excluded.value, last_event_id = excluded.last_event_id`, + ) + .bind(defHash, paramsHash, JSON.stringify(params), JSON.stringify(state), lastEventId, 'healthy', now) + .run() + } + + return { value: state } } // ============================================ @@ -1110,15 +1173,28 @@ async function _triggerReactionChainInner( .bind(row.projection_hash) .first<{ initial_value: string }>() const cachedRow = await db - .prepare('SELECT value FROM projections WHERE def_hash = ? AND params_hash = ?') + .prepare('SELECT value, status FROM projections WHERE def_hash = ? AND params_hash = ?') .bind(row.projection_hash, paramsHash) - .first<{ value: string }>() + .first<{ value: string; status: string }>() + + // Skip errored projections — do not compute diff, do not fire reaction + if (cachedRow && cachedRow.status === 'errored') { + continue + } + const oldValue = cachedRow ? JSON.parse(cachedRow.value) : defRow ? JSON.parse(defRow.initial_value) : null const startTime = Date.now() - const newValue = await getProjection(db, nameRow.name, params) + const projectionResult = await getProjection(db, nameRow.name, params) const durationMs = Date.now() - startTime + // If projection just became errored, skip reaction firing + if (projectionResult.status === 'errored') { + continue + } + + const newValue = projectionResult.value + if (JSON.stringify(oldValue) !== JSON.stringify(newValue)) { const reactionAction = reactionRow.action || 'webhook' diff --git a/packages/engine/src/index.test.ts b/packages/engine/src/index.test.ts index 3eeedf4..d4621ad 100644 --- a/packages/engine/src/index.test.ts +++ b/packages/engine/src/index.test.ts @@ -2304,3 +2304,240 @@ describe('Request Logs', () => { expect(allJson.request_logs.length).toBe(2) }) }) + +// ============================================ +// Projection Health (#20) +// ============================================ + +describe('Projection Health', () => { + let agentId: number + let taskId: number + + beforeEach(async () => { + await app.fetch(req('POST', '/object-defs', { name: 'agent' }), { DB: db, API_TOKEN: API_TOKEN }) + await app.fetch(req('POST', '/object-defs', { name: 'task' }), { DB: db, API_TOKEN: API_TOKEN }) + const agentRes = await app.fetch(req('POST', '/objects', { type: 'agent' }), { DB: db, API_TOKEN: API_TOKEN }) + agentId = (await agentRes.json()).id + const taskRes = await app.fetch(req('POST', '/objects', { type: 'task' }), { DB: db, API_TOKEN: API_TOKEN }) + taskId = (await taskRes.json()).id + const schema = { + properties: { + participant: { type: 'ref' as const, object_type: 'agent' }, + subject: { type: 'ref' as const, object_type: 'task' }, + }, + } + await app.fetch(req('POST', '/event-defs', { name: 'task_assigned', schema }), { DB: db, API_TOKEN: API_TOKEN }) + }) + + it('expression returning undefined → projection marked errored, returns initial_value', async () => { + // Create projection with expression that returns undefined (accessing non-existent field) + const projDef = { + name: 'bad_projection', + sources: [ + { + event_def: 'task_assigned', + bindings: { subject: '$task_id' }, + expression: 'event.nonexistent_field', + }, + ], + params: { task_id: { type: 'ref' } }, + value_schema: { type: 'string' }, + initial_value: 'default_value', + } + await app.fetch(req('POST', '/projection-defs', projDef), { DB: db, API_TOKEN: API_TOKEN }) + + // Emit an event + await app.fetch( + req('POST', '/events', { type: 'task_assigned', payload: { participant: agentId, subject: taskId } }), + { DB: db, API_TOKEN: API_TOKEN }, + ) + + // Query the projection — should return initial_value + errored status + const res = await app.fetch(req('GET', `/projections/bad_projection?task_id=${taskId}`), { + DB: db, + API_TOKEN: API_TOKEN, + }) + expect(res.status).toBe(200) + const json = await res.json() + expect(json.value).toBe('default_value') + expect(json._status).toBe('errored') + expect(json._error).toBeDefined() + }) + + it('expression that throws → projection marked errored, returns initial_value', async () => { + // Create projection with expression that will throw (invalid JSONata) + const projDef = { + name: 'throw_projection', + sources: [ + { + event_def: 'task_assigned', + bindings: { subject: '$task_id' }, + expression: '$unknown_function()', + }, + ], + params: { task_id: { type: 'ref' } }, + value_schema: { type: 'number' }, + initial_value: 0, + } + await app.fetch(req('POST', '/projection-defs', projDef), { DB: db, API_TOKEN: API_TOKEN }) + + // Emit an event + await app.fetch( + req('POST', '/events', { type: 'task_assigned', payload: { participant: agentId, subject: taskId } }), + { DB: db, API_TOKEN: API_TOKEN }, + ) + + // Query the projection — should return initial_value + errored status + const res = await app.fetch(req('GET', `/projections/throw_projection?task_id=${taskId}`), { + DB: db, + API_TOKEN: API_TOKEN, + }) + expect(res.status).toBe(200) + const json = await res.json() + expect(json.value).toBe(0) + expect(json._status).toBe('errored') + expect(json._error).toBeDefined() + }) + + it('subsequent query on errored projection still returns initial_value', async () => { + const projDef = { + name: 'bad_projection', + sources: [ + { + event_def: 'task_assigned', + bindings: { subject: '$task_id' }, + expression: 'event.nonexistent_field', + }, + ], + params: { task_id: { type: 'ref' } }, + value_schema: { type: 'string' }, + initial_value: 'default_value', + } + await app.fetch(req('POST', '/projection-defs', projDef), { DB: db, API_TOKEN: API_TOKEN }) + + // Emit event + await app.fetch( + req('POST', '/events', { type: 'task_assigned', payload: { participant: agentId, subject: taskId } }), + { DB: db, API_TOKEN: API_TOKEN }, + ) + + // First query marks as errored + await app.fetch(req('GET', `/projections/bad_projection?task_id=${taskId}`), { + DB: db, + API_TOKEN: API_TOKEN, + }) + + // Second query — should still return initial_value + const res2 = await app.fetch(req('GET', `/projections/bad_projection?task_id=${taskId}`), { + DB: db, + API_TOKEN: API_TOKEN, + }) + expect(res2.status).toBe(200) + const json2 = await res2.json() + expect(json2.value).toBe('default_value') + expect(json2._status).toBe('errored') + }) + + it('fix expression → projection recovers to healthy', async () => { + // Create broken projection + const projDef = { + name: 'recoverable_projection', + sources: [ + { + event_def: 'task_assigned', + bindings: { subject: '$task_id' }, + expression: 'event.nonexistent_field', + }, + ], + params: { task_id: { type: 'ref' } }, + value_schema: { type: 'ref' }, + initial_value: '', + } + await app.fetch(req('POST', '/projection-defs', projDef), { DB: db, API_TOKEN: API_TOKEN }) + + // Emit event + await app.fetch( + req('POST', '/events', { type: 'task_assigned', payload: { participant: agentId, subject: taskId } }), + { DB: db, API_TOKEN: API_TOKEN }, + ) + + // Query — marked errored + const res1 = await app.fetch(req('GET', `/projections/recoverable_projection?task_id=${taskId}`), { + DB: db, + API_TOKEN: API_TOKEN, + }) + const json1 = await res1.json() + expect(json1._status).toBe('errored') + + // Fix projection def with correct expression + const fixedProjDef = { + name: 'recoverable_projection', + sources: [ + { + event_def: 'task_assigned', + bindings: { subject: '$task_id' }, + expression: 'event.participant', + }, + ], + params: { task_id: { type: 'ref' } }, + value_schema: { type: 'ref' }, + initial_value: '', + } + await app.fetch(req('POST', '/projection-defs', fixedProjDef), { DB: db, API_TOKEN: API_TOKEN }) + + // Query again — should recover to healthy with correct value + const res2 = await app.fetch(req('GET', `/projections/recoverable_projection?task_id=${taskId}`), { + DB: db, + API_TOKEN: API_TOKEN, + }) + expect(res2.status).toBe(200) + const json2 = await res2.json() + expect(json2.value).toBe(agentId) + expect(json2._status).toBeUndefined() + expect(json2._error).toBeUndefined() + }) + + it('errored projection does not trigger reactions', async () => { + // Create projection with bad expression + const projDef = { + name: 'bad_reaction_proj', + sources: [ + { + event_def: 'task_assigned', + bindings: { subject: '$task_id' }, + expression: 'event.nonexistent_field', + }, + ], + params: { task_id: { type: 'ref' } }, + value_schema: { type: 'string' }, + initial_value: 'default_value', + } + await app.fetch(req('POST', '/projection-defs', projDef), { DB: db, API_TOKEN: API_TOKEN }) + + // Create a reaction watching this projection + await app.fetch( + req('POST', '/reactions', { + projection_def: 'bad_reaction_proj', + params: { task_id: taskId }, + webhook_url: 'https://hook.example.com', + }), + { DB: db, API_TOKEN: API_TOKEN }, + ) + + // First emit — this will make the projection errored during reaction chain + const res1 = await app.fetch( + req('POST', '/events', { type: 'task_assigned', payload: { participant: agentId, subject: taskId } }), + { DB: db, API_TOKEN: API_TOKEN }, + ) + const json1 = await res1.json() + expect(json1.reactions_fired).toBe(0) + + // Second emit — projection should still be errored, no reaction + const res2 = await app.fetch( + req('POST', '/events', { type: 'task_assigned', payload: { participant: agentId, subject: taskId } }), + { DB: db, API_TOKEN: API_TOKEN }, + ) + const json2 = await res2.json() + expect(json2.reactions_fired).toBe(0) + }) +}) diff --git a/packages/engine/src/index.ts b/packages/engine/src/index.ts index 76e4931..8afc369 100644 --- a/packages/engine/src/index.ts +++ b/packages/engine/src/index.ts @@ -357,8 +357,13 @@ app.get('/projections/:name', async (c) => { for (const [key, values] of Object.entries(rawParams)) { params[key] = values[0] // take first value } - const value = await getProjection(c.env.DB, name, params) - return c.json({ value }) + const result = await getProjection(c.env.DB, name, params) + const response: Record = { value: result.value } + if (result.status === 'errored') { + response._status = 'errored' + response._error = result.error + } + return c.json(response) } catch (err: any) { return apiError(c, 500, ErrorCode.INTERNAL_ERROR, err.message || 'Internal error') }