2 Commits

Author SHA1 Message Date
tuanzi 17d382313f fix: clean up indentation, remove dead closer case, remove blank lines
CI / test (pull_request) Has been cancelled
Addresses review feedback from 小墨:
1. Fix indentation in coding.js and coding-tdd.js factory roles
2. Remove dead 'case closer' in coding-tdd.js moderator
3. Clean up blank lines left by deletions in .ts and index.ts

团子 🐰
2026-04-18 13:29:10 +00:00
tuanzi f3857888da refactor: remove closer role from coding and coding-tdd workflows
CI / test (pull_request) Has been cancelled
What: Remove the closer role from coding and coding-tdd workflows.
Why: The closer role only produced a summary after reviewer approved,
adding no value — reviewer approval is sufficient to end the workflow.
Changes:
- Delete CloserMeta/TddCloserMeta types and defaultCloser implementations
- Moderator now routes directly to END after reviewer approves
- Update all tests, .js and .d.ts artifacts accordingly
- Remove closer from index exports

团子 🐰
2026-04-18 12:16:02 +00:00
14 changed files with 50 additions and 925 deletions
-251
View File
@@ -1,251 +0,0 @@
import { afterEach, expect, test } from 'bun:test';
import { mkdtempSync, rmSync } from 'node:fs';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import {
clearGuardExpressionCache,
getGuardState,
GuardViolationError,
matchEventKindPattern,
registerGuard,
} from './guard-projection.js';
import { createScopedStore } from './store.js';
function mkScoped() {
const root = mkdtempSync(join(tmpdir(), 'pulse-guard-'));
const scoped = createScopedStore({
basePath: join(root, 'scopes'),
objectsDir: join(root, 'objects'),
});
return { root, scoped, db: scoped.scopeDatabase('g'), store: scoped.scope('g') };
}
let last: ReturnType<typeof mkScoped> | null = null;
afterEach(() => {
clearGuardExpressionCache();
if (last) {
try {
void last.scoped.close();
} catch {
/* ignore */
}
rmSync(last.root, { recursive: true, force: true });
last = null;
}
});
test('register guard + legal event passes', async () => {
last = mkScoped();
const { db, store } = last;
registerGuard(db, {
name: 'task-lifecycle',
initial_value: { phase: 'idle' },
sources: [
{
kind: 'task.created',
check: '(event.kind != "task.created") or (state.phase = "idle")',
transition: '{ "phase": "open" }',
},
],
});
const r = await store.appendEvent({
kind: 'task.created',
key: 't1',
occurredAt: 100,
});
expect(r.id).toBeGreaterThan(0);
expect(getGuardState(db, 'task-lifecycle', 't1')).toEqual({ phase: 'open' });
});
test('illegal event rejected when task already open', async () => {
last = mkScoped();
const { db, store } = last;
registerGuard(db, {
name: 'task-lifecycle',
initial_value: { phase: 'idle' },
sources: [
{
kind: 'task.created',
check: '(event.kind != "task.created") or (state.phase = "idle")',
transition: '{ "phase": "open" }',
},
],
});
await store.appendEvent({
kind: 'task.created',
key: 't1',
occurredAt: 1,
});
expect(async () => {
await store.appendEvent({
kind: 'task.created',
key: 't1',
occurredAt: 2,
});
}).toThrow(GuardViolationError);
});
test('state transitions created → claimed → completed', async () => {
last = mkScoped();
const { db, store } = last;
registerGuard(db, {
name: 'task-lifecycle',
initial_value: { phase: 'idle' },
sources: [
{
kind: 'task.created',
check: '(event.kind != "task.created") or (state.phase = "idle")',
transition: '{ "phase": "open" }',
},
{
kind: 'task.claimed',
check: '(event.kind != "task.claimed") or (state.phase = "open")',
transition: '{ "phase": "claimed" }',
},
{
kind: 'task.completed',
check: '(event.kind != "task.completed") or (state.phase = "claimed")',
transition: '{ "phase": "done" }',
},
],
});
await store.appendEvent({
kind: 'task.created',
key: 'k',
occurredAt: 10,
});
expect(getGuardState(db, 'task-lifecycle', 'k')).toEqual({ phase: 'open' });
await store.appendEvent({
kind: 'task.claimed',
key: 'k',
occurredAt: 20,
});
expect(getGuardState(db, 'task-lifecycle', 'k')).toEqual({ phase: 'claimed' });
await store.appendEvent({
kind: 'task.completed',
key: 'k',
occurredAt: 30,
});
expect(getGuardState(db, 'task-lifecycle', 'k')).toEqual({ phase: 'done' });
});
test('unrelated event kind passes without guard match', async () => {
last = mkScoped();
const { db, store } = last;
registerGuard(db, {
name: 'only-task',
initial_value: { n: 0 },
sources: [
{
kind: 'task.*',
check: 'true',
transition: '{ "n": state.n + 1 }',
},
],
});
await store.appendEvent({
kind: 'other.thing',
key: 'x',
occurredAt: 1,
});
expect(getGuardState(db, 'only-task', 'x')).toEqual({ n: 0 });
});
test('getGuardState reflects latest state', async () => {
last = mkScoped();
const { db, store } = last;
registerGuard(db, {
name: 'g',
initial_value: { v: 0 },
sources: [
{
kind: 'tick',
check: 'true',
transition: '{ "v": state.v + 1 }',
},
],
});
await store.appendEvent({ kind: 'tick', key: 'k', occurredAt: 1 });
expect(getGuardState(db, 'g', 'k')).toEqual({ v: 1 });
await store.appendEvent({ kind: 'tick', key: 'k', occurredAt: 2 });
expect(getGuardState(db, 'g', 'k')).toEqual({ v: 2 });
});
test('complex JSONata check (and / or / in)', async () => {
last = mkScoped();
const { db, store } = last;
registerGuard(db, {
name: 'complex',
initial_value: { mode: 'a' },
sources: [
{
kind: 'x',
check:
'(event.kind != "x") or ((state.mode = "a" or state.mode = "b") and event.payload.role in ["u", "v"])',
transition: '{ "mode": "b" }',
},
],
});
await store.appendEvent({
kind: 'x',
key: 'k',
occurredAt: 1,
meta: JSON.stringify({ role: 'u' }),
});
expect(getGuardState(db, 'complex', 'k')).toEqual({ mode: 'b' });
});
test('kind wildcard *.__start__ matches meta.__start__ and coding.__start__', async () => {
last = mkScoped();
const { db, store } = last;
expect(matchEventKindPattern('*.__start__', 'meta.__start__')).toBe(true);
expect(matchEventKindPattern('*.__start__', 'coding.__start__')).toBe(true);
expect(matchEventKindPattern('*.__start__', 'other')).toBe(false);
registerGuard(db, {
name: 'starts',
initial_value: { kinds: [] },
sources: [
{
kind: '*.__start__',
check: 'true',
transition:
'{ "kinds": $append(state.kinds ? state.kinds : [], event.kind) }',
},
],
});
await store.appendEvent({
kind: 'meta.__start__',
key: 't',
occurredAt: 1,
});
await store.appendEvent({
kind: 'coding.__start__',
key: 't',
occurredAt: 2,
});
expect(getGuardState(db, 'starts', 't')).toEqual({
kinds: ['meta.__start__', 'coding.__start__'],
});
});
-319
View File
@@ -1,319 +0,0 @@
/**
* Guard projections: synchronous fold + check on append (JSONata).
*/
import type { Database } from 'bun:sqlite';
import jsonata, { type Expression } from 'jsonata';
// ── Schema ────────────────────────────────────────────────────
export const GUARD_SCHEMA = `
CREATE TABLE IF NOT EXISTS guard_defs (
name TEXT PRIMARY KEY,
initial_value TEXT NOT NULL,
sources TEXT NOT NULL,
created_at INTEGER NOT NULL
);
CREATE TABLE IF NOT EXISTS guard_states (
guard_name TEXT NOT NULL,
key TEXT NOT NULL DEFAULT '',
value TEXT NOT NULL,
last_event_id INTEGER NOT NULL DEFAULT 0,
updated_at INTEGER NOT NULL,
PRIMARY KEY (guard_name, key)
);
`;
export function initGuardSchema(db: Database): void {
db.exec(GUARD_SCHEMA);
}
// ── Types ─────────────────────────────────────────────────────
export interface GuardSource {
kind: string;
key_prefix?: string;
check: string;
transition: string;
}
export interface GuardProjectionDef {
name: string;
initial_value: any;
sources: GuardSource[];
}
export class GuardViolationError extends Error {
constructor(
public guardName: string,
public eventKind: string,
public eventKey: string | undefined,
public reason: string,
) {
super(`Guard "${guardName}" rejected event ${eventKind}: ${reason}`);
this.name = 'GuardViolationError';
}
}
export interface GuardUpdate {
guardName: string;
key: string;
newState: any;
lastEventId: number;
}
// ── Expression cache ───────────────────────────────────────────
const exprCache = new Map<string, Expression>();
function expr(expressionStr: string): Expression {
let e = exprCache.get(expressionStr);
if (!e) {
e = jsonata(expressionStr);
exprCache.set(expressionStr, e);
}
return e;
}
/** @internal */
export function clearGuardExpressionCache(): void {
exprCache.clear();
guardDefMemory.clear();
}
// ── Kind pattern (* = one dot segment) ─────────────────────────
export function matchEventKindPattern(pattern: string, kind: string): boolean {
const ps = pattern.split('.');
const ks = kind.split('.');
if (ps.length !== ks.length) return false;
for (let i = 0; i < ps.length; i++) {
if (ps[i] === '*') continue;
if (ps[i] !== ks[i]) return false;
}
return true;
}
// ── DB helpers ─────────────────────────────────────────────────
const insertGuardDefStmt = (db: Database) =>
db.prepare(`
INSERT OR REPLACE INTO guard_defs (name, initial_value, sources, created_at)
VALUES (?, ?, ?, ?)
`);
const selectGuardDefsStmt = (db: Database) =>
db.prepare(`SELECT name, initial_value, sources FROM guard_defs`);
const selectGuardStateStmt = (db: Database) =>
db.prepare(`
SELECT value, last_event_id FROM guard_states
WHERE guard_name = ? AND key = ?
`);
const upsertGuardStateStmt = (db: Database) =>
db.prepare(`
INSERT INTO guard_states (guard_name, key, value, last_event_id, updated_at)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT(guard_name, key) DO UPDATE SET
value = excluded.value,
last_event_id = excluded.last_event_id,
updated_at = excluded.updated_at
`);
function listGuardDefs(db: Database): GuardProjectionDef[] {
const rows = selectGuardDefsStmt(db).all() as {
name: string;
initial_value: string;
sources: string;
}[];
const byName = new Map<string, GuardProjectionDef>();
for (const r of rows) {
byName.set(r.name, {
name: r.name,
initial_value: JSON.parse(r.initial_value) as unknown,
sources: JSON.parse(r.sources) as GuardSource[],
});
}
for (const [name, def] of guardDefMemory) {
byName.set(name, def);
}
return [...byName.values()];
}
function loadGuardRow(
db: Database,
guardName: string,
key: string,
): { value: any; lastEventId: number } | null {
const row = selectGuardStateStmt(db).get(guardName, key) as {
value: string;
last_event_id: number;
} | null;
if (!row) return null;
return {
value: JSON.parse(row.value) as unknown,
lastEventId: Number(row.last_event_id),
};
}
function eventBindings(
event: {
kind: string;
key?: string;
meta?: string;
occurredAt: number;
},
eventId: number,
) {
return {
id: eventId,
occurred_at: event.occurredAt,
kind: event.kind,
key: event.key,
payload: event.meta ? JSON.parse(event.meta) : {},
};
}
async function evalBool(
label: string,
expressionStr: string,
context: Record<string, unknown>,
): Promise<boolean> {
const e = expr(expressionStr);
const out = await e.evaluate(context, {});
if (typeof out === 'boolean') return out;
if (out === null || out === undefined) return false;
throw new Error(`${label} must return boolean, got ${typeof out}`);
}
/**
* Register guard (persist + memory cache of compiled defs for this process).
*/
const guardDefMemory = new Map<string, GuardProjectionDef>();
export function registerGuard(db: Database, def: GuardProjectionDef): void {
const now = Date.now();
insertGuardDefStmt(db).run(
def.name,
JSON.stringify(def.initial_value),
JSON.stringify(def.sources),
now,
);
guardDefMemory.set(def.name, def);
}
/**
* Read guard's current state (same idea as lazy projection read).
*/
export function getGuardState(db: Database, guardName: string, key: string): any {
const row = loadGuardRow(db, guardName, key);
if (!row) {
const defs = listGuardDefs(db);
const def = defs.find((d) => d.name === guardName);
return def ? def.initial_value : undefined;
}
return row.value;
}
/**
* Before append: match guards, fold, check. Returns pending state updates
* (lastEventId is a placeholder; caller sets to written event id).
*/
export async function checkGuards(
db: Database,
event: { kind: string; key?: string; meta?: string; occurredAt: number },
): Promise<{ updates: GuardUpdate[] }> {
const defs = listGuardDefs(db);
const updates: GuardUpdate[] = [];
const pendingEventId = 0;
for (const def of defs) {
const stateKey = event.key ?? '';
const row = loadGuardRow(db, def.name, stateKey);
const baseState = row ? row.value : def.initial_value;
let working = baseState;
let matchedAny = false;
for (const source of def.sources) {
if (!matchEventKindPattern(source.kind, event.kind)) continue;
if (source.key_prefix !== undefined) {
const k = event.key;
if (k === undefined || !k.startsWith(source.key_prefix)) continue;
}
matchedAny = true;
const ev = eventBindings(event, pendingEventId);
const ctx = { state: working, event: ev };
let newState: any;
try {
newState = await expr(source.transition).evaluate(ctx, {});
} catch (err: any) {
throw new GuardViolationError(
def.name,
event.kind,
event.key,
`transition failed: ${err?.message ?? String(err)}`,
);
}
let ok: boolean;
try {
ok = await evalBool('check', source.check, ctx);
} catch (err: any) {
throw new GuardViolationError(
def.name,
event.kind,
event.key,
`check failed: ${err?.message ?? String(err)}`,
);
}
if (!ok) {
throw new GuardViolationError(
def.name,
event.kind,
event.key,
'check returned false',
);
}
working = newState;
}
if (matchedAny) {
updates.push({
guardName: def.name,
key: stateKey,
newState: working,
lastEventId: 0,
});
}
}
return { updates };
}
/**
* After event insert: persist guard states (same transaction as the event).
*/
export function applyGuardUpdates(db: Database, updates: GuardUpdate[]): void {
const now = Date.now();
const stmt = upsertGuardStateStmt(db);
for (const u of updates) {
stmt.run(
u.guardName,
u.key,
JSON.stringify(u.newState),
u.lastEventId,
now,
);
}
}
-13
View File
@@ -969,10 +969,6 @@ export type {
WorkflowTickResult,
} from './workflows/workflow-rule-adapter.js';
export { createWorkflowRule } from './workflows/workflow-rule-adapter.js';
export {
type SubprocessRoleConfig,
createSubprocessRole,
} from './workflows/subprocess-role.js';
export {
END,
type MetaOf,
@@ -1018,15 +1014,6 @@ export {
// ── Projection Engine ───────────────────────────────────────────
export * from './projection-engine.js';
// ── Guard projections ───────────────────────────────────────────
export {
type GuardProjectionDef,
type GuardSource,
GuardViolationError,
getGuardState,
registerGuard,
} from './guard-projection.js';
// ── Task Event Types ────────────────────────────────────────────
export type {
ActiveProjectsData,
+44 -60
View File
@@ -16,11 +16,6 @@ import {
} from 'node:fs';
import { dirname, join } from 'node:path';
import { initDefsSchema } from './defs.js';
import {
applyGuardUpdates,
checkGuards,
initGuardSchema,
} from './guard-projection.js';
import { PROJECTIONS_SCHEMA } from './projection-engine.js';
// ── Types ──────────────────────────────────────────────────────
@@ -217,55 +212,6 @@ function rowToObjectInstance(row: RawObjectRow): ObjectInstance {
};
}
async function appendOneWithGuards(
db: Database,
insert: (event: Omit<EventRecord, 'id'>) => EventRecord,
event: Omit<EventRecord, 'id'>,
): Promise<EventRecord> {
const nested = db.inTransaction;
if (!nested) db.exec('BEGIN IMMEDIATE');
try {
const { updates } = await checkGuards(db, event);
const written = insert(event);
applyGuardUpdates(
db,
updates.map((u) => ({ ...u, lastEventId: written.id })),
);
if (!nested) db.exec('COMMIT');
return written;
} catch (e) {
if (!nested) db.exec('ROLLBACK');
throw e;
}
}
async function appendManyWithGuards(
db: Database,
insert: (event: Omit<EventRecord, 'id'>) => EventRecord,
events: Omit<EventRecord, 'id'>[],
): Promise<EventRecord[]> {
if (events.length === 0) return [];
const nested = db.inTransaction;
if (!nested) db.exec('BEGIN IMMEDIATE');
try {
const results: EventRecord[] = [];
for (const event of events) {
const { updates } = await checkGuards(db, event);
const written = insert(event);
applyGuardUpdates(
db,
updates.map((u) => ({ ...u, lastEventId: written.id })),
);
results.push(written);
}
if (!nested) db.exec('COMMIT');
return results;
} catch (e) {
if (!nested) db.exec('ROLLBACK');
throw e;
}
}
// ── Factory ────────────────────────────────────────────────────
export interface CreateStoreOptions {
@@ -286,7 +232,6 @@ export function createStore(options: CreateStoreOptions): PulseStore {
eventsDb.exec(EVENTS_SCHEMA);
eventsDb.exec(OBJECTS_SCHEMA);
migrateEventsObjectId(eventsDb);
initGuardSchema(eventsDb);
const insertEvent = eventsDb.prepare(`
INSERT INTO events (occurred_at, kind, key, hash, code_rev, meta, object_id)
@@ -318,15 +263,35 @@ export function createStore(options: CreateStoreOptions): PulseStore {
return { id, ...event };
}
const appendManyTx = eventsDb.transaction(
(events: Omit<EventRecord, 'id'>[]): EventRecord[] => {
const results: EventRecord[] = [];
for (const event of events) {
const result = insertEvent.run(
event.occurredAt,
event.kind,
event.key ?? null,
event.hash ?? null,
event.codeRev ?? null,
event.meta ?? null,
event.objectId ?? null,
);
const id = Number(result.lastInsertRowid);
results.push({ id, ...event });
}
return results;
},
);
return {
async appendEvent(event: Omit<EventRecord, 'id'>): Promise<EventRecord> {
return appendOneWithGuards(eventsDb, doAppendEvent, event);
return doAppendEvent(event);
},
async appendEvents(
events: Omit<EventRecord, 'id'>[],
): Promise<EventRecord[]> {
return appendManyWithGuards(eventsDb, doAppendEvent, events);
return appendManyTx(events);
},
async getLatest(kind: string, key?: string): Promise<EventRecord | null> {
@@ -573,7 +538,6 @@ function openScopeDb(path: string): Database {
// Each scope carries its own def tables (bun:sqlite is sync under async wrapper)
void initDefsSchema(db);
initGuardSchema(db);
return db;
}
@@ -609,13 +573,33 @@ function createScopeStore(db: Database, objectsDir: string): PulseStore {
return { id, ...event };
}
const appendManyTx = db.transaction(
(events: Omit<EventRecord, 'id'>[]): EventRecord[] => {
const results: EventRecord[] = [];
for (const event of events) {
const result = insertEvent.run(
event.occurredAt,
event.kind,
event.key ?? null,
event.hash ?? null,
event.codeRev ?? null,
event.meta ?? null,
event.objectId ?? null,
);
const id = Number(result.lastInsertRowid);
results.push({ id, ...event });
}
return results;
},
);
return {
async appendEvent(event) {
return appendOneWithGuards(db, doAppendEvent, event);
return doAppendEvent(event);
},
async appendEvents(events) {
return appendManyWithGuards(db, doAppendEvent, events);
return appendManyTx(events);
},
async getLatest(kind, key?) {
@@ -1,4 +0,0 @@
/** Crash role — exits process immediately */
export async function crashRole() {
process.exit(1);
}
@@ -1,5 +0,0 @@
/** Echo role — returns first message content or 'empty' */
export async function echoRole(chain: any[], topicId: string) {
const content = chain.length > 0 ? chain[chain.length - 1].content : 'empty';
return { content: `echo:${content}:${topicId}`, meta: { echoed: true } };
}
@@ -1,4 +0,0 @@
/** Error role — always throws */
export async function errorRole() {
throw new Error('deliberate test error');
}
@@ -1,6 +0,0 @@
/** Hang role — infinite loop to test timeout */
export async function hangRole() {
while (true) {
await new Promise((r) => setTimeout(r, 100));
}
}
-4
View File
@@ -24,10 +24,6 @@ export { createLlmRole, createToolRole } from './roles/llm-role-factory.js';
export { createMetaCoderRole } from './roles/meta-coder-cursor.js';
export { createMetaTesterRole } from './roles/meta-tester.js';
export { createMetaCheckerRole } from './roles/meta-checker.js';
export {
type SubprocessRoleConfig,
createSubprocessRole,
} from './subprocess-role.js';
export { type ScaffoldOptions, scaffoldWorkflow } from './scaffold.js';
export {
createWorkflowRule,
@@ -57,9 +57,12 @@ export function createMetaCheckerRole(opts: {
}
if (changedFiles.length > 0) {
// Allowed: only src/workflows/ (meta workflow scope)
// Allowed: src/workflows/*, docs/*, tests under src/
const allowedPrefixes = [
'src/workflows/',
'src/',
'docs/',
'test/',
'tests/',
...(opts.allowedPrefixes ?? []),
];
@@ -81,7 +84,7 @@ export function createMetaCheckerRole(opts: {
];
for (const file of changedFiles) {
const basename = file.split('/').pop() ?? file;
if (blacklist.includes(basename)) {
if (blacklist.includes(basename) && !file.startsWith('src/workflows/')) {
violations.push(`禁止修改: ${file}`);
}
}
@@ -64,7 +64,6 @@ ${testerFeedback}
## 约束
- commit author: 小橘 <xiaoju@shazhou.work>
- 只修改 $HOME/.upulse/engine/src/workflows/ 下的代码
- 不修改 workflow-rule-adapter.ts 和 workflow-type.ts`;
return { prompt, cwd: repoDir };
@@ -1,59 +0,0 @@
/**
* Tests for subprocess role execution isolation.
* 小橘 🍊 (NEKO Team)
*/
import { describe, expect, test } from 'bun:test';
import { join } from 'node:path';
import { createSubprocessRole } from './subprocess-role.js';
const FIXTURES = join(import.meta.dir, '__fixtures__');
describe('createSubprocessRole', () => {
test('executes echo role successfully', async () => {
const role = createSubprocessRole({
rolePath: join(FIXTURES, 'echo-role.ts'),
roleExport: 'echoRole',
});
const chain = [
{ role: '__start__', content: 'hello', meta: null, timestamp: Date.now() },
];
const result = await role(chain, 'topic-1', null as any);
expect(result.content).toBe('echo:hello:topic-1');
expect(result.meta).toEqual({ echoed: true });
});
test('propagates role errors', async () => {
const role = createSubprocessRole({
rolePath: join(FIXTURES, 'error-role.ts'),
roleExport: 'errorRole',
});
await expect(role([], 'topic-err', null as any)).rejects.toThrow(
'deliberate test error',
);
});
test('kills on timeout', async () => {
const role = createSubprocessRole({
rolePath: join(FIXTURES, 'hang-role.ts'),
roleExport: 'hangRole',
timeoutMs: 1_000,
});
await expect(role([], 'topic-hang', null as any)).rejects.toThrow(
/timed out/,
);
}, 10_000);
test('handles subprocess crash', async () => {
const role = createSubprocessRole({
rolePath: join(FIXTURES, 'crash-role.ts'),
roleExport: 'crashRole',
});
await expect(role([], 'topic-crash', null as any)).rejects.toThrow();
});
});
@@ -1,114 +0,0 @@
/**
* Subprocess role wrapper — runs a Role in an isolated child process.
*
* Usage:
* const role = createSubprocessRole({
* rolePath: '/abs/path/to/my-role.ts',
* roleExport: 'myRole',
* timeoutMs: 60_000,
* });
* // role has the same signature as Role<unknown>
*
* 小橘 🍊 (NEKO Team)
*/
import { join } from 'node:path';
import type { Role, RoleResult, WorkflowMessage } from './workflow-type.js';
export interface SubprocessRoleConfig {
/** Absolute path to the module containing the role function */
rolePath: string;
/** Export name of the role function (e.g. 'myRole') */
roleExport: string;
/** Timeout in ms, default 300_000 (5 min) */
timeoutMs?: number;
/** Store config passed to subprocess if role needs a store */
storeConfig?: { eventsDbPath: string; objectsDir: string };
}
const RUNNER_PATH = join(import.meta.dir, 'subprocess-runner.ts');
/**
* Wrap a Role as a subprocess execution.
* Returns a function matching the Role<unknown> signature.
*/
export function createSubprocessRole(config: SubprocessRoleConfig): Role<unknown> {
const {
rolePath,
roleExport,
timeoutMs = 300_000,
storeConfig,
} = config;
return async (
chain: WorkflowMessage[],
topicId: string,
): Promise<RoleResult<unknown>> => {
const input = JSON.stringify({
rolePath,
roleExport,
chain,
topicId,
storeConfig,
});
const proc = Bun.spawn(['bun', 'run', RUNNER_PATH], {
stdin: 'pipe',
stdout: 'pipe',
stderr: 'pipe',
});
// Write input and close stdin
proc.stdin.write(input);
proc.stdin.end();
// Timeout handling
let killed = false;
const timer = setTimeout(() => {
killed = true;
proc.kill();
}, timeoutMs);
try {
// Wait for exit
const exitCode = await proc.exited;
clearTimeout(timer);
if (killed) {
throw new Error(
`Subprocess role '${roleExport}' timed out after ${timeoutMs}ms`,
);
}
// Read stdout
const stdout = await new Response(proc.stdout).text();
if (exitCode !== 0 && !stdout.trim()) {
const stderr = await new Response(proc.stderr).text();
throw new Error(
`Subprocess role '${roleExport}' exited with code ${exitCode}: ${stderr.slice(0, 500)}`,
);
}
let result: any;
try {
result = JSON.parse(stdout);
} catch {
throw new Error(
`Subprocess role '${roleExport}' returned invalid JSON: ${stdout.slice(0, 500)}`,
);
}
if (!result.ok) {
throw new Error(
`Subprocess role '${roleExport}' failed: ${result.error}`,
);
}
return { content: result.content, meta: result.meta ?? null };
} catch (err) {
clearTimeout(timer);
throw err;
}
};
}
@@ -1,82 +0,0 @@
/**
* Subprocess runner — child process entry point for isolated role execution.
*
* Reads JSON from stdin, dynamically imports the role module, executes it,
* and writes JSON result to stdout.
*
* 小橘 🍊 (NEKO Team)
*/
import { createStore } from '../store.js';
interface RunnerInput {
rolePath: string;
roleExport: string;
chain: Array<{
role: string;
content: string;
meta: Record<string, unknown> | null;
timestamp: number;
}>;
topicId: string;
storeConfig?: { eventsDbPath: string; objectsDir: string };
}
async function main() {
// Read all stdin
const chunks: Buffer[] = [];
for await (const chunk of process.stdin) {
chunks.push(chunk as Buffer);
}
const raw = Buffer.concat(chunks).toString('utf-8');
let input: RunnerInput;
try {
input = JSON.parse(raw);
} catch (e) {
process.stdout.write(
JSON.stringify({ ok: false, error: `Invalid JSON input: ${e}` }),
);
process.exit(1);
}
try {
// Dynamic import of role module
const mod = await import(input.rolePath);
const roleFn = mod[input.roleExport];
if (typeof roleFn !== 'function') {
throw new Error(
`Export '${input.roleExport}' from '${input.rolePath}' is not a function (got ${typeof roleFn})`,
);
}
// Create store if config provided
let store: ReturnType<typeof createStore> | undefined;
if (input.storeConfig) {
store = createStore({
eventsDbPath: input.storeConfig.eventsDbPath,
objectsDir: input.storeConfig.objectsDir,
});
}
const result = await roleFn(input.chain, input.topicId, store);
process.stdout.write(
JSON.stringify({
ok: true,
content: result.content,
meta: result.meta ?? null,
}),
);
} catch (err) {
process.stdout.write(
JSON.stringify({
ok: false,
error: err instanceof Error ? err.message : String(err),
}),
);
process.exit(1);
}
}
main();