Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 17d382313f | |||
| f3857888da |
@@ -1,251 +0,0 @@
|
||||
import { afterEach, expect, test } from 'bun:test';
|
||||
import { mkdtempSync, rmSync } from 'node:fs';
|
||||
import { tmpdir } from 'node:os';
|
||||
import { join } from 'node:path';
|
||||
import {
|
||||
clearGuardExpressionCache,
|
||||
getGuardState,
|
||||
GuardViolationError,
|
||||
matchEventKindPattern,
|
||||
registerGuard,
|
||||
} from './guard-projection.js';
|
||||
import { createScopedStore } from './store.js';
|
||||
|
||||
function mkScoped() {
|
||||
const root = mkdtempSync(join(tmpdir(), 'pulse-guard-'));
|
||||
const scoped = createScopedStore({
|
||||
basePath: join(root, 'scopes'),
|
||||
objectsDir: join(root, 'objects'),
|
||||
});
|
||||
return { root, scoped, db: scoped.scopeDatabase('g'), store: scoped.scope('g') };
|
||||
}
|
||||
|
||||
let last: ReturnType<typeof mkScoped> | null = null;
|
||||
|
||||
afterEach(() => {
|
||||
clearGuardExpressionCache();
|
||||
if (last) {
|
||||
try {
|
||||
void last.scoped.close();
|
||||
} catch {
|
||||
/* ignore */
|
||||
}
|
||||
rmSync(last.root, { recursive: true, force: true });
|
||||
last = null;
|
||||
}
|
||||
});
|
||||
|
||||
test('register guard + legal event passes', async () => {
|
||||
last = mkScoped();
|
||||
const { db, store } = last;
|
||||
|
||||
registerGuard(db, {
|
||||
name: 'task-lifecycle',
|
||||
initial_value: { phase: 'idle' },
|
||||
sources: [
|
||||
{
|
||||
kind: 'task.created',
|
||||
check: '(event.kind != "task.created") or (state.phase = "idle")',
|
||||
transition: '{ "phase": "open" }',
|
||||
},
|
||||
],
|
||||
});
|
||||
|
||||
const r = await store.appendEvent({
|
||||
kind: 'task.created',
|
||||
key: 't1',
|
||||
occurredAt: 100,
|
||||
});
|
||||
expect(r.id).toBeGreaterThan(0);
|
||||
expect(getGuardState(db, 'task-lifecycle', 't1')).toEqual({ phase: 'open' });
|
||||
});
|
||||
|
||||
test('illegal event rejected when task already open', async () => {
|
||||
last = mkScoped();
|
||||
const { db, store } = last;
|
||||
|
||||
registerGuard(db, {
|
||||
name: 'task-lifecycle',
|
||||
initial_value: { phase: 'idle' },
|
||||
sources: [
|
||||
{
|
||||
kind: 'task.created',
|
||||
check: '(event.kind != "task.created") or (state.phase = "idle")',
|
||||
transition: '{ "phase": "open" }',
|
||||
},
|
||||
],
|
||||
});
|
||||
|
||||
await store.appendEvent({
|
||||
kind: 'task.created',
|
||||
key: 't1',
|
||||
occurredAt: 1,
|
||||
});
|
||||
|
||||
expect(async () => {
|
||||
await store.appendEvent({
|
||||
kind: 'task.created',
|
||||
key: 't1',
|
||||
occurredAt: 2,
|
||||
});
|
||||
}).toThrow(GuardViolationError);
|
||||
});
|
||||
|
||||
test('state transitions created → claimed → completed', async () => {
|
||||
last = mkScoped();
|
||||
const { db, store } = last;
|
||||
|
||||
registerGuard(db, {
|
||||
name: 'task-lifecycle',
|
||||
initial_value: { phase: 'idle' },
|
||||
sources: [
|
||||
{
|
||||
kind: 'task.created',
|
||||
check: '(event.kind != "task.created") or (state.phase = "idle")',
|
||||
transition: '{ "phase": "open" }',
|
||||
},
|
||||
{
|
||||
kind: 'task.claimed',
|
||||
check: '(event.kind != "task.claimed") or (state.phase = "open")',
|
||||
transition: '{ "phase": "claimed" }',
|
||||
},
|
||||
{
|
||||
kind: 'task.completed',
|
||||
check: '(event.kind != "task.completed") or (state.phase = "claimed")',
|
||||
transition: '{ "phase": "done" }',
|
||||
},
|
||||
],
|
||||
});
|
||||
|
||||
await store.appendEvent({
|
||||
kind: 'task.created',
|
||||
key: 'k',
|
||||
occurredAt: 10,
|
||||
});
|
||||
expect(getGuardState(db, 'task-lifecycle', 'k')).toEqual({ phase: 'open' });
|
||||
|
||||
await store.appendEvent({
|
||||
kind: 'task.claimed',
|
||||
key: 'k',
|
||||
occurredAt: 20,
|
||||
});
|
||||
expect(getGuardState(db, 'task-lifecycle', 'k')).toEqual({ phase: 'claimed' });
|
||||
|
||||
await store.appendEvent({
|
||||
kind: 'task.completed',
|
||||
key: 'k',
|
||||
occurredAt: 30,
|
||||
});
|
||||
expect(getGuardState(db, 'task-lifecycle', 'k')).toEqual({ phase: 'done' });
|
||||
});
|
||||
|
||||
test('unrelated event kind passes without guard match', async () => {
|
||||
last = mkScoped();
|
||||
const { db, store } = last;
|
||||
|
||||
registerGuard(db, {
|
||||
name: 'only-task',
|
||||
initial_value: { n: 0 },
|
||||
sources: [
|
||||
{
|
||||
kind: 'task.*',
|
||||
check: 'true',
|
||||
transition: '{ "n": state.n + 1 }',
|
||||
},
|
||||
],
|
||||
});
|
||||
|
||||
await store.appendEvent({
|
||||
kind: 'other.thing',
|
||||
key: 'x',
|
||||
occurredAt: 1,
|
||||
});
|
||||
|
||||
expect(getGuardState(db, 'only-task', 'x')).toEqual({ n: 0 });
|
||||
});
|
||||
|
||||
test('getGuardState reflects latest state', async () => {
|
||||
last = mkScoped();
|
||||
const { db, store } = last;
|
||||
|
||||
registerGuard(db, {
|
||||
name: 'g',
|
||||
initial_value: { v: 0 },
|
||||
sources: [
|
||||
{
|
||||
kind: 'tick',
|
||||
check: 'true',
|
||||
transition: '{ "v": state.v + 1 }',
|
||||
},
|
||||
],
|
||||
});
|
||||
|
||||
await store.appendEvent({ kind: 'tick', key: 'k', occurredAt: 1 });
|
||||
expect(getGuardState(db, 'g', 'k')).toEqual({ v: 1 });
|
||||
await store.appendEvent({ kind: 'tick', key: 'k', occurredAt: 2 });
|
||||
expect(getGuardState(db, 'g', 'k')).toEqual({ v: 2 });
|
||||
});
|
||||
|
||||
test('complex JSONata check (and / or / in)', async () => {
|
||||
last = mkScoped();
|
||||
const { db, store } = last;
|
||||
|
||||
registerGuard(db, {
|
||||
name: 'complex',
|
||||
initial_value: { mode: 'a' },
|
||||
sources: [
|
||||
{
|
||||
kind: 'x',
|
||||
check:
|
||||
'(event.kind != "x") or ((state.mode = "a" or state.mode = "b") and event.payload.role in ["u", "v"])',
|
||||
transition: '{ "mode": "b" }',
|
||||
},
|
||||
],
|
||||
});
|
||||
|
||||
await store.appendEvent({
|
||||
kind: 'x',
|
||||
key: 'k',
|
||||
occurredAt: 1,
|
||||
meta: JSON.stringify({ role: 'u' }),
|
||||
});
|
||||
|
||||
expect(getGuardState(db, 'complex', 'k')).toEqual({ mode: 'b' });
|
||||
});
|
||||
|
||||
test('kind wildcard *.__start__ matches meta.__start__ and coding.__start__', async () => {
|
||||
last = mkScoped();
|
||||
const { db, store } = last;
|
||||
|
||||
expect(matchEventKindPattern('*.__start__', 'meta.__start__')).toBe(true);
|
||||
expect(matchEventKindPattern('*.__start__', 'coding.__start__')).toBe(true);
|
||||
expect(matchEventKindPattern('*.__start__', 'other')).toBe(false);
|
||||
|
||||
registerGuard(db, {
|
||||
name: 'starts',
|
||||
initial_value: { kinds: [] },
|
||||
sources: [
|
||||
{
|
||||
kind: '*.__start__',
|
||||
check: 'true',
|
||||
transition:
|
||||
'{ "kinds": $append(state.kinds ? state.kinds : [], event.kind) }',
|
||||
},
|
||||
],
|
||||
});
|
||||
|
||||
await store.appendEvent({
|
||||
kind: 'meta.__start__',
|
||||
key: 't',
|
||||
occurredAt: 1,
|
||||
});
|
||||
await store.appendEvent({
|
||||
kind: 'coding.__start__',
|
||||
key: 't',
|
||||
occurredAt: 2,
|
||||
});
|
||||
|
||||
expect(getGuardState(db, 'starts', 't')).toEqual({
|
||||
kinds: ['meta.__start__', 'coding.__start__'],
|
||||
});
|
||||
});
|
||||
@@ -1,319 +0,0 @@
|
||||
/**
|
||||
* Guard projections: synchronous fold + check on append (JSONata).
|
||||
*/
|
||||
|
||||
import type { Database } from 'bun:sqlite';
|
||||
import jsonata, { type Expression } from 'jsonata';
|
||||
|
||||
// ── Schema ────────────────────────────────────────────────────
|
||||
|
||||
export const GUARD_SCHEMA = `
|
||||
CREATE TABLE IF NOT EXISTS guard_defs (
|
||||
name TEXT PRIMARY KEY,
|
||||
initial_value TEXT NOT NULL,
|
||||
sources TEXT NOT NULL,
|
||||
created_at INTEGER NOT NULL
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS guard_states (
|
||||
guard_name TEXT NOT NULL,
|
||||
key TEXT NOT NULL DEFAULT '',
|
||||
value TEXT NOT NULL,
|
||||
last_event_id INTEGER NOT NULL DEFAULT 0,
|
||||
updated_at INTEGER NOT NULL,
|
||||
PRIMARY KEY (guard_name, key)
|
||||
);
|
||||
`;
|
||||
|
||||
export function initGuardSchema(db: Database): void {
|
||||
db.exec(GUARD_SCHEMA);
|
||||
}
|
||||
|
||||
// ── Types ─────────────────────────────────────────────────────
|
||||
|
||||
export interface GuardSource {
|
||||
kind: string;
|
||||
key_prefix?: string;
|
||||
check: string;
|
||||
transition: string;
|
||||
}
|
||||
|
||||
export interface GuardProjectionDef {
|
||||
name: string;
|
||||
initial_value: any;
|
||||
sources: GuardSource[];
|
||||
}
|
||||
|
||||
export class GuardViolationError extends Error {
|
||||
constructor(
|
||||
public guardName: string,
|
||||
public eventKind: string,
|
||||
public eventKey: string | undefined,
|
||||
public reason: string,
|
||||
) {
|
||||
super(`Guard "${guardName}" rejected event ${eventKind}: ${reason}`);
|
||||
this.name = 'GuardViolationError';
|
||||
}
|
||||
}
|
||||
|
||||
export interface GuardUpdate {
|
||||
guardName: string;
|
||||
key: string;
|
||||
newState: any;
|
||||
lastEventId: number;
|
||||
}
|
||||
|
||||
// ── Expression cache ───────────────────────────────────────────
|
||||
|
||||
const exprCache = new Map<string, Expression>();
|
||||
|
||||
function expr(expressionStr: string): Expression {
|
||||
let e = exprCache.get(expressionStr);
|
||||
if (!e) {
|
||||
e = jsonata(expressionStr);
|
||||
exprCache.set(expressionStr, e);
|
||||
}
|
||||
return e;
|
||||
}
|
||||
|
||||
/** @internal */
|
||||
export function clearGuardExpressionCache(): void {
|
||||
exprCache.clear();
|
||||
guardDefMemory.clear();
|
||||
}
|
||||
|
||||
// ── Kind pattern (* = one dot segment) ─────────────────────────
|
||||
|
||||
export function matchEventKindPattern(pattern: string, kind: string): boolean {
|
||||
const ps = pattern.split('.');
|
||||
const ks = kind.split('.');
|
||||
if (ps.length !== ks.length) return false;
|
||||
for (let i = 0; i < ps.length; i++) {
|
||||
if (ps[i] === '*') continue;
|
||||
if (ps[i] !== ks[i]) return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
// ── DB helpers ─────────────────────────────────────────────────
|
||||
|
||||
const insertGuardDefStmt = (db: Database) =>
|
||||
db.prepare(`
|
||||
INSERT OR REPLACE INTO guard_defs (name, initial_value, sources, created_at)
|
||||
VALUES (?, ?, ?, ?)
|
||||
`);
|
||||
|
||||
const selectGuardDefsStmt = (db: Database) =>
|
||||
db.prepare(`SELECT name, initial_value, sources FROM guard_defs`);
|
||||
|
||||
const selectGuardStateStmt = (db: Database) =>
|
||||
db.prepare(`
|
||||
SELECT value, last_event_id FROM guard_states
|
||||
WHERE guard_name = ? AND key = ?
|
||||
`);
|
||||
|
||||
const upsertGuardStateStmt = (db: Database) =>
|
||||
db.prepare(`
|
||||
INSERT INTO guard_states (guard_name, key, value, last_event_id, updated_at)
|
||||
VALUES (?, ?, ?, ?, ?)
|
||||
ON CONFLICT(guard_name, key) DO UPDATE SET
|
||||
value = excluded.value,
|
||||
last_event_id = excluded.last_event_id,
|
||||
updated_at = excluded.updated_at
|
||||
`);
|
||||
|
||||
function listGuardDefs(db: Database): GuardProjectionDef[] {
|
||||
const rows = selectGuardDefsStmt(db).all() as {
|
||||
name: string;
|
||||
initial_value: string;
|
||||
sources: string;
|
||||
}[];
|
||||
const byName = new Map<string, GuardProjectionDef>();
|
||||
for (const r of rows) {
|
||||
byName.set(r.name, {
|
||||
name: r.name,
|
||||
initial_value: JSON.parse(r.initial_value) as unknown,
|
||||
sources: JSON.parse(r.sources) as GuardSource[],
|
||||
});
|
||||
}
|
||||
for (const [name, def] of guardDefMemory) {
|
||||
byName.set(name, def);
|
||||
}
|
||||
return [...byName.values()];
|
||||
}
|
||||
|
||||
function loadGuardRow(
|
||||
db: Database,
|
||||
guardName: string,
|
||||
key: string,
|
||||
): { value: any; lastEventId: number } | null {
|
||||
const row = selectGuardStateStmt(db).get(guardName, key) as {
|
||||
value: string;
|
||||
last_event_id: number;
|
||||
} | null;
|
||||
if (!row) return null;
|
||||
return {
|
||||
value: JSON.parse(row.value) as unknown,
|
||||
lastEventId: Number(row.last_event_id),
|
||||
};
|
||||
}
|
||||
|
||||
function eventBindings(
|
||||
event: {
|
||||
kind: string;
|
||||
key?: string;
|
||||
meta?: string;
|
||||
occurredAt: number;
|
||||
},
|
||||
eventId: number,
|
||||
) {
|
||||
return {
|
||||
id: eventId,
|
||||
occurred_at: event.occurredAt,
|
||||
kind: event.kind,
|
||||
key: event.key,
|
||||
payload: event.meta ? JSON.parse(event.meta) : {},
|
||||
};
|
||||
}
|
||||
|
||||
async function evalBool(
|
||||
label: string,
|
||||
expressionStr: string,
|
||||
context: Record<string, unknown>,
|
||||
): Promise<boolean> {
|
||||
const e = expr(expressionStr);
|
||||
const out = await e.evaluate(context, {});
|
||||
if (typeof out === 'boolean') return out;
|
||||
if (out === null || out === undefined) return false;
|
||||
throw new Error(`${label} must return boolean, got ${typeof out}`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Register guard (persist + memory cache of compiled defs for this process).
|
||||
*/
|
||||
const guardDefMemory = new Map<string, GuardProjectionDef>();
|
||||
|
||||
export function registerGuard(db: Database, def: GuardProjectionDef): void {
|
||||
const now = Date.now();
|
||||
insertGuardDefStmt(db).run(
|
||||
def.name,
|
||||
JSON.stringify(def.initial_value),
|
||||
JSON.stringify(def.sources),
|
||||
now,
|
||||
);
|
||||
guardDefMemory.set(def.name, def);
|
||||
}
|
||||
|
||||
/**
|
||||
* Read guard's current state (same idea as lazy projection read).
|
||||
*/
|
||||
export function getGuardState(db: Database, guardName: string, key: string): any {
|
||||
const row = loadGuardRow(db, guardName, key);
|
||||
if (!row) {
|
||||
const defs = listGuardDefs(db);
|
||||
const def = defs.find((d) => d.name === guardName);
|
||||
return def ? def.initial_value : undefined;
|
||||
}
|
||||
return row.value;
|
||||
}
|
||||
|
||||
/**
|
||||
* Before append: match guards, fold, check. Returns pending state updates
|
||||
* (lastEventId is a placeholder; caller sets to written event id).
|
||||
*/
|
||||
export async function checkGuards(
|
||||
db: Database,
|
||||
event: { kind: string; key?: string; meta?: string; occurredAt: number },
|
||||
): Promise<{ updates: GuardUpdate[] }> {
|
||||
const defs = listGuardDefs(db);
|
||||
const updates: GuardUpdate[] = [];
|
||||
const pendingEventId = 0;
|
||||
|
||||
for (const def of defs) {
|
||||
const stateKey = event.key ?? '';
|
||||
|
||||
const row = loadGuardRow(db, def.name, stateKey);
|
||||
const baseState = row ? row.value : def.initial_value;
|
||||
|
||||
let working = baseState;
|
||||
let matchedAny = false;
|
||||
|
||||
for (const source of def.sources) {
|
||||
if (!matchEventKindPattern(source.kind, event.kind)) continue;
|
||||
|
||||
if (source.key_prefix !== undefined) {
|
||||
const k = event.key;
|
||||
if (k === undefined || !k.startsWith(source.key_prefix)) continue;
|
||||
}
|
||||
|
||||
matchedAny = true;
|
||||
|
||||
const ev = eventBindings(event, pendingEventId);
|
||||
|
||||
const ctx = { state: working, event: ev };
|
||||
|
||||
let newState: any;
|
||||
try {
|
||||
newState = await expr(source.transition).evaluate(ctx, {});
|
||||
} catch (err: any) {
|
||||
throw new GuardViolationError(
|
||||
def.name,
|
||||
event.kind,
|
||||
event.key,
|
||||
`transition failed: ${err?.message ?? String(err)}`,
|
||||
);
|
||||
}
|
||||
|
||||
let ok: boolean;
|
||||
try {
|
||||
ok = await evalBool('check', source.check, ctx);
|
||||
} catch (err: any) {
|
||||
throw new GuardViolationError(
|
||||
def.name,
|
||||
event.kind,
|
||||
event.key,
|
||||
`check failed: ${err?.message ?? String(err)}`,
|
||||
);
|
||||
}
|
||||
|
||||
if (!ok) {
|
||||
throw new GuardViolationError(
|
||||
def.name,
|
||||
event.kind,
|
||||
event.key,
|
||||
'check returned false',
|
||||
);
|
||||
}
|
||||
|
||||
working = newState;
|
||||
}
|
||||
|
||||
if (matchedAny) {
|
||||
updates.push({
|
||||
guardName: def.name,
|
||||
key: stateKey,
|
||||
newState: working,
|
||||
lastEventId: 0,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
return { updates };
|
||||
}
|
||||
|
||||
/**
|
||||
* After event insert: persist guard states (same transaction as the event).
|
||||
*/
|
||||
export function applyGuardUpdates(db: Database, updates: GuardUpdate[]): void {
|
||||
const now = Date.now();
|
||||
const stmt = upsertGuardStateStmt(db);
|
||||
for (const u of updates) {
|
||||
stmt.run(
|
||||
u.guardName,
|
||||
u.key,
|
||||
JSON.stringify(u.newState),
|
||||
u.lastEventId,
|
||||
now,
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -969,10 +969,6 @@ export type {
|
||||
WorkflowTickResult,
|
||||
} from './workflows/workflow-rule-adapter.js';
|
||||
export { createWorkflowRule } from './workflows/workflow-rule-adapter.js';
|
||||
export {
|
||||
type SubprocessRoleConfig,
|
||||
createSubprocessRole,
|
||||
} from './workflows/subprocess-role.js';
|
||||
export {
|
||||
END,
|
||||
type MetaOf,
|
||||
@@ -1018,15 +1014,6 @@ export {
|
||||
// ── Projection Engine ───────────────────────────────────────────
|
||||
export * from './projection-engine.js';
|
||||
|
||||
// ── Guard projections ───────────────────────────────────────────
|
||||
export {
|
||||
type GuardProjectionDef,
|
||||
type GuardSource,
|
||||
GuardViolationError,
|
||||
getGuardState,
|
||||
registerGuard,
|
||||
} from './guard-projection.js';
|
||||
|
||||
// ── Task Event Types ────────────────────────────────────────────
|
||||
export type {
|
||||
ActiveProjectsData,
|
||||
|
||||
+44
-60
@@ -16,11 +16,6 @@ import {
|
||||
} from 'node:fs';
|
||||
import { dirname, join } from 'node:path';
|
||||
import { initDefsSchema } from './defs.js';
|
||||
import {
|
||||
applyGuardUpdates,
|
||||
checkGuards,
|
||||
initGuardSchema,
|
||||
} from './guard-projection.js';
|
||||
import { PROJECTIONS_SCHEMA } from './projection-engine.js';
|
||||
|
||||
// ── Types ──────────────────────────────────────────────────────
|
||||
@@ -217,55 +212,6 @@ function rowToObjectInstance(row: RawObjectRow): ObjectInstance {
|
||||
};
|
||||
}
|
||||
|
||||
async function appendOneWithGuards(
|
||||
db: Database,
|
||||
insert: (event: Omit<EventRecord, 'id'>) => EventRecord,
|
||||
event: Omit<EventRecord, 'id'>,
|
||||
): Promise<EventRecord> {
|
||||
const nested = db.inTransaction;
|
||||
if (!nested) db.exec('BEGIN IMMEDIATE');
|
||||
try {
|
||||
const { updates } = await checkGuards(db, event);
|
||||
const written = insert(event);
|
||||
applyGuardUpdates(
|
||||
db,
|
||||
updates.map((u) => ({ ...u, lastEventId: written.id })),
|
||||
);
|
||||
if (!nested) db.exec('COMMIT');
|
||||
return written;
|
||||
} catch (e) {
|
||||
if (!nested) db.exec('ROLLBACK');
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
async function appendManyWithGuards(
|
||||
db: Database,
|
||||
insert: (event: Omit<EventRecord, 'id'>) => EventRecord,
|
||||
events: Omit<EventRecord, 'id'>[],
|
||||
): Promise<EventRecord[]> {
|
||||
if (events.length === 0) return [];
|
||||
const nested = db.inTransaction;
|
||||
if (!nested) db.exec('BEGIN IMMEDIATE');
|
||||
try {
|
||||
const results: EventRecord[] = [];
|
||||
for (const event of events) {
|
||||
const { updates } = await checkGuards(db, event);
|
||||
const written = insert(event);
|
||||
applyGuardUpdates(
|
||||
db,
|
||||
updates.map((u) => ({ ...u, lastEventId: written.id })),
|
||||
);
|
||||
results.push(written);
|
||||
}
|
||||
if (!nested) db.exec('COMMIT');
|
||||
return results;
|
||||
} catch (e) {
|
||||
if (!nested) db.exec('ROLLBACK');
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
// ── Factory ────────────────────────────────────────────────────
|
||||
|
||||
export interface CreateStoreOptions {
|
||||
@@ -286,7 +232,6 @@ export function createStore(options: CreateStoreOptions): PulseStore {
|
||||
eventsDb.exec(EVENTS_SCHEMA);
|
||||
eventsDb.exec(OBJECTS_SCHEMA);
|
||||
migrateEventsObjectId(eventsDb);
|
||||
initGuardSchema(eventsDb);
|
||||
|
||||
const insertEvent = eventsDb.prepare(`
|
||||
INSERT INTO events (occurred_at, kind, key, hash, code_rev, meta, object_id)
|
||||
@@ -318,15 +263,35 @@ export function createStore(options: CreateStoreOptions): PulseStore {
|
||||
return { id, ...event };
|
||||
}
|
||||
|
||||
const appendManyTx = eventsDb.transaction(
|
||||
(events: Omit<EventRecord, 'id'>[]): EventRecord[] => {
|
||||
const results: EventRecord[] = [];
|
||||
for (const event of events) {
|
||||
const result = insertEvent.run(
|
||||
event.occurredAt,
|
||||
event.kind,
|
||||
event.key ?? null,
|
||||
event.hash ?? null,
|
||||
event.codeRev ?? null,
|
||||
event.meta ?? null,
|
||||
event.objectId ?? null,
|
||||
);
|
||||
const id = Number(result.lastInsertRowid);
|
||||
results.push({ id, ...event });
|
||||
}
|
||||
return results;
|
||||
},
|
||||
);
|
||||
|
||||
return {
|
||||
async appendEvent(event: Omit<EventRecord, 'id'>): Promise<EventRecord> {
|
||||
return appendOneWithGuards(eventsDb, doAppendEvent, event);
|
||||
return doAppendEvent(event);
|
||||
},
|
||||
|
||||
async appendEvents(
|
||||
events: Omit<EventRecord, 'id'>[],
|
||||
): Promise<EventRecord[]> {
|
||||
return appendManyWithGuards(eventsDb, doAppendEvent, events);
|
||||
return appendManyTx(events);
|
||||
},
|
||||
|
||||
async getLatest(kind: string, key?: string): Promise<EventRecord | null> {
|
||||
@@ -573,7 +538,6 @@ function openScopeDb(path: string): Database {
|
||||
|
||||
// Each scope carries its own def tables (bun:sqlite is sync under async wrapper)
|
||||
void initDefsSchema(db);
|
||||
initGuardSchema(db);
|
||||
|
||||
return db;
|
||||
}
|
||||
@@ -609,13 +573,33 @@ function createScopeStore(db: Database, objectsDir: string): PulseStore {
|
||||
return { id, ...event };
|
||||
}
|
||||
|
||||
const appendManyTx = db.transaction(
|
||||
(events: Omit<EventRecord, 'id'>[]): EventRecord[] => {
|
||||
const results: EventRecord[] = [];
|
||||
for (const event of events) {
|
||||
const result = insertEvent.run(
|
||||
event.occurredAt,
|
||||
event.kind,
|
||||
event.key ?? null,
|
||||
event.hash ?? null,
|
||||
event.codeRev ?? null,
|
||||
event.meta ?? null,
|
||||
event.objectId ?? null,
|
||||
);
|
||||
const id = Number(result.lastInsertRowid);
|
||||
results.push({ id, ...event });
|
||||
}
|
||||
return results;
|
||||
},
|
||||
);
|
||||
|
||||
return {
|
||||
async appendEvent(event) {
|
||||
return appendOneWithGuards(db, doAppendEvent, event);
|
||||
return doAppendEvent(event);
|
||||
},
|
||||
|
||||
async appendEvents(events) {
|
||||
return appendManyWithGuards(db, doAppendEvent, events);
|
||||
return appendManyTx(events);
|
||||
},
|
||||
|
||||
async getLatest(kind, key?) {
|
||||
|
||||
@@ -1,4 +0,0 @@
|
||||
/** Crash role — exits process immediately */
|
||||
export async function crashRole() {
|
||||
process.exit(1);
|
||||
}
|
||||
@@ -1,5 +0,0 @@
|
||||
/** Echo role — returns first message content or 'empty' */
|
||||
export async function echoRole(chain: any[], topicId: string) {
|
||||
const content = chain.length > 0 ? chain[chain.length - 1].content : 'empty';
|
||||
return { content: `echo:${content}:${topicId}`, meta: { echoed: true } };
|
||||
}
|
||||
@@ -1,4 +0,0 @@
|
||||
/** Error role — always throws */
|
||||
export async function errorRole() {
|
||||
throw new Error('deliberate test error');
|
||||
}
|
||||
@@ -1,6 +0,0 @@
|
||||
/** Hang role — infinite loop to test timeout */
|
||||
export async function hangRole() {
|
||||
while (true) {
|
||||
await new Promise((r) => setTimeout(r, 100));
|
||||
}
|
||||
}
|
||||
@@ -24,10 +24,6 @@ export { createLlmRole, createToolRole } from './roles/llm-role-factory.js';
|
||||
export { createMetaCoderRole } from './roles/meta-coder-cursor.js';
|
||||
export { createMetaTesterRole } from './roles/meta-tester.js';
|
||||
export { createMetaCheckerRole } from './roles/meta-checker.js';
|
||||
export {
|
||||
type SubprocessRoleConfig,
|
||||
createSubprocessRole,
|
||||
} from './subprocess-role.js';
|
||||
export { type ScaffoldOptions, scaffoldWorkflow } from './scaffold.js';
|
||||
export {
|
||||
createWorkflowRule,
|
||||
|
||||
@@ -57,9 +57,12 @@ export function createMetaCheckerRole(opts: {
|
||||
}
|
||||
|
||||
if (changedFiles.length > 0) {
|
||||
// Allowed: only src/workflows/ (meta workflow scope)
|
||||
// Allowed: src/workflows/*, docs/*, tests under src/
|
||||
const allowedPrefixes = [
|
||||
'src/workflows/',
|
||||
'src/',
|
||||
'docs/',
|
||||
'test/',
|
||||
'tests/',
|
||||
...(opts.allowedPrefixes ?? []),
|
||||
];
|
||||
|
||||
@@ -81,7 +84,7 @@ export function createMetaCheckerRole(opts: {
|
||||
];
|
||||
for (const file of changedFiles) {
|
||||
const basename = file.split('/').pop() ?? file;
|
||||
if (blacklist.includes(basename)) {
|
||||
if (blacklist.includes(basename) && !file.startsWith('src/workflows/')) {
|
||||
violations.push(`禁止修改: ${file}`);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -64,7 +64,6 @@ ${testerFeedback}
|
||||
|
||||
## 约束
|
||||
- commit author: 小橘 <xiaoju@shazhou.work>
|
||||
- 只修改 $HOME/.upulse/engine/src/workflows/ 下的代码
|
||||
- 不修改 workflow-rule-adapter.ts 和 workflow-type.ts`;
|
||||
|
||||
return { prompt, cwd: repoDir };
|
||||
|
||||
@@ -1,59 +0,0 @@
|
||||
/**
|
||||
* Tests for subprocess role execution isolation.
|
||||
* 小橘 🍊 (NEKO Team)
|
||||
*/
|
||||
|
||||
import { describe, expect, test } from 'bun:test';
|
||||
import { join } from 'node:path';
|
||||
import { createSubprocessRole } from './subprocess-role.js';
|
||||
|
||||
const FIXTURES = join(import.meta.dir, '__fixtures__');
|
||||
|
||||
describe('createSubprocessRole', () => {
|
||||
test('executes echo role successfully', async () => {
|
||||
const role = createSubprocessRole({
|
||||
rolePath: join(FIXTURES, 'echo-role.ts'),
|
||||
roleExport: 'echoRole',
|
||||
});
|
||||
|
||||
const chain = [
|
||||
{ role: '__start__', content: 'hello', meta: null, timestamp: Date.now() },
|
||||
];
|
||||
const result = await role(chain, 'topic-1', null as any);
|
||||
|
||||
expect(result.content).toBe('echo:hello:topic-1');
|
||||
expect(result.meta).toEqual({ echoed: true });
|
||||
});
|
||||
|
||||
test('propagates role errors', async () => {
|
||||
const role = createSubprocessRole({
|
||||
rolePath: join(FIXTURES, 'error-role.ts'),
|
||||
roleExport: 'errorRole',
|
||||
});
|
||||
|
||||
await expect(role([], 'topic-err', null as any)).rejects.toThrow(
|
||||
'deliberate test error',
|
||||
);
|
||||
});
|
||||
|
||||
test('kills on timeout', async () => {
|
||||
const role = createSubprocessRole({
|
||||
rolePath: join(FIXTURES, 'hang-role.ts'),
|
||||
roleExport: 'hangRole',
|
||||
timeoutMs: 1_000,
|
||||
});
|
||||
|
||||
await expect(role([], 'topic-hang', null as any)).rejects.toThrow(
|
||||
/timed out/,
|
||||
);
|
||||
}, 10_000);
|
||||
|
||||
test('handles subprocess crash', async () => {
|
||||
const role = createSubprocessRole({
|
||||
rolePath: join(FIXTURES, 'crash-role.ts'),
|
||||
roleExport: 'crashRole',
|
||||
});
|
||||
|
||||
await expect(role([], 'topic-crash', null as any)).rejects.toThrow();
|
||||
});
|
||||
});
|
||||
@@ -1,114 +0,0 @@
|
||||
/**
|
||||
* Subprocess role wrapper — runs a Role in an isolated child process.
|
||||
*
|
||||
* Usage:
|
||||
* const role = createSubprocessRole({
|
||||
* rolePath: '/abs/path/to/my-role.ts',
|
||||
* roleExport: 'myRole',
|
||||
* timeoutMs: 60_000,
|
||||
* });
|
||||
* // role has the same signature as Role<unknown>
|
||||
*
|
||||
* 小橘 🍊 (NEKO Team)
|
||||
*/
|
||||
|
||||
import { join } from 'node:path';
|
||||
import type { Role, RoleResult, WorkflowMessage } from './workflow-type.js';
|
||||
|
||||
export interface SubprocessRoleConfig {
|
||||
/** Absolute path to the module containing the role function */
|
||||
rolePath: string;
|
||||
/** Export name of the role function (e.g. 'myRole') */
|
||||
roleExport: string;
|
||||
/** Timeout in ms, default 300_000 (5 min) */
|
||||
timeoutMs?: number;
|
||||
/** Store config passed to subprocess if role needs a store */
|
||||
storeConfig?: { eventsDbPath: string; objectsDir: string };
|
||||
}
|
||||
|
||||
const RUNNER_PATH = join(import.meta.dir, 'subprocess-runner.ts');
|
||||
|
||||
/**
|
||||
* Wrap a Role as a subprocess execution.
|
||||
* Returns a function matching the Role<unknown> signature.
|
||||
*/
|
||||
export function createSubprocessRole(config: SubprocessRoleConfig): Role<unknown> {
|
||||
const {
|
||||
rolePath,
|
||||
roleExport,
|
||||
timeoutMs = 300_000,
|
||||
storeConfig,
|
||||
} = config;
|
||||
|
||||
return async (
|
||||
chain: WorkflowMessage[],
|
||||
topicId: string,
|
||||
): Promise<RoleResult<unknown>> => {
|
||||
const input = JSON.stringify({
|
||||
rolePath,
|
||||
roleExport,
|
||||
chain,
|
||||
topicId,
|
||||
storeConfig,
|
||||
});
|
||||
|
||||
const proc = Bun.spawn(['bun', 'run', RUNNER_PATH], {
|
||||
stdin: 'pipe',
|
||||
stdout: 'pipe',
|
||||
stderr: 'pipe',
|
||||
});
|
||||
|
||||
// Write input and close stdin
|
||||
proc.stdin.write(input);
|
||||
proc.stdin.end();
|
||||
|
||||
// Timeout handling
|
||||
let killed = false;
|
||||
const timer = setTimeout(() => {
|
||||
killed = true;
|
||||
proc.kill();
|
||||
}, timeoutMs);
|
||||
|
||||
try {
|
||||
// Wait for exit
|
||||
const exitCode = await proc.exited;
|
||||
clearTimeout(timer);
|
||||
|
||||
if (killed) {
|
||||
throw new Error(
|
||||
`Subprocess role '${roleExport}' timed out after ${timeoutMs}ms`,
|
||||
);
|
||||
}
|
||||
|
||||
// Read stdout
|
||||
const stdout = await new Response(proc.stdout).text();
|
||||
|
||||
if (exitCode !== 0 && !stdout.trim()) {
|
||||
const stderr = await new Response(proc.stderr).text();
|
||||
throw new Error(
|
||||
`Subprocess role '${roleExport}' exited with code ${exitCode}: ${stderr.slice(0, 500)}`,
|
||||
);
|
||||
}
|
||||
|
||||
let result: any;
|
||||
try {
|
||||
result = JSON.parse(stdout);
|
||||
} catch {
|
||||
throw new Error(
|
||||
`Subprocess role '${roleExport}' returned invalid JSON: ${stdout.slice(0, 500)}`,
|
||||
);
|
||||
}
|
||||
|
||||
if (!result.ok) {
|
||||
throw new Error(
|
||||
`Subprocess role '${roleExport}' failed: ${result.error}`,
|
||||
);
|
||||
}
|
||||
|
||||
return { content: result.content, meta: result.meta ?? null };
|
||||
} catch (err) {
|
||||
clearTimeout(timer);
|
||||
throw err;
|
||||
}
|
||||
};
|
||||
}
|
||||
@@ -1,82 +0,0 @@
|
||||
/**
|
||||
* Subprocess runner — child process entry point for isolated role execution.
|
||||
*
|
||||
* Reads JSON from stdin, dynamically imports the role module, executes it,
|
||||
* and writes JSON result to stdout.
|
||||
*
|
||||
* 小橘 🍊 (NEKO Team)
|
||||
*/
|
||||
|
||||
import { createStore } from '../store.js';
|
||||
|
||||
interface RunnerInput {
|
||||
rolePath: string;
|
||||
roleExport: string;
|
||||
chain: Array<{
|
||||
role: string;
|
||||
content: string;
|
||||
meta: Record<string, unknown> | null;
|
||||
timestamp: number;
|
||||
}>;
|
||||
topicId: string;
|
||||
storeConfig?: { eventsDbPath: string; objectsDir: string };
|
||||
}
|
||||
|
||||
async function main() {
|
||||
// Read all stdin
|
||||
const chunks: Buffer[] = [];
|
||||
for await (const chunk of process.stdin) {
|
||||
chunks.push(chunk as Buffer);
|
||||
}
|
||||
const raw = Buffer.concat(chunks).toString('utf-8');
|
||||
|
||||
let input: RunnerInput;
|
||||
try {
|
||||
input = JSON.parse(raw);
|
||||
} catch (e) {
|
||||
process.stdout.write(
|
||||
JSON.stringify({ ok: false, error: `Invalid JSON input: ${e}` }),
|
||||
);
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
try {
|
||||
// Dynamic import of role module
|
||||
const mod = await import(input.rolePath);
|
||||
const roleFn = mod[input.roleExport];
|
||||
if (typeof roleFn !== 'function') {
|
||||
throw new Error(
|
||||
`Export '${input.roleExport}' from '${input.rolePath}' is not a function (got ${typeof roleFn})`,
|
||||
);
|
||||
}
|
||||
|
||||
// Create store if config provided
|
||||
let store: ReturnType<typeof createStore> | undefined;
|
||||
if (input.storeConfig) {
|
||||
store = createStore({
|
||||
eventsDbPath: input.storeConfig.eventsDbPath,
|
||||
objectsDir: input.storeConfig.objectsDir,
|
||||
});
|
||||
}
|
||||
|
||||
const result = await roleFn(input.chain, input.topicId, store);
|
||||
|
||||
process.stdout.write(
|
||||
JSON.stringify({
|
||||
ok: true,
|
||||
content: result.content,
|
||||
meta: result.meta ?? null,
|
||||
}),
|
||||
);
|
||||
} catch (err) {
|
||||
process.stdout.write(
|
||||
JSON.stringify({
|
||||
ok: false,
|
||||
error: err instanceof Error ? err.message : String(err),
|
||||
}),
|
||||
);
|
||||
process.exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
main();
|
||||
Reference in New Issue
Block a user