feat: implement Definition Layer (RFC #58 Phase 1)
- Add three append-only definition tables: object_defs, event_defs, projection_defs - Support content-addressed versioning with SHA-256 hashes - Implement JSONata expression validation for projections - Add projection_def_sources table with foreign key constraints - Support (name, code_rev) unique constraints for versioning - Add comprehensive test suite covering all 9 scenarios from Issue #60 closes #60
This commit is contained in:
@@ -11,6 +11,10 @@
|
||||
"packages/pulse": {
|
||||
"name": "@uncaged/pulse",
|
||||
"version": "0.1.0",
|
||||
"dependencies": {
|
||||
"jsonata": "^2.1.0",
|
||||
"zod": "^4.3.6",
|
||||
},
|
||||
"devDependencies": {
|
||||
"@types/node": "^25.6.0",
|
||||
"bun-types": "latest",
|
||||
@@ -108,8 +112,12 @@
|
||||
|
||||
"commander": ["commander@12.1.0", "", {}, "sha512-Vw8qHK3bZM9y/P10u3Vib8o/DdkvA2OtPtZvD871QKjy74Wj1WSKFILMPRPSdUSx5RFK1arlJzEtA4PkFgnbuA=="],
|
||||
|
||||
"jsonata": ["jsonata@2.1.0", "", {}, "sha512-OCzaRMK8HobtX8fp37uIVmL8CY1IGc/a6gLsDqz3quExFR09/U78HUzWYr7T31UEB6+Eu0/8dkVD5fFDOl9a8w=="],
|
||||
|
||||
"typescript": ["typescript@6.0.2", "", { "bin": { "tsc": "bin/tsc", "tsserver": "bin/tsserver" } }, "sha512-bGdAIrZ0wiGDo5l8c++HWtbaNCWTS4UTv7RaTH/ThVIgjkveJt83m74bBHMJkuCbslY8ixgLBVZJIOiQlQTjfQ=="],
|
||||
|
||||
"undici-types": ["undici-types@7.19.2", "", {}, "sha512-qYVnV5OEm2AW8cJMCpdV20CDyaN3g0AjDlOGf1OW4iaDEx8MwdtChUp4zu4H0VP3nDRF/8RKWH+IPp9uW0YGZg=="],
|
||||
|
||||
"zod": ["zod@4.3.6", "", {}, "sha512-rftlrkhHZOcjDwkGlnUtZZkvaPHCsDATp4pGpuOOMDaTdDDXF91wuVDJoWoPsKX/3YPQ5fHuF3STjcYyKr+Qhg=="],
|
||||
}
|
||||
}
|
||||
|
||||
@@ -30,5 +30,9 @@
|
||||
"@types/node": "^25.6.0",
|
||||
"bun-types": "latest",
|
||||
"typescript": "^6.0.2"
|
||||
},
|
||||
"dependencies": {
|
||||
"jsonata": "^2.1.0",
|
||||
"zod": "^4.3.6"
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,369 @@
|
||||
/**
|
||||
* @uncaged/pulse — Definition Layer Tests
|
||||
*
|
||||
* Testing Issue #60: 9 test scenarios for Definition Layer
|
||||
*/
|
||||
|
||||
import { afterEach, beforeEach, describe, expect, it } from 'bun:test';
|
||||
import { existsSync, mkdtempSync, rmSync } from 'node:fs';
|
||||
import { tmpdir } from 'node:os';
|
||||
import { join } from 'node:path';
|
||||
import {
|
||||
closeDefs,
|
||||
getEventDef,
|
||||
getObjectDef,
|
||||
getProjectionDef,
|
||||
initDefs,
|
||||
listEventDefs,
|
||||
listProjectionDefs,
|
||||
registerEventDef,
|
||||
registerObjectDef,
|
||||
registerProjectionDef,
|
||||
validateExpression,
|
||||
} from './defs.js';
|
||||
|
||||
describe('Definition Layer', () => {
|
||||
let tempDir: string;
|
||||
let systemDbPath: string;
|
||||
|
||||
beforeEach(() => {
|
||||
tempDir = mkdtempSync(join(tmpdir(), 'pulse-defs-'));
|
||||
systemDbPath = join(tempDir, '_system.db');
|
||||
initDefs(systemDbPath);
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
closeDefs();
|
||||
if (existsSync(tempDir)) {
|
||||
rmSync(tempDir, { recursive: true, force: true });
|
||||
}
|
||||
});
|
||||
|
||||
// ── Test 1: object_defs registration ──────────────────────────
|
||||
|
||||
it('1. registers object definitions', () => {
|
||||
const objDef = registerObjectDef({
|
||||
name: 'User',
|
||||
codeRev: 'v1.0.0',
|
||||
});
|
||||
|
||||
expect(objDef.name).toBe('User');
|
||||
expect(objDef.codeRev).toBe('v1.0.0');
|
||||
expect(typeof objDef.createdAt).toBe('number');
|
||||
expect(objDef.createdAt).toBeGreaterThan(0);
|
||||
|
||||
// Verify it's stored
|
||||
const retrieved = getObjectDef('User', 'v1.0.0');
|
||||
expect(retrieved).toEqual(objDef);
|
||||
});
|
||||
|
||||
// ── Test 2: event_defs registration + content hash ───────────
|
||||
|
||||
it('2. registers event definitions with content-based hash', () => {
|
||||
const schema = { type: 'object', properties: { name: { type: 'string' } } };
|
||||
|
||||
const eventDef = registerEventDef({
|
||||
name: 'UserCreated',
|
||||
schema,
|
||||
codeRev: 'v1.0.0',
|
||||
});
|
||||
|
||||
expect(eventDef.name).toBe('UserCreated');
|
||||
expect(eventDef.schema).toEqual(schema);
|
||||
expect(eventDef.codeRev).toBe('v1.0.0');
|
||||
expect(typeof eventDef.hash).toBe('string');
|
||||
expect(eventDef.hash).toHaveLength(64); // SHA-256 hex
|
||||
|
||||
// Same content should produce same hash
|
||||
const eventDef2 = registerEventDef({
|
||||
name: 'UserCreated',
|
||||
schema,
|
||||
codeRev: 'v1.1.0', // Different code_rev, same content
|
||||
});
|
||||
|
||||
expect(eventDef2.hash).toBe(eventDef.hash);
|
||||
});
|
||||
|
||||
// ── Test 3: event_def version chain ───────────────────────────
|
||||
|
||||
it('3. supports event_def version chains via parent_hash', () => {
|
||||
const v1 = registerEventDef({
|
||||
name: 'UserUpdated',
|
||||
schema: { type: 'object', properties: { name: { type: 'string' } } },
|
||||
codeRev: 'v1.0.0',
|
||||
});
|
||||
|
||||
const v2 = registerEventDef({
|
||||
name: 'UserUpdated',
|
||||
schema: {
|
||||
type: 'object',
|
||||
properties: {
|
||||
name: { type: 'string' },
|
||||
email: { type: 'string' },
|
||||
},
|
||||
},
|
||||
parentHash: v1.hash,
|
||||
codeRev: 'v2.0.0',
|
||||
});
|
||||
|
||||
expect(v2.parentHash).toBe(v1.hash);
|
||||
expect(v2.hash).not.toBe(v1.hash); // Different content = different hash
|
||||
|
||||
// Verify retrieval
|
||||
const retrievedV1 = getEventDef('UserUpdated', 'v1.0.0');
|
||||
const retrievedV2 = getEventDef('UserUpdated', 'v2.0.0');
|
||||
|
||||
expect(retrievedV1?.hash).toBe(v1.hash);
|
||||
expect(retrievedV2?.hash).toBe(v2.hash);
|
||||
expect(retrievedV2?.parentHash).toBe(v1.hash);
|
||||
});
|
||||
|
||||
// ── Test 4: projection_defs + sources + JSONata expression ───
|
||||
|
||||
it('4. registers projection definitions with JSONata sources', async () => {
|
||||
const projDef = await registerProjectionDef({
|
||||
name: 'UserCount',
|
||||
initialValue: { count: 0 },
|
||||
sources: [
|
||||
{
|
||||
eventKind: 'UserCreated',
|
||||
expression: '{ "count": state.count + 1 }',
|
||||
},
|
||||
{
|
||||
eventKind: 'UserDeleted',
|
||||
expression: '{ "count": state.count - 1 }',
|
||||
},
|
||||
],
|
||||
codeRev: 'v1.0.0',
|
||||
});
|
||||
|
||||
expect(projDef.name).toBe('UserCount');
|
||||
expect(projDef.initialValue).toEqual({ count: 0 });
|
||||
expect(projDef.sources).toHaveLength(2);
|
||||
expect(projDef.sources[0].eventKind).toBe('UserCreated');
|
||||
expect(projDef.sources[1].eventKind).toBe('UserDeleted');
|
||||
expect(typeof projDef.hash).toBe('string');
|
||||
|
||||
// Verify retrieval includes sources
|
||||
const retrieved = getProjectionDef('UserCount', 'v1.0.0');
|
||||
expect(retrieved).toEqual(projDef);
|
||||
});
|
||||
|
||||
// ── Test 5: (name, code_rev) UNIQUE constraint ────────────────
|
||||
|
||||
it('5. enforces (name, code_rev) UNIQUE constraints', async () => {
|
||||
// Object defs
|
||||
registerObjectDef({ name: 'Product', codeRev: 'v1.0.0' });
|
||||
expect(() => {
|
||||
registerObjectDef({ name: 'Product', codeRev: 'v1.0.0' });
|
||||
}).toThrow('Object definition already exists: Product@v1.0.0');
|
||||
|
||||
// Event defs
|
||||
registerEventDef({ name: 'ProductCreated', codeRev: 'v1.0.0' });
|
||||
expect(() => {
|
||||
registerEventDef({ name: 'ProductCreated', codeRev: 'v1.0.0' });
|
||||
}).toThrow('Event definition already exists: ProductCreated@v1.0.0');
|
||||
|
||||
// Projection defs
|
||||
await registerProjectionDef({
|
||||
name: 'ProductCount',
|
||||
initialValue: { count: 0 },
|
||||
sources: [{ eventKind: 'ProductCreated', expression: 'state' }],
|
||||
codeRev: 'v1.0.0',
|
||||
});
|
||||
try {
|
||||
await registerProjectionDef({
|
||||
name: 'ProductCount',
|
||||
initialValue: { count: 0 },
|
||||
sources: [{ eventKind: 'ProductCreated', expression: 'state' }],
|
||||
codeRev: 'v1.0.0',
|
||||
});
|
||||
throw new Error('Should have thrown');
|
||||
} catch (error: any) {
|
||||
expect(error.message).toMatch(
|
||||
/Projection definition already exists: ProductCount@v1\.0\.0/,
|
||||
);
|
||||
}
|
||||
});
|
||||
|
||||
// ── Test 6: Query by code_rev ─────────────────────────────────
|
||||
|
||||
it('6. queries definitions by code_rev', async () => {
|
||||
// Register multiple definitions across different code_rev values
|
||||
registerEventDef({ name: 'OrderCreated', codeRev: 'v1.0.0' });
|
||||
registerEventDef({ name: 'OrderUpdated', codeRev: 'v1.0.0' });
|
||||
registerEventDef({ name: 'OrderDeleted', codeRev: 'v2.0.0' });
|
||||
|
||||
await registerProjectionDef({
|
||||
name: 'OrderStats',
|
||||
initialValue: { total: 0 },
|
||||
sources: [{ eventKind: 'OrderCreated', expression: 'state' }],
|
||||
codeRev: 'v1.0.0',
|
||||
});
|
||||
await registerProjectionDef({
|
||||
name: 'OrderMetrics',
|
||||
initialValue: { count: 0 },
|
||||
sources: [{ eventKind: 'OrderCreated', expression: 'state' }],
|
||||
codeRev: 'v2.0.0',
|
||||
});
|
||||
|
||||
const v1Events = listEventDefs({ codeRev: 'v1.0.0' });
|
||||
const v2Events = listEventDefs({ codeRev: 'v2.0.0' });
|
||||
|
||||
expect(v1Events).toHaveLength(2);
|
||||
expect(v1Events.map((e) => e.name).sort()).toEqual([
|
||||
'OrderCreated',
|
||||
'OrderUpdated',
|
||||
]);
|
||||
|
||||
expect(v2Events).toHaveLength(1);
|
||||
expect(v2Events[0].name).toBe('OrderDeleted');
|
||||
|
||||
const v1Projections = listProjectionDefs({ codeRev: 'v1.0.0' });
|
||||
const v2Projections = listProjectionDefs({ codeRev: 'v2.0.0' });
|
||||
|
||||
expect(v1Projections).toHaveLength(1);
|
||||
expect(v1Projections[0].name).toBe('OrderStats');
|
||||
|
||||
expect(v2Projections).toHaveLength(1);
|
||||
expect(v2Projections[0].name).toBe('OrderMetrics');
|
||||
});
|
||||
|
||||
// ── Test 7: JSONata expression dry-run validation ─────────────
|
||||
|
||||
it('7. validates JSONata expressions during registration', async () => {
|
||||
const validResult = await validateExpression({
|
||||
expression: '{ "count": state.count + 1 }',
|
||||
initialValue: { count: 5 },
|
||||
mockEvent: { kind: 'UserCreated', key: 'u1', data: {} },
|
||||
});
|
||||
|
||||
expect(validResult.valid).toBe(true);
|
||||
expect(validResult.result).toEqual({ count: 6 });
|
||||
expect(validResult.error).toBeUndefined();
|
||||
|
||||
// Test more complex expression
|
||||
const complexResult = await validateExpression({
|
||||
expression: `
|
||||
$count(event.kind = 'UserCreated') > 0 ?
|
||||
{ "count": state.count + 1, "lastUser": event.key } :
|
||||
state
|
||||
`,
|
||||
initialValue: { count: 0, lastUser: null },
|
||||
mockEvent: {
|
||||
kind: 'UserCreated',
|
||||
key: 'user-123',
|
||||
data: { name: 'Alice' },
|
||||
},
|
||||
});
|
||||
|
||||
expect(complexResult.valid).toBe(true);
|
||||
expect(complexResult.result).toEqual({ count: 1, lastUser: 'user-123' });
|
||||
});
|
||||
|
||||
// ── Test 8: Invalid JSONata expressions are rejected ─────────
|
||||
|
||||
it('8. rejects invalid JSONata expressions', async () => {
|
||||
const invalidResult = await validateExpression({
|
||||
expression: '{ "count": state.count + }', // Syntax error
|
||||
initialValue: { count: 0 },
|
||||
mockEvent: { kind: 'test', data: {} },
|
||||
});
|
||||
|
||||
expect(invalidResult.valid).toBe(false);
|
||||
expect(typeof invalidResult.error).toBe('string');
|
||||
expect(invalidResult.result).toBeUndefined();
|
||||
// Registration should fail with invalid expression
|
||||
try {
|
||||
await registerProjectionDef({
|
||||
name: 'BrokenProjection',
|
||||
initialValue: { count: 0 },
|
||||
sources: [
|
||||
{
|
||||
eventKind: 'SomeEvent',
|
||||
expression: '{"invalid": syntax error }',
|
||||
},
|
||||
],
|
||||
codeRev: 'v1.0.0',
|
||||
});
|
||||
throw new Error('Should have thrown');
|
||||
} catch (error: any) {
|
||||
expect(error.message).toMatch(/Invalid JSONata expression/);
|
||||
}
|
||||
});
|
||||
|
||||
// ── Test 9: All tests pass ────────────────────────────────────
|
||||
|
||||
it('9. comprehensive integration test', async () => {
|
||||
// Register a complete set of definitions
|
||||
const objDef = registerObjectDef({
|
||||
name: 'Invoice',
|
||||
codeRev: 'v1.2.0',
|
||||
});
|
||||
|
||||
const eventDef = registerEventDef({
|
||||
name: 'InvoiceCreated',
|
||||
schema: {
|
||||
type: 'object',
|
||||
properties: {
|
||||
amount: { type: 'number' },
|
||||
customerId: { type: 'string' },
|
||||
},
|
||||
required: ['amount', 'customerId'],
|
||||
},
|
||||
codeRev: 'v1.2.0',
|
||||
});
|
||||
|
||||
const projDef = await registerProjectionDef({
|
||||
name: 'InvoiceSummary',
|
||||
params: { currency: 'USD' },
|
||||
valueSchema: {
|
||||
type: 'object',
|
||||
properties: {
|
||||
totalAmount: { type: 'number' },
|
||||
invoiceCount: { type: 'number' },
|
||||
},
|
||||
},
|
||||
initialValue: { totalAmount: 0, invoiceCount: 0 },
|
||||
sources: [
|
||||
{
|
||||
eventKind: 'InvoiceCreated',
|
||||
expression: `{
|
||||
"totalAmount": state.totalAmount + event.data.amount,
|
||||
"invoiceCount": state.invoiceCount + 1
|
||||
}`,
|
||||
},
|
||||
],
|
||||
codeRev: 'v1.2.0',
|
||||
});
|
||||
|
||||
// Verify all definitions can be retrieved
|
||||
expect(getObjectDef('Invoice', 'v1.2.0')).toEqual(objDef);
|
||||
expect(getEventDef('InvoiceCreated', 'v1.2.0')).toEqual(eventDef);
|
||||
expect(getProjectionDef('InvoiceSummary', 'v1.2.0')).toEqual(projDef);
|
||||
|
||||
// Verify listing works
|
||||
const events = listEventDefs({ codeRev: 'v1.2.0' });
|
||||
const projections = listProjectionDefs({ codeRev: 'v1.2.0' });
|
||||
|
||||
expect(events.find((e) => e.name === 'InvoiceCreated')).toBeDefined();
|
||||
expect(projections.find((p) => p.name === 'InvoiceSummary')).toBeDefined();
|
||||
|
||||
// Verify expression validation worked during registration
|
||||
const validation = await validateExpression({
|
||||
expression: projDef.sources[0].expression,
|
||||
initialValue: projDef.initialValue,
|
||||
mockEvent: {
|
||||
kind: 'InvoiceCreated',
|
||||
key: 'inv-123',
|
||||
data: { amount: 100.5, customerId: 'cust-456' },
|
||||
},
|
||||
});
|
||||
|
||||
expect(validation.valid).toBe(true);
|
||||
expect(validation.result).toEqual({
|
||||
totalAmount: 100.5,
|
||||
invoiceCount: 1,
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,509 @@
|
||||
/**
|
||||
* @uncaged/pulse — Definition Layer (Phase 1)
|
||||
*
|
||||
* Append-only definition tables for objects, events, and projections.
|
||||
* Content-addressed versioning with code_rev binding.
|
||||
*/
|
||||
|
||||
import { Database } from 'bun:sqlite';
|
||||
import { createHash } from 'node:crypto';
|
||||
import jsonata from 'jsonata';
|
||||
|
||||
// ── Types ──────────────────────────────────────────────────────
|
||||
|
||||
export interface ObjectDef {
|
||||
name: string;
|
||||
codeRev: string;
|
||||
createdAt: number;
|
||||
}
|
||||
|
||||
export interface EventDef {
|
||||
hash: string;
|
||||
name: string;
|
||||
parentHash?: string;
|
||||
schema?: any; // JSON schema
|
||||
codeRev: string;
|
||||
createdAt: number;
|
||||
}
|
||||
|
||||
export interface ProjectionDef {
|
||||
hash: string;
|
||||
name: string;
|
||||
parentHash?: string;
|
||||
params?: any; // JSON
|
||||
valueSchema?: any; // JSON
|
||||
initialValue: any; // JSON
|
||||
codeRev: string;
|
||||
createdAt: number;
|
||||
sources: Array<{
|
||||
eventKind: string;
|
||||
eventKey?: string;
|
||||
expression: string; // JSONata
|
||||
}>;
|
||||
}
|
||||
|
||||
export interface ValidationResult {
|
||||
valid: boolean;
|
||||
result?: any;
|
||||
error?: string;
|
||||
}
|
||||
|
||||
// ── Database Schema ────────────────────────────────────────────
|
||||
|
||||
const OBJECT_DEFS_SCHEMA = `
|
||||
CREATE TABLE IF NOT EXISTS object_defs (
|
||||
name TEXT NOT NULL,
|
||||
code_rev TEXT NOT NULL,
|
||||
created_at INTEGER NOT NULL,
|
||||
PRIMARY KEY (name, code_rev)
|
||||
)
|
||||
`;
|
||||
|
||||
const EVENT_DEFS_SCHEMA = `
|
||||
CREATE TABLE IF NOT EXISTS event_defs (
|
||||
hash TEXT NOT NULL,
|
||||
name TEXT NOT NULL,
|
||||
parent_hash TEXT,
|
||||
schema TEXT,
|
||||
code_rev TEXT NOT NULL,
|
||||
created_at INTEGER NOT NULL,
|
||||
PRIMARY KEY (name, code_rev)
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS idx_event_defs_hash ON event_defs(hash);
|
||||
`;
|
||||
|
||||
const PROJECTION_DEFS_SCHEMA = `
|
||||
CREATE TABLE IF NOT EXISTS projection_defs (
|
||||
hash TEXT NOT NULL,
|
||||
name TEXT NOT NULL,
|
||||
parent_hash TEXT,
|
||||
params TEXT,
|
||||
value_schema TEXT,
|
||||
initial_value TEXT NOT NULL,
|
||||
code_rev TEXT NOT NULL,
|
||||
created_at INTEGER NOT NULL,
|
||||
PRIMARY KEY (name, code_rev)
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS idx_projection_defs_hash ON projection_defs(hash);
|
||||
`;
|
||||
|
||||
const PROJECTION_DEF_SOURCES_SCHEMA = `
|
||||
CREATE TABLE IF NOT EXISTS projection_def_sources (
|
||||
projection_hash TEXT NOT NULL,
|
||||
event_kind TEXT NOT NULL,
|
||||
event_key TEXT,
|
||||
expression TEXT NOT NULL,
|
||||
FOREIGN KEY (projection_hash) REFERENCES projection_defs(hash)
|
||||
)
|
||||
`;
|
||||
|
||||
// ── Module State ───────────────────────────────────────────────
|
||||
|
||||
let _systemDb: Database | null = null;
|
||||
|
||||
/**
|
||||
* Initialize the definition layer with database path.
|
||||
* Must be called before using any def functions.
|
||||
*/
|
||||
export function initDefs(systemDbPath: string): void {
|
||||
_systemDb = new Database(systemDbPath, { create: true });
|
||||
_systemDb.exec('PRAGMA journal_mode = WAL');
|
||||
|
||||
// Create definition tables
|
||||
_systemDb.exec(OBJECT_DEFS_SCHEMA);
|
||||
_systemDb.exec(EVENT_DEFS_SCHEMA);
|
||||
_systemDb.exec(PROJECTION_DEFS_SCHEMA);
|
||||
_systemDb.exec(PROJECTION_DEF_SOURCES_SCHEMA);
|
||||
}
|
||||
|
||||
function getSystemDb(): Database {
|
||||
if (!_systemDb) {
|
||||
throw new Error('Definition layer not initialized. Call initDefs() first.');
|
||||
}
|
||||
return _systemDb;
|
||||
}
|
||||
|
||||
// ── Hash Calculation ───────────────────────────────────────────
|
||||
|
||||
function calculateEventHash(name: string, schema?: any): string {
|
||||
const content = name + JSON.stringify(schema || null);
|
||||
return createHash('sha256').update(content).digest('hex');
|
||||
}
|
||||
|
||||
function calculateProjectionHash(
|
||||
name: string,
|
||||
params?: any,
|
||||
valueSchema?: any,
|
||||
initialValue?: any,
|
||||
): string {
|
||||
const content =
|
||||
name +
|
||||
JSON.stringify(params || null) +
|
||||
JSON.stringify(valueSchema || null) +
|
||||
JSON.stringify(initialValue);
|
||||
return createHash('sha256').update(content).digest('hex');
|
||||
}
|
||||
|
||||
// ── Object Definitions ─────────────────────────────────────────
|
||||
|
||||
const insertObjectDef = (db: Database) =>
|
||||
db.prepare(`
|
||||
INSERT INTO object_defs (name, code_rev, created_at)
|
||||
VALUES (?, ?, ?)
|
||||
`);
|
||||
|
||||
const selectObjectDef = (db: Database) =>
|
||||
db.prepare(`
|
||||
SELECT name, code_rev, created_at
|
||||
FROM object_defs
|
||||
WHERE name = ? AND code_rev = ?
|
||||
`);
|
||||
|
||||
export function registerObjectDef(opts: {
|
||||
name: string;
|
||||
codeRev: string;
|
||||
}): ObjectDef {
|
||||
const db = getSystemDb();
|
||||
const createdAt = Date.now();
|
||||
|
||||
try {
|
||||
insertObjectDef(db).run(opts.name, opts.codeRev, createdAt);
|
||||
} catch (error: any) {
|
||||
if (error.message.includes('UNIQUE constraint failed')) {
|
||||
throw new Error(
|
||||
`Object definition already exists: ${opts.name}@${opts.codeRev}`,
|
||||
);
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
|
||||
return {
|
||||
name: opts.name,
|
||||
codeRev: opts.codeRev,
|
||||
createdAt,
|
||||
};
|
||||
}
|
||||
|
||||
export function getObjectDef(name: string, codeRev: string): ObjectDef | null {
|
||||
const db = getSystemDb();
|
||||
const row = selectObjectDef(db).get(name, codeRev) as any;
|
||||
|
||||
if (!row) return null;
|
||||
|
||||
return {
|
||||
name: row.name,
|
||||
codeRev: row.code_rev,
|
||||
createdAt: row.created_at,
|
||||
};
|
||||
}
|
||||
|
||||
// ── Event Definitions ──────────────────────────────────────────
|
||||
|
||||
const insertEventDef = (db: Database) =>
|
||||
db.prepare(`
|
||||
INSERT INTO event_defs (hash, name, parent_hash, schema, code_rev, created_at)
|
||||
VALUES (?, ?, ?, ?, ?, ?)
|
||||
`);
|
||||
|
||||
const selectEventDefByNameCodeRev = (db: Database) =>
|
||||
db.prepare(`
|
||||
SELECT hash, name, parent_hash, schema, code_rev, created_at
|
||||
FROM event_defs
|
||||
WHERE name = ? AND code_rev = ?
|
||||
`);
|
||||
|
||||
const selectEventDefsByCodeRev = (db: Database) =>
|
||||
db.prepare(`
|
||||
SELECT hash, name, parent_hash, schema, code_rev, created_at
|
||||
FROM event_defs
|
||||
WHERE code_rev = ?
|
||||
ORDER BY name
|
||||
`);
|
||||
|
||||
export function registerEventDef(opts: {
|
||||
name: string;
|
||||
schema?: any;
|
||||
parentHash?: string;
|
||||
codeRev: string;
|
||||
}): EventDef {
|
||||
const db = getSystemDb();
|
||||
const hash = calculateEventHash(opts.name, opts.schema);
|
||||
const createdAt = Date.now();
|
||||
|
||||
try {
|
||||
insertEventDef(db).run(
|
||||
hash,
|
||||
opts.name,
|
||||
opts.parentHash || null,
|
||||
opts.schema ? JSON.stringify(opts.schema) : null,
|
||||
opts.codeRev,
|
||||
createdAt,
|
||||
);
|
||||
} catch (error: any) {
|
||||
if (error.message.includes('UNIQUE constraint failed')) {
|
||||
throw new Error(
|
||||
`Event definition already exists: ${opts.name}@${opts.codeRev}`,
|
||||
);
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
|
||||
return {
|
||||
hash,
|
||||
name: opts.name,
|
||||
parentHash: opts.parentHash,
|
||||
schema: opts.schema,
|
||||
codeRev: opts.codeRev,
|
||||
createdAt,
|
||||
};
|
||||
}
|
||||
|
||||
export function getEventDef(name: string, codeRev: string): EventDef | null {
|
||||
const db = getSystemDb();
|
||||
const row = selectEventDefByNameCodeRev(db).get(name, codeRev) as any;
|
||||
|
||||
if (!row) return null;
|
||||
|
||||
return {
|
||||
hash: row.hash,
|
||||
name: row.name,
|
||||
parentHash: row.parent_hash || undefined,
|
||||
schema: row.schema ? JSON.parse(row.schema) : undefined,
|
||||
codeRev: row.code_rev,
|
||||
createdAt: row.created_at,
|
||||
};
|
||||
}
|
||||
|
||||
export function listEventDefs(opts: { codeRev: string }): EventDef[] {
|
||||
const db = getSystemDb();
|
||||
const rows = selectEventDefsByCodeRev(db).all(opts.codeRev) as any[];
|
||||
|
||||
return rows.map((row) => ({
|
||||
hash: row.hash,
|
||||
name: row.name,
|
||||
parentHash: row.parent_hash || undefined,
|
||||
schema: row.schema ? JSON.parse(row.schema) : undefined,
|
||||
codeRev: row.code_rev,
|
||||
createdAt: row.created_at,
|
||||
}));
|
||||
}
|
||||
|
||||
// ── Projection Definitions ─────────────────────────────────────
|
||||
|
||||
const insertProjectionDef = (db: Database) =>
|
||||
db.prepare(`
|
||||
INSERT INTO projection_defs (hash, name, parent_hash, params, value_schema, initial_value, code_rev, created_at)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
|
||||
`);
|
||||
|
||||
const insertProjectionDefSource = (db: Database) =>
|
||||
db.prepare(`
|
||||
INSERT INTO projection_def_sources (projection_hash, event_kind, event_key, expression)
|
||||
VALUES (?, ?, ?, ?)
|
||||
`);
|
||||
|
||||
const selectProjectionDefByNameCodeRev = (db: Database) =>
|
||||
db.prepare(`
|
||||
SELECT hash, name, parent_hash, params, value_schema, initial_value, code_rev, created_at
|
||||
FROM projection_defs
|
||||
WHERE name = ? AND code_rev = ?
|
||||
`);
|
||||
|
||||
const selectProjectionDefsByCodeRev = (db: Database) =>
|
||||
db.prepare(`
|
||||
SELECT hash, name, parent_hash, params, value_schema, initial_value, code_rev, created_at
|
||||
FROM projection_defs
|
||||
WHERE code_rev = ?
|
||||
ORDER BY name
|
||||
`);
|
||||
|
||||
const selectProjectionDefSources = (db: Database) =>
|
||||
db.prepare(`
|
||||
SELECT event_kind, event_key, expression
|
||||
FROM projection_def_sources
|
||||
WHERE projection_hash = ?
|
||||
`);
|
||||
|
||||
export async function registerProjectionDef(opts: {
|
||||
name: string;
|
||||
params?: any;
|
||||
valueSchema?: any;
|
||||
initialValue: any;
|
||||
sources: Array<{
|
||||
eventKind: string;
|
||||
eventKey?: string;
|
||||
expression: string;
|
||||
}>;
|
||||
parentHash?: string;
|
||||
codeRev: string;
|
||||
}): Promise<ProjectionDef> {
|
||||
const db = getSystemDb();
|
||||
const hash = calculateProjectionHash(
|
||||
opts.name,
|
||||
opts.params,
|
||||
opts.valueSchema,
|
||||
opts.initialValue,
|
||||
);
|
||||
const createdAt = Date.now();
|
||||
|
||||
// Validate JSONata expressions with dry-run
|
||||
for (const source of opts.sources) {
|
||||
const mockEvent = {
|
||||
kind: source.eventKind,
|
||||
key: source.eventKey || 'test',
|
||||
data: {},
|
||||
};
|
||||
const validation = await validateExpression({
|
||||
expression: source.expression,
|
||||
initialValue: opts.initialValue,
|
||||
mockEvent,
|
||||
});
|
||||
|
||||
if (!validation.valid) {
|
||||
throw new Error(
|
||||
`Invalid JSONata expression for ${source.eventKind}: ${validation.error}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Use transaction for atomic insertion
|
||||
const transaction = db.transaction(() => {
|
||||
try {
|
||||
insertProjectionDef(db).run(
|
||||
hash,
|
||||
opts.name,
|
||||
opts.parentHash || null,
|
||||
opts.params ? JSON.stringify(opts.params) : null,
|
||||
opts.valueSchema ? JSON.stringify(opts.valueSchema) : null,
|
||||
JSON.stringify(opts.initialValue),
|
||||
opts.codeRev,
|
||||
createdAt,
|
||||
);
|
||||
|
||||
// Insert sources
|
||||
for (const source of opts.sources) {
|
||||
insertProjectionDefSource(db).run(
|
||||
hash,
|
||||
source.eventKind,
|
||||
source.eventKey || null,
|
||||
source.expression,
|
||||
);
|
||||
}
|
||||
} catch (error: any) {
|
||||
if (error.message.includes('UNIQUE constraint failed')) {
|
||||
throw new Error(
|
||||
`Projection definition already exists: ${opts.name}@${opts.codeRev}`,
|
||||
);
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
});
|
||||
|
||||
transaction();
|
||||
|
||||
return {
|
||||
hash,
|
||||
name: opts.name,
|
||||
parentHash: opts.parentHash,
|
||||
params: opts.params,
|
||||
valueSchema: opts.valueSchema,
|
||||
initialValue: opts.initialValue,
|
||||
sources: opts.sources,
|
||||
codeRev: opts.codeRev,
|
||||
createdAt,
|
||||
};
|
||||
}
|
||||
|
||||
export function getProjectionDef(
|
||||
name: string,
|
||||
codeRev: string,
|
||||
): ProjectionDef | null {
|
||||
const db = getSystemDb();
|
||||
const row = selectProjectionDefByNameCodeRev(db).get(name, codeRev) as any;
|
||||
|
||||
if (!row) return null;
|
||||
|
||||
// Get sources
|
||||
const sources = selectProjectionDefSources(db).all(row.hash) as any[];
|
||||
|
||||
return {
|
||||
hash: row.hash,
|
||||
name: row.name,
|
||||
parentHash: row.parent_hash || undefined,
|
||||
params: row.params ? JSON.parse(row.params) : undefined,
|
||||
valueSchema: row.value_schema ? JSON.parse(row.value_schema) : undefined,
|
||||
initialValue: JSON.parse(row.initial_value),
|
||||
sources: sources.map((s) => ({
|
||||
eventKind: s.event_kind,
|
||||
eventKey: s.event_key || undefined,
|
||||
expression: s.expression,
|
||||
})),
|
||||
codeRev: row.code_rev,
|
||||
createdAt: row.created_at,
|
||||
};
|
||||
}
|
||||
|
||||
export function listProjectionDefs(opts: { codeRev: string }): ProjectionDef[] {
|
||||
const db = getSystemDb();
|
||||
const rows = selectProjectionDefsByCodeRev(db).all(opts.codeRev) as any[];
|
||||
|
||||
return rows.map((row) => {
|
||||
const sources = selectProjectionDefSources(db).all(row.hash) as any[];
|
||||
|
||||
return {
|
||||
hash: row.hash,
|
||||
name: row.name,
|
||||
parentHash: row.parent_hash || undefined,
|
||||
params: row.params ? JSON.parse(row.params) : undefined,
|
||||
valueSchema: row.value_schema ? JSON.parse(row.value_schema) : undefined,
|
||||
initialValue: JSON.parse(row.initial_value),
|
||||
sources: sources.map((s) => ({
|
||||
eventKind: s.event_kind,
|
||||
eventKey: s.event_key || undefined,
|
||||
expression: s.expression,
|
||||
})),
|
||||
codeRev: row.code_rev,
|
||||
createdAt: row.created_at,
|
||||
};
|
||||
});
|
||||
}
|
||||
|
||||
// ── JSONata Expression Validation ─────────────────────────────
|
||||
|
||||
export async function validateExpression(opts: {
|
||||
expression: string;
|
||||
initialValue: any;
|
||||
mockEvent: any;
|
||||
}): Promise<ValidationResult> {
|
||||
try {
|
||||
const expr = jsonata(opts.expression);
|
||||
|
||||
// Test context: { state: initialValue, event: mockEvent, params: {} }
|
||||
const context = {
|
||||
state: opts.initialValue,
|
||||
event: opts.mockEvent,
|
||||
params: {},
|
||||
};
|
||||
|
||||
const result = await expr.evaluate(context);
|
||||
|
||||
return {
|
||||
valid: true,
|
||||
result,
|
||||
};
|
||||
} catch (error: any) {
|
||||
return {
|
||||
valid: false,
|
||||
error: error.message,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
// ── Cleanup ────────────────────────────────────────────────────
|
||||
|
||||
export function closeDefs(): void {
|
||||
if (_systemDb) {
|
||||
_systemDb.close();
|
||||
_systemDb = null;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user