feat(engine): projection health status — errored projections stop updating (#20)

This commit is contained in:
小橘 2026-04-13 02:59:01 +00:00
parent 21e159ffd5
commit 98bee0aedd
4 changed files with 346 additions and 25 deletions

View File

@ -0,0 +1,3 @@
ALTER TABLE projections ADD COLUMN status TEXT NOT NULL DEFAULT 'healthy';
ALTER TABLE projections ADD COLUMN error_message TEXT;
ALTER TABLE projections ADD COLUMN error_at INTEGER;

View File

@ -671,7 +671,13 @@ function buildEventsQuery(
return { sql, binds } return { sql, binds }
} }
export async function getProjection(db: D1Database, defName: string, params: Record<string, any>): Promise<any> { export interface ProjectionResult {
value: any
status?: 'healthy' | 'errored'
error?: string
}
export async function getProjection(db: D1Database, defName: string, params: Record<string, any>): Promise<ProjectionResult> {
const defHash = await resolveProjectionDefName(db, defName) const defHash = await resolveProjectionDefName(db, defName)
if (!defHash) throw new Error(`Projection def ${defName} not found`) if (!defHash) throw new Error(`Projection def ${defName} not found`)
@ -698,16 +704,20 @@ export async function getProjection(db: D1Database, defName: string, params: Rec
})) }))
const cached = await db const cached = await db
.prepare('SELECT value, last_event_id FROM projections WHERE def_hash = ? AND params_hash = ?') .prepare('SELECT value, last_event_id, status, error_message FROM projections WHERE def_hash = ? AND params_hash = ?')
.bind(defHash, paramsHash) .bind(defHash, paramsHash)
.first<{ value: string; last_event_id: number }>() .first<{ value: string; last_event_id: number; status: string; error_message: string | null }>()
let baseState: any let baseState: any
let sinceEventId: number let sinceEventId: number
let currentStatus: string = 'healthy'
let currentError: string | null = null
if (cached) { if (cached) {
baseState = JSON.parse(cached.value) baseState = JSON.parse(cached.value)
sinceEventId = cached.last_event_id sinceEventId = cached.last_event_id
currentStatus = cached.status || 'healthy'
currentError = cached.error_message || null
} else { } else {
baseState = initialValue baseState = initialValue
sinceEventId = 0 sinceEventId = 0
@ -736,14 +746,19 @@ export async function getProjection(db: D1Database, defName: string, params: Rec
const now = Date.now() const now = Date.now()
await db await db
.prepare( .prepare(
`INSERT INTO projections (def_hash, params_hash, params, value, last_event_id, created_at) `INSERT INTO projections (def_hash, params_hash, params, value, last_event_id, status, created_at)
VALUES (?, ?, ?, ?, ?, ?) VALUES (?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(def_hash, params_hash) DO UPDATE SET value = excluded.value, last_event_id = excluded.last_event_id`, ON CONFLICT(def_hash, params_hash) DO UPDATE SET value = excluded.value, last_event_id = excluded.last_event_id`,
) )
.bind(defHash, paramsHash, JSON.stringify(params), JSON.stringify(baseState), 0, now) .bind(defHash, paramsHash, JSON.stringify(params), JSON.stringify(baseState), 0, 'healthy', now)
.run() .run()
} }
return baseState const result: ProjectionResult = { value: baseState }
if (currentStatus === 'errored') {
result.status = 'errored'
result.error = currentError || undefined
}
return result
} }
let state = baseState let state = baseState
@ -752,26 +767,74 @@ export async function getProjection(db: D1Database, defName: string, params: Rec
if (!source) continue if (!source) continue
const eventContext = buildEventContext(event) const eventContext = buildEventContext(event)
const expr = jsonata(source.expression) let evalResult: any
let evalError: string | null = null
try { try {
state = await expr.evaluate({ state, event: eventContext, params }) const expr = jsonata(source.expression)
} catch (err) { evalResult = await expr.evaluate({ state, event: eventContext, params })
throw new Error(`Projection expression eval failed: ${err}`) } catch (err: any) {
evalError = err?.message || String(err)
} }
if (evalError !== null || evalResult === undefined || evalResult === null) {
// Mark projection as errored — return baseState, do NOT update last_event_id
const errorMsg = evalError || `Expression returned ${evalResult === undefined ? 'undefined' : 'null'}`
const now = Date.now()
await db
.prepare(
`INSERT INTO projections (def_hash, params_hash, params, value, last_event_id, status, error_message, error_at, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(def_hash, params_hash) DO UPDATE SET status = excluded.status, error_message = excluded.error_message, error_at = excluded.error_at`,
)
.bind(defHash, paramsHash, JSON.stringify(params), JSON.stringify(baseState), sinceEventId, 'errored', errorMsg, now, now)
.run()
return { value: baseState, status: 'errored', error: errorMsg }
}
state = evalResult
}
// Guard: if state ended up undefined/null after loop, treat as error
if (state === undefined || state === null) {
const errorMsg = `Final state is ${state === undefined ? 'undefined' : 'null'}`
const now = Date.now()
await db
.prepare(
`INSERT INTO projections (def_hash, params_hash, params, value, last_event_id, status, error_message, error_at, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(def_hash, params_hash) DO UPDATE SET status = excluded.status, error_message = excluded.error_message, error_at = excluded.error_at`,
)
.bind(defHash, paramsHash, JSON.stringify(params), JSON.stringify(baseState), sinceEventId, 'errored', errorMsg, now, now)
.run()
return { value: baseState, status: 'errored', error: errorMsg }
} }
const lastEventId = allEvents[allEvents.length - 1].id const lastEventId = allEvents[allEvents.length - 1].id
const now = Date.now() const now = Date.now()
await db
.prepare(
`INSERT INTO projections (def_hash, params_hash, params, value, last_event_id, created_at)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT(def_hash, params_hash) DO UPDATE SET value = excluded.value, last_event_id = excluded.last_event_id`,
)
.bind(defHash, paramsHash, JSON.stringify(params), JSON.stringify(state), lastEventId, now)
.run()
return state // If previously errored and now succeeds, recover to healthy
if (currentStatus === 'errored') {
await db
.prepare(
`INSERT INTO projections (def_hash, params_hash, params, value, last_event_id, status, error_message, error_at, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(def_hash, params_hash) DO UPDATE SET value = excluded.value, last_event_id = excluded.last_event_id, status = excluded.status, error_message = excluded.error_message, error_at = excluded.error_at`,
)
.bind(defHash, paramsHash, JSON.stringify(params), JSON.stringify(state), lastEventId, 'healthy', null, null, now)
.run()
} else {
await db
.prepare(
`INSERT INTO projections (def_hash, params_hash, params, value, last_event_id, status, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(def_hash, params_hash) DO UPDATE SET value = excluded.value, last_event_id = excluded.last_event_id`,
)
.bind(defHash, paramsHash, JSON.stringify(params), JSON.stringify(state), lastEventId, 'healthy', now)
.run()
}
return { value: state }
} }
// ============================================ // ============================================
@ -1110,15 +1173,28 @@ async function _triggerReactionChainInner(
.bind(row.projection_hash) .bind(row.projection_hash)
.first<{ initial_value: string }>() .first<{ initial_value: string }>()
const cachedRow = await db const cachedRow = await db
.prepare('SELECT value FROM projections WHERE def_hash = ? AND params_hash = ?') .prepare('SELECT value, status FROM projections WHERE def_hash = ? AND params_hash = ?')
.bind(row.projection_hash, paramsHash) .bind(row.projection_hash, paramsHash)
.first<{ value: string }>() .first<{ value: string; status: string }>()
// Skip errored projections — do not compute diff, do not fire reaction
if (cachedRow && cachedRow.status === 'errored') {
continue
}
const oldValue = cachedRow ? JSON.parse(cachedRow.value) : defRow ? JSON.parse(defRow.initial_value) : null const oldValue = cachedRow ? JSON.parse(cachedRow.value) : defRow ? JSON.parse(defRow.initial_value) : null
const startTime = Date.now() const startTime = Date.now()
const newValue = await getProjection(db, nameRow.name, params) const projectionResult = await getProjection(db, nameRow.name, params)
const durationMs = Date.now() - startTime const durationMs = Date.now() - startTime
// If projection just became errored, skip reaction firing
if (projectionResult.status === 'errored') {
continue
}
const newValue = projectionResult.value
if (JSON.stringify(oldValue) !== JSON.stringify(newValue)) { if (JSON.stringify(oldValue) !== JSON.stringify(newValue)) {
const reactionAction = reactionRow.action || 'webhook' const reactionAction = reactionRow.action || 'webhook'

View File

@ -2304,3 +2304,240 @@ describe('Request Logs', () => {
expect(allJson.request_logs.length).toBe(2) expect(allJson.request_logs.length).toBe(2)
}) })
}) })
// ============================================
// Projection Health (#20)
// ============================================
describe('Projection Health', () => {
let agentId: number
let taskId: number
beforeEach(async () => {
await app.fetch(req('POST', '/object-defs', { name: 'agent' }), { DB: db, API_TOKEN: API_TOKEN })
await app.fetch(req('POST', '/object-defs', { name: 'task' }), { DB: db, API_TOKEN: API_TOKEN })
const agentRes = await app.fetch(req('POST', '/objects', { type: 'agent' }), { DB: db, API_TOKEN: API_TOKEN })
agentId = (await agentRes.json()).id
const taskRes = await app.fetch(req('POST', '/objects', { type: 'task' }), { DB: db, API_TOKEN: API_TOKEN })
taskId = (await taskRes.json()).id
const schema = {
properties: {
participant: { type: 'ref' as const, object_type: 'agent' },
subject: { type: 'ref' as const, object_type: 'task' },
},
}
await app.fetch(req('POST', '/event-defs', { name: 'task_assigned', schema }), { DB: db, API_TOKEN: API_TOKEN })
})
it('expression returning undefined → projection marked errored, returns initial_value', async () => {
// Create projection with expression that returns undefined (accessing non-existent field)
const projDef = {
name: 'bad_projection',
sources: [
{
event_def: 'task_assigned',
bindings: { subject: '$task_id' },
expression: 'event.nonexistent_field',
},
],
params: { task_id: { type: 'ref' } },
value_schema: { type: 'string' },
initial_value: 'default_value',
}
await app.fetch(req('POST', '/projection-defs', projDef), { DB: db, API_TOKEN: API_TOKEN })
// Emit an event
await app.fetch(
req('POST', '/events', { type: 'task_assigned', payload: { participant: agentId, subject: taskId } }),
{ DB: db, API_TOKEN: API_TOKEN },
)
// Query the projection — should return initial_value + errored status
const res = await app.fetch(req('GET', `/projections/bad_projection?task_id=${taskId}`), {
DB: db,
API_TOKEN: API_TOKEN,
})
expect(res.status).toBe(200)
const json = await res.json()
expect(json.value).toBe('default_value')
expect(json._status).toBe('errored')
expect(json._error).toBeDefined()
})
it('expression that throws → projection marked errored, returns initial_value', async () => {
// Create projection with expression that will throw (invalid JSONata)
const projDef = {
name: 'throw_projection',
sources: [
{
event_def: 'task_assigned',
bindings: { subject: '$task_id' },
expression: '$unknown_function()',
},
],
params: { task_id: { type: 'ref' } },
value_schema: { type: 'number' },
initial_value: 0,
}
await app.fetch(req('POST', '/projection-defs', projDef), { DB: db, API_TOKEN: API_TOKEN })
// Emit an event
await app.fetch(
req('POST', '/events', { type: 'task_assigned', payload: { participant: agentId, subject: taskId } }),
{ DB: db, API_TOKEN: API_TOKEN },
)
// Query the projection — should return initial_value + errored status
const res = await app.fetch(req('GET', `/projections/throw_projection?task_id=${taskId}`), {
DB: db,
API_TOKEN: API_TOKEN,
})
expect(res.status).toBe(200)
const json = await res.json()
expect(json.value).toBe(0)
expect(json._status).toBe('errored')
expect(json._error).toBeDefined()
})
it('subsequent query on errored projection still returns initial_value', async () => {
const projDef = {
name: 'bad_projection',
sources: [
{
event_def: 'task_assigned',
bindings: { subject: '$task_id' },
expression: 'event.nonexistent_field',
},
],
params: { task_id: { type: 'ref' } },
value_schema: { type: 'string' },
initial_value: 'default_value',
}
await app.fetch(req('POST', '/projection-defs', projDef), { DB: db, API_TOKEN: API_TOKEN })
// Emit event
await app.fetch(
req('POST', '/events', { type: 'task_assigned', payload: { participant: agentId, subject: taskId } }),
{ DB: db, API_TOKEN: API_TOKEN },
)
// First query marks as errored
await app.fetch(req('GET', `/projections/bad_projection?task_id=${taskId}`), {
DB: db,
API_TOKEN: API_TOKEN,
})
// Second query — should still return initial_value
const res2 = await app.fetch(req('GET', `/projections/bad_projection?task_id=${taskId}`), {
DB: db,
API_TOKEN: API_TOKEN,
})
expect(res2.status).toBe(200)
const json2 = await res2.json()
expect(json2.value).toBe('default_value')
expect(json2._status).toBe('errored')
})
it('fix expression → projection recovers to healthy', async () => {
// Create broken projection
const projDef = {
name: 'recoverable_projection',
sources: [
{
event_def: 'task_assigned',
bindings: { subject: '$task_id' },
expression: 'event.nonexistent_field',
},
],
params: { task_id: { type: 'ref' } },
value_schema: { type: 'ref' },
initial_value: '',
}
await app.fetch(req('POST', '/projection-defs', projDef), { DB: db, API_TOKEN: API_TOKEN })
// Emit event
await app.fetch(
req('POST', '/events', { type: 'task_assigned', payload: { participant: agentId, subject: taskId } }),
{ DB: db, API_TOKEN: API_TOKEN },
)
// Query — marked errored
const res1 = await app.fetch(req('GET', `/projections/recoverable_projection?task_id=${taskId}`), {
DB: db,
API_TOKEN: API_TOKEN,
})
const json1 = await res1.json()
expect(json1._status).toBe('errored')
// Fix projection def with correct expression
const fixedProjDef = {
name: 'recoverable_projection',
sources: [
{
event_def: 'task_assigned',
bindings: { subject: '$task_id' },
expression: 'event.participant',
},
],
params: { task_id: { type: 'ref' } },
value_schema: { type: 'ref' },
initial_value: '',
}
await app.fetch(req('POST', '/projection-defs', fixedProjDef), { DB: db, API_TOKEN: API_TOKEN })
// Query again — should recover to healthy with correct value
const res2 = await app.fetch(req('GET', `/projections/recoverable_projection?task_id=${taskId}`), {
DB: db,
API_TOKEN: API_TOKEN,
})
expect(res2.status).toBe(200)
const json2 = await res2.json()
expect(json2.value).toBe(agentId)
expect(json2._status).toBeUndefined()
expect(json2._error).toBeUndefined()
})
it('errored projection does not trigger reactions', async () => {
// Create projection with bad expression
const projDef = {
name: 'bad_reaction_proj',
sources: [
{
event_def: 'task_assigned',
bindings: { subject: '$task_id' },
expression: 'event.nonexistent_field',
},
],
params: { task_id: { type: 'ref' } },
value_schema: { type: 'string' },
initial_value: 'default_value',
}
await app.fetch(req('POST', '/projection-defs', projDef), { DB: db, API_TOKEN: API_TOKEN })
// Create a reaction watching this projection
await app.fetch(
req('POST', '/reactions', {
projection_def: 'bad_reaction_proj',
params: { task_id: taskId },
webhook_url: 'https://hook.example.com',
}),
{ DB: db, API_TOKEN: API_TOKEN },
)
// First emit — this will make the projection errored during reaction chain
const res1 = await app.fetch(
req('POST', '/events', { type: 'task_assigned', payload: { participant: agentId, subject: taskId } }),
{ DB: db, API_TOKEN: API_TOKEN },
)
const json1 = await res1.json()
expect(json1.reactions_fired).toBe(0)
// Second emit — projection should still be errored, no reaction
const res2 = await app.fetch(
req('POST', '/events', { type: 'task_assigned', payload: { participant: agentId, subject: taskId } }),
{ DB: db, API_TOKEN: API_TOKEN },
)
const json2 = await res2.json()
expect(json2.reactions_fired).toBe(0)
})
})

View File

@ -357,8 +357,13 @@ app.get('/projections/:name', async (c) => {
for (const [key, values] of Object.entries(rawParams)) { for (const [key, values] of Object.entries(rawParams)) {
params[key] = values[0] // take first value params[key] = values[0] // take first value
} }
const value = await getProjection(c.env.DB, name, params) const result = await getProjection(c.env.DB, name, params)
return c.json({ value }) const response: Record<string, any> = { value: result.value }
if (result.status === 'errored') {
response._status = 'errored'
response._error = result.error
}
return c.json(response)
} catch (err: any) { } catch (err: any) {
return apiError(c, 500, ErrorCode.INTERNAL_ERROR, err.message || 'Internal error') return apiError(c, 500, ErrorCode.INTERNAL_ERROR, err.message || 'Internal error')
} }