feat: GuardProjection — event admission control (pulse#9)
CI / test (push) Has been cancelled

- Add guard-projection.ts: registerGuard, checkGuards, applyGuardUpdates, getGuardState
- Guards use JSONata check + transition expressions, evaluated as data context
- Kind pattern matching with * wildcard (e.g. *.__start__)
- Integrate into store.ts appendEvent/appendEvents with transaction safety
- Handle nested transactions (inTransaction check)
- Export types and functions from index.ts
- 7 tests covering lifecycle, rejection, wildcards, complex expressions
This commit is contained in:
2026-04-18 13:34:48 +00:00
parent f59e3e7cb5
commit 6f4886db6a
4 changed files with 639 additions and 44 deletions
+251
View File
@@ -0,0 +1,251 @@
import { afterEach, expect, test } from 'bun:test';
import { mkdtempSync, rmSync } from 'node:fs';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import {
clearGuardExpressionCache,
getGuardState,
GuardViolationError,
matchEventKindPattern,
registerGuard,
} from './guard-projection.js';
import { createScopedStore } from './store.js';
function mkScoped() {
const root = mkdtempSync(join(tmpdir(), 'pulse-guard-'));
const scoped = createScopedStore({
basePath: join(root, 'scopes'),
objectsDir: join(root, 'objects'),
});
return { root, scoped, db: scoped.scopeDatabase('g'), store: scoped.scope('g') };
}
let last: ReturnType<typeof mkScoped> | null = null;
afterEach(() => {
clearGuardExpressionCache();
if (last) {
try {
void last.scoped.close();
} catch {
/* ignore */
}
rmSync(last.root, { recursive: true, force: true });
last = null;
}
});
test('register guard + legal event passes', async () => {
last = mkScoped();
const { db, store } = last;
registerGuard(db, {
name: 'task-lifecycle',
initial_value: { phase: 'idle' },
sources: [
{
kind: 'task.created',
check: '(event.kind != "task.created") or (state.phase = "idle")',
transition: '{ "phase": "open" }',
},
],
});
const r = await store.appendEvent({
kind: 'task.created',
key: 't1',
occurredAt: 100,
});
expect(r.id).toBeGreaterThan(0);
expect(getGuardState(db, 'task-lifecycle', 't1')).toEqual({ phase: 'open' });
});
test('illegal event rejected when task already open', async () => {
last = mkScoped();
const { db, store } = last;
registerGuard(db, {
name: 'task-lifecycle',
initial_value: { phase: 'idle' },
sources: [
{
kind: 'task.created',
check: '(event.kind != "task.created") or (state.phase = "idle")',
transition: '{ "phase": "open" }',
},
],
});
await store.appendEvent({
kind: 'task.created',
key: 't1',
occurredAt: 1,
});
expect(async () => {
await store.appendEvent({
kind: 'task.created',
key: 't1',
occurredAt: 2,
});
}).toThrow(GuardViolationError);
});
test('state transitions created → claimed → completed', async () => {
last = mkScoped();
const { db, store } = last;
registerGuard(db, {
name: 'task-lifecycle',
initial_value: { phase: 'idle' },
sources: [
{
kind: 'task.created',
check: '(event.kind != "task.created") or (state.phase = "idle")',
transition: '{ "phase": "open" }',
},
{
kind: 'task.claimed',
check: '(event.kind != "task.claimed") or (state.phase = "open")',
transition: '{ "phase": "claimed" }',
},
{
kind: 'task.completed',
check: '(event.kind != "task.completed") or (state.phase = "claimed")',
transition: '{ "phase": "done" }',
},
],
});
await store.appendEvent({
kind: 'task.created',
key: 'k',
occurredAt: 10,
});
expect(getGuardState(db, 'task-lifecycle', 'k')).toEqual({ phase: 'open' });
await store.appendEvent({
kind: 'task.claimed',
key: 'k',
occurredAt: 20,
});
expect(getGuardState(db, 'task-lifecycle', 'k')).toEqual({ phase: 'claimed' });
await store.appendEvent({
kind: 'task.completed',
key: 'k',
occurredAt: 30,
});
expect(getGuardState(db, 'task-lifecycle', 'k')).toEqual({ phase: 'done' });
});
test('unrelated event kind passes without guard match', async () => {
last = mkScoped();
const { db, store } = last;
registerGuard(db, {
name: 'only-task',
initial_value: { n: 0 },
sources: [
{
kind: 'task.*',
check: 'true',
transition: '{ "n": state.n + 1 }',
},
],
});
await store.appendEvent({
kind: 'other.thing',
key: 'x',
occurredAt: 1,
});
expect(getGuardState(db, 'only-task', 'x')).toEqual({ n: 0 });
});
test('getGuardState reflects latest state', async () => {
last = mkScoped();
const { db, store } = last;
registerGuard(db, {
name: 'g',
initial_value: { v: 0 },
sources: [
{
kind: 'tick',
check: 'true',
transition: '{ "v": state.v + 1 }',
},
],
});
await store.appendEvent({ kind: 'tick', key: 'k', occurredAt: 1 });
expect(getGuardState(db, 'g', 'k')).toEqual({ v: 1 });
await store.appendEvent({ kind: 'tick', key: 'k', occurredAt: 2 });
expect(getGuardState(db, 'g', 'k')).toEqual({ v: 2 });
});
test('complex JSONata check (and / or / in)', async () => {
last = mkScoped();
const { db, store } = last;
registerGuard(db, {
name: 'complex',
initial_value: { mode: 'a' },
sources: [
{
kind: 'x',
check:
'(event.kind != "x") or ((state.mode = "a" or state.mode = "b") and event.payload.role in ["u", "v"])',
transition: '{ "mode": "b" }',
},
],
});
await store.appendEvent({
kind: 'x',
key: 'k',
occurredAt: 1,
meta: JSON.stringify({ role: 'u' }),
});
expect(getGuardState(db, 'complex', 'k')).toEqual({ mode: 'b' });
});
test('kind wildcard *.__start__ matches meta.__start__ and coding.__start__', async () => {
last = mkScoped();
const { db, store } = last;
expect(matchEventKindPattern('*.__start__', 'meta.__start__')).toBe(true);
expect(matchEventKindPattern('*.__start__', 'coding.__start__')).toBe(true);
expect(matchEventKindPattern('*.__start__', 'other')).toBe(false);
registerGuard(db, {
name: 'starts',
initial_value: { kinds: [] },
sources: [
{
kind: '*.__start__',
check: 'true',
transition:
'{ "kinds": $append(state.kinds ? state.kinds : [], event.kind) }',
},
],
});
await store.appendEvent({
kind: 'meta.__start__',
key: 't',
occurredAt: 1,
});
await store.appendEvent({
kind: 'coding.__start__',
key: 't',
occurredAt: 2,
});
expect(getGuardState(db, 'starts', 't')).toEqual({
kinds: ['meta.__start__', 'coding.__start__'],
});
});
+319
View File
@@ -0,0 +1,319 @@
/**
* Guard projections: synchronous fold + check on append (JSONata).
*/
import type { Database } from 'bun:sqlite';
import jsonata, { type Expression } from 'jsonata';
// ── Schema ────────────────────────────────────────────────────
export const GUARD_SCHEMA = `
CREATE TABLE IF NOT EXISTS guard_defs (
name TEXT PRIMARY KEY,
initial_value TEXT NOT NULL,
sources TEXT NOT NULL,
created_at INTEGER NOT NULL
);
CREATE TABLE IF NOT EXISTS guard_states (
guard_name TEXT NOT NULL,
key TEXT NOT NULL DEFAULT '',
value TEXT NOT NULL,
last_event_id INTEGER NOT NULL DEFAULT 0,
updated_at INTEGER NOT NULL,
PRIMARY KEY (guard_name, key)
);
`;
export function initGuardSchema(db: Database): void {
db.exec(GUARD_SCHEMA);
}
// ── Types ─────────────────────────────────────────────────────
export interface GuardSource {
kind: string;
key_prefix?: string;
check: string;
transition: string;
}
export interface GuardProjectionDef {
name: string;
initial_value: any;
sources: GuardSource[];
}
export class GuardViolationError extends Error {
constructor(
public guardName: string,
public eventKind: string,
public eventKey: string | undefined,
public reason: string,
) {
super(`Guard "${guardName}" rejected event ${eventKind}: ${reason}`);
this.name = 'GuardViolationError';
}
}
export interface GuardUpdate {
guardName: string;
key: string;
newState: any;
lastEventId: number;
}
// ── Expression cache ───────────────────────────────────────────
const exprCache = new Map<string, Expression>();
function expr(expressionStr: string): Expression {
let e = exprCache.get(expressionStr);
if (!e) {
e = jsonata(expressionStr);
exprCache.set(expressionStr, e);
}
return e;
}
/** @internal */
export function clearGuardExpressionCache(): void {
exprCache.clear();
guardDefMemory.clear();
}
// ── Kind pattern (* = one dot segment) ─────────────────────────
export function matchEventKindPattern(pattern: string, kind: string): boolean {
const ps = pattern.split('.');
const ks = kind.split('.');
if (ps.length !== ks.length) return false;
for (let i = 0; i < ps.length; i++) {
if (ps[i] === '*') continue;
if (ps[i] !== ks[i]) return false;
}
return true;
}
// ── DB helpers ─────────────────────────────────────────────────
const insertGuardDefStmt = (db: Database) =>
db.prepare(`
INSERT OR REPLACE INTO guard_defs (name, initial_value, sources, created_at)
VALUES (?, ?, ?, ?)
`);
const selectGuardDefsStmt = (db: Database) =>
db.prepare(`SELECT name, initial_value, sources FROM guard_defs`);
const selectGuardStateStmt = (db: Database) =>
db.prepare(`
SELECT value, last_event_id FROM guard_states
WHERE guard_name = ? AND key = ?
`);
const upsertGuardStateStmt = (db: Database) =>
db.prepare(`
INSERT INTO guard_states (guard_name, key, value, last_event_id, updated_at)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT(guard_name, key) DO UPDATE SET
value = excluded.value,
last_event_id = excluded.last_event_id,
updated_at = excluded.updated_at
`);
function listGuardDefs(db: Database): GuardProjectionDef[] {
const rows = selectGuardDefsStmt(db).all() as {
name: string;
initial_value: string;
sources: string;
}[];
const byName = new Map<string, GuardProjectionDef>();
for (const r of rows) {
byName.set(r.name, {
name: r.name,
initial_value: JSON.parse(r.initial_value) as unknown,
sources: JSON.parse(r.sources) as GuardSource[],
});
}
for (const [name, def] of guardDefMemory) {
byName.set(name, def);
}
return [...byName.values()];
}
function loadGuardRow(
db: Database,
guardName: string,
key: string,
): { value: any; lastEventId: number } | null {
const row = selectGuardStateStmt(db).get(guardName, key) as {
value: string;
last_event_id: number;
} | null;
if (!row) return null;
return {
value: JSON.parse(row.value) as unknown,
lastEventId: Number(row.last_event_id),
};
}
function eventBindings(
event: {
kind: string;
key?: string;
meta?: string;
occurredAt: number;
},
eventId: number,
) {
return {
id: eventId,
occurred_at: event.occurredAt,
kind: event.kind,
key: event.key,
payload: event.meta ? JSON.parse(event.meta) : {},
};
}
async function evalBool(
label: string,
expressionStr: string,
context: Record<string, unknown>,
): Promise<boolean> {
const e = expr(expressionStr);
const out = await e.evaluate(context, {});
if (typeof out === 'boolean') return out;
if (out === null || out === undefined) return false;
throw new Error(`${label} must return boolean, got ${typeof out}`);
}
/**
* Register guard (persist + memory cache of compiled defs for this process).
*/
const guardDefMemory = new Map<string, GuardProjectionDef>();
export function registerGuard(db: Database, def: GuardProjectionDef): void {
const now = Date.now();
insertGuardDefStmt(db).run(
def.name,
JSON.stringify(def.initial_value),
JSON.stringify(def.sources),
now,
);
guardDefMemory.set(def.name, def);
}
/**
* Read guard's current state (same idea as lazy projection read).
*/
export function getGuardState(db: Database, guardName: string, key: string): any {
const row = loadGuardRow(db, guardName, key);
if (!row) {
const defs = listGuardDefs(db);
const def = defs.find((d) => d.name === guardName);
return def ? def.initial_value : undefined;
}
return row.value;
}
/**
* Before append: match guards, fold, check. Returns pending state updates
* (lastEventId is a placeholder; caller sets to written event id).
*/
export async function checkGuards(
db: Database,
event: { kind: string; key?: string; meta?: string; occurredAt: number },
): Promise<{ updates: GuardUpdate[] }> {
const defs = listGuardDefs(db);
const updates: GuardUpdate[] = [];
const pendingEventId = 0;
for (const def of defs) {
const stateKey = event.key ?? '';
const row = loadGuardRow(db, def.name, stateKey);
const baseState = row ? row.value : def.initial_value;
let working = baseState;
let matchedAny = false;
for (const source of def.sources) {
if (!matchEventKindPattern(source.kind, event.kind)) continue;
if (source.key_prefix !== undefined) {
const k = event.key;
if (k === undefined || !k.startsWith(source.key_prefix)) continue;
}
matchedAny = true;
const ev = eventBindings(event, pendingEventId);
const ctx = { state: working, event: ev };
let newState: any;
try {
newState = await expr(source.transition).evaluate(ctx, {});
} catch (err: any) {
throw new GuardViolationError(
def.name,
event.kind,
event.key,
`transition failed: ${err?.message ?? String(err)}`,
);
}
let ok: boolean;
try {
ok = await evalBool('check', source.check, ctx);
} catch (err: any) {
throw new GuardViolationError(
def.name,
event.kind,
event.key,
`check failed: ${err?.message ?? String(err)}`,
);
}
if (!ok) {
throw new GuardViolationError(
def.name,
event.kind,
event.key,
'check returned false',
);
}
working = newState;
}
if (matchedAny) {
updates.push({
guardName: def.name,
key: stateKey,
newState: working,
lastEventId: 0,
});
}
}
return { updates };
}
/**
* After event insert: persist guard states (same transaction as the event).
*/
export function applyGuardUpdates(db: Database, updates: GuardUpdate[]): void {
const now = Date.now();
const stmt = upsertGuardStateStmt(db);
for (const u of updates) {
stmt.run(
u.guardName,
u.key,
JSON.stringify(u.newState),
u.lastEventId,
now,
);
}
}
+9
View File
@@ -1018,6 +1018,15 @@ export {
// ── Projection Engine ───────────────────────────────────────────
export * from './projection-engine.js';
// ── Guard projections ───────────────────────────────────────────
export {
type GuardProjectionDef,
type GuardSource,
GuardViolationError,
getGuardState,
registerGuard,
} from './guard-projection.js';
// ── Task Event Types ────────────────────────────────────────────
export type {
ActiveProjectsData,
+60 -44
View File
@@ -16,6 +16,11 @@ import {
} from 'node:fs';
import { dirname, join } from 'node:path';
import { initDefsSchema } from './defs.js';
import {
applyGuardUpdates,
checkGuards,
initGuardSchema,
} from './guard-projection.js';
import { PROJECTIONS_SCHEMA } from './projection-engine.js';
// ── Types ──────────────────────────────────────────────────────
@@ -212,6 +217,55 @@ function rowToObjectInstance(row: RawObjectRow): ObjectInstance {
};
}
async function appendOneWithGuards(
db: Database,
insert: (event: Omit<EventRecord, 'id'>) => EventRecord,
event: Omit<EventRecord, 'id'>,
): Promise<EventRecord> {
const nested = db.inTransaction;
if (!nested) db.exec('BEGIN IMMEDIATE');
try {
const { updates } = await checkGuards(db, event);
const written = insert(event);
applyGuardUpdates(
db,
updates.map((u) => ({ ...u, lastEventId: written.id })),
);
if (!nested) db.exec('COMMIT');
return written;
} catch (e) {
if (!nested) db.exec('ROLLBACK');
throw e;
}
}
async function appendManyWithGuards(
db: Database,
insert: (event: Omit<EventRecord, 'id'>) => EventRecord,
events: Omit<EventRecord, 'id'>[],
): Promise<EventRecord[]> {
if (events.length === 0) return [];
const nested = db.inTransaction;
if (!nested) db.exec('BEGIN IMMEDIATE');
try {
const results: EventRecord[] = [];
for (const event of events) {
const { updates } = await checkGuards(db, event);
const written = insert(event);
applyGuardUpdates(
db,
updates.map((u) => ({ ...u, lastEventId: written.id })),
);
results.push(written);
}
if (!nested) db.exec('COMMIT');
return results;
} catch (e) {
if (!nested) db.exec('ROLLBACK');
throw e;
}
}
// ── Factory ────────────────────────────────────────────────────
export interface CreateStoreOptions {
@@ -232,6 +286,7 @@ export function createStore(options: CreateStoreOptions): PulseStore {
eventsDb.exec(EVENTS_SCHEMA);
eventsDb.exec(OBJECTS_SCHEMA);
migrateEventsObjectId(eventsDb);
initGuardSchema(eventsDb);
const insertEvent = eventsDb.prepare(`
INSERT INTO events (occurred_at, kind, key, hash, code_rev, meta, object_id)
@@ -263,35 +318,15 @@ export function createStore(options: CreateStoreOptions): PulseStore {
return { id, ...event };
}
const appendManyTx = eventsDb.transaction(
(events: Omit<EventRecord, 'id'>[]): EventRecord[] => {
const results: EventRecord[] = [];
for (const event of events) {
const result = insertEvent.run(
event.occurredAt,
event.kind,
event.key ?? null,
event.hash ?? null,
event.codeRev ?? null,
event.meta ?? null,
event.objectId ?? null,
);
const id = Number(result.lastInsertRowid);
results.push({ id, ...event });
}
return results;
},
);
return {
async appendEvent(event: Omit<EventRecord, 'id'>): Promise<EventRecord> {
return doAppendEvent(event);
return appendOneWithGuards(eventsDb, doAppendEvent, event);
},
async appendEvents(
events: Omit<EventRecord, 'id'>[],
): Promise<EventRecord[]> {
return appendManyTx(events);
return appendManyWithGuards(eventsDb, doAppendEvent, events);
},
async getLatest(kind: string, key?: string): Promise<EventRecord | null> {
@@ -538,6 +573,7 @@ function openScopeDb(path: string): Database {
// Each scope carries its own def tables (bun:sqlite is sync under async wrapper)
void initDefsSchema(db);
initGuardSchema(db);
return db;
}
@@ -573,33 +609,13 @@ function createScopeStore(db: Database, objectsDir: string): PulseStore {
return { id, ...event };
}
const appendManyTx = db.transaction(
(events: Omit<EventRecord, 'id'>[]): EventRecord[] => {
const results: EventRecord[] = [];
for (const event of events) {
const result = insertEvent.run(
event.occurredAt,
event.kind,
event.key ?? null,
event.hash ?? null,
event.codeRev ?? null,
event.meta ?? null,
event.objectId ?? null,
);
const id = Number(result.lastInsertRowid);
results.push({ id, ...event });
}
return results;
},
);
return {
async appendEvent(event) {
return doAppendEvent(event);
return appendOneWithGuards(db, doAppendEvent, event);
},
async appendEvents(events) {
return appendManyTx(events);
return appendManyWithGuards(db, doAppendEvent, events);
},
async getLatest(kind, key?) {