feat(workflows): subprocess role isolation
CI / test (push) Has been cancelled

Add createSubprocessRole() to run workflow roles in isolated child
processes. Roles that crash/hang/OOM no longer affect the daemon.

- subprocess-runner.ts: child process entry, reads stdin JSON, executes role
- subprocess-role.ts: createSubprocessRole() wrapper matching Role<unknown>
- 4 test fixtures + 4 passing tests (echo, error, timeout, crash)
- Exported from workflows/index.ts and main index.ts

小橘 🍊 (NEKO Team)
This commit is contained in:
2026-04-18 11:36:43 +00:00
parent 6c3888af48
commit b94a9046ac
9 changed files with 282 additions and 0 deletions
+4
View File
@@ -969,6 +969,10 @@ export type {
WorkflowTickResult,
} from './workflows/workflow-rule-adapter.js';
export { createWorkflowRule } from './workflows/workflow-rule-adapter.js';
export {
type SubprocessRoleConfig,
createSubprocessRole,
} from './workflows/subprocess-role.js';
export {
END,
type MetaOf,
@@ -0,0 +1,4 @@
/** Crash role — exits process immediately */
export async function crashRole() {
process.exit(1);
}
@@ -0,0 +1,5 @@
/** Echo role — returns first message content or 'empty' */
export async function echoRole(chain: any[], topicId: string) {
const content = chain.length > 0 ? chain[chain.length - 1].content : 'empty';
return { content: `echo:${content}:${topicId}`, meta: { echoed: true } };
}
@@ -0,0 +1,4 @@
/** Error role — always throws */
export async function errorRole() {
throw new Error('deliberate test error');
}
@@ -0,0 +1,6 @@
/** Hang role — infinite loop to test timeout */
export async function hangRole() {
while (true) {
await new Promise((r) => setTimeout(r, 100));
}
}
+4
View File
@@ -24,6 +24,10 @@ export { createLlmRole, createToolRole } from './roles/llm-role-factory.js';
export { createMetaCoderRole } from './roles/meta-coder-cursor.js';
export { createMetaTesterRole } from './roles/meta-tester.js';
export { createMetaCheckerRole } from './roles/meta-checker.js';
export {
type SubprocessRoleConfig,
createSubprocessRole,
} from './subprocess-role.js';
export { type ScaffoldOptions, scaffoldWorkflow } from './scaffold.js';
export {
createWorkflowRule,
@@ -0,0 +1,59 @@
/**
* Tests for subprocess role execution isolation.
* 小橘 🍊 (NEKO Team)
*/
import { describe, expect, test } from 'bun:test';
import { join } from 'node:path';
import { createSubprocessRole } from './subprocess-role.js';
const FIXTURES = join(import.meta.dir, '__fixtures__');
describe('createSubprocessRole', () => {
test('executes echo role successfully', async () => {
const role = createSubprocessRole({
rolePath: join(FIXTURES, 'echo-role.ts'),
roleExport: 'echoRole',
});
const chain = [
{ role: '__start__', content: 'hello', meta: null, timestamp: Date.now() },
];
const result = await role(chain, 'topic-1', null as any);
expect(result.content).toBe('echo:hello:topic-1');
expect(result.meta).toEqual({ echoed: true });
});
test('propagates role errors', async () => {
const role = createSubprocessRole({
rolePath: join(FIXTURES, 'error-role.ts'),
roleExport: 'errorRole',
});
await expect(role([], 'topic-err', null as any)).rejects.toThrow(
'deliberate test error',
);
});
test('kills on timeout', async () => {
const role = createSubprocessRole({
rolePath: join(FIXTURES, 'hang-role.ts'),
roleExport: 'hangRole',
timeoutMs: 1_000,
});
await expect(role([], 'topic-hang', null as any)).rejects.toThrow(
/timed out/,
);
}, 10_000);
test('handles subprocess crash', async () => {
const role = createSubprocessRole({
rolePath: join(FIXTURES, 'crash-role.ts'),
roleExport: 'crashRole',
});
await expect(role([], 'topic-crash', null as any)).rejects.toThrow();
});
});
@@ -0,0 +1,114 @@
/**
* Subprocess role wrapper — runs a Role in an isolated child process.
*
* Usage:
* const role = createSubprocessRole({
* rolePath: '/abs/path/to/my-role.ts',
* roleExport: 'myRole',
* timeoutMs: 60_000,
* });
* // role has the same signature as Role<unknown>
*
* 小橘 🍊 (NEKO Team)
*/
import { join } from 'node:path';
import type { Role, RoleResult, WorkflowMessage } from './workflow-type.js';
export interface SubprocessRoleConfig {
/** Absolute path to the module containing the role function */
rolePath: string;
/** Export name of the role function (e.g. 'myRole') */
roleExport: string;
/** Timeout in ms, default 300_000 (5 min) */
timeoutMs?: number;
/** Store config passed to subprocess if role needs a store */
storeConfig?: { eventsDbPath: string; objectsDir: string };
}
const RUNNER_PATH = join(import.meta.dir, 'subprocess-runner.ts');
/**
* Wrap a Role as a subprocess execution.
* Returns a function matching the Role<unknown> signature.
*/
export function createSubprocessRole(config: SubprocessRoleConfig): Role<unknown> {
const {
rolePath,
roleExport,
timeoutMs = 300_000,
storeConfig,
} = config;
return async (
chain: WorkflowMessage[],
topicId: string,
): Promise<RoleResult<unknown>> => {
const input = JSON.stringify({
rolePath,
roleExport,
chain,
topicId,
storeConfig,
});
const proc = Bun.spawn(['bun', 'run', RUNNER_PATH], {
stdin: 'pipe',
stdout: 'pipe',
stderr: 'pipe',
});
// Write input and close stdin
proc.stdin.write(input);
proc.stdin.end();
// Timeout handling
let killed = false;
const timer = setTimeout(() => {
killed = true;
proc.kill();
}, timeoutMs);
try {
// Wait for exit
const exitCode = await proc.exited;
clearTimeout(timer);
if (killed) {
throw new Error(
`Subprocess role '${roleExport}' timed out after ${timeoutMs}ms`,
);
}
// Read stdout
const stdout = await new Response(proc.stdout).text();
if (exitCode !== 0 && !stdout.trim()) {
const stderr = await new Response(proc.stderr).text();
throw new Error(
`Subprocess role '${roleExport}' exited with code ${exitCode}: ${stderr.slice(0, 500)}`,
);
}
let result: any;
try {
result = JSON.parse(stdout);
} catch {
throw new Error(
`Subprocess role '${roleExport}' returned invalid JSON: ${stdout.slice(0, 500)}`,
);
}
if (!result.ok) {
throw new Error(
`Subprocess role '${roleExport}' failed: ${result.error}`,
);
}
return { content: result.content, meta: result.meta ?? null };
} catch (err) {
clearTimeout(timer);
throw err;
}
};
}
@@ -0,0 +1,82 @@
/**
* Subprocess runner — child process entry point for isolated role execution.
*
* Reads JSON from stdin, dynamically imports the role module, executes it,
* and writes JSON result to stdout.
*
* 小橘 🍊 (NEKO Team)
*/
import { createStore } from '../store.js';
interface RunnerInput {
rolePath: string;
roleExport: string;
chain: Array<{
role: string;
content: string;
meta: Record<string, unknown> | null;
timestamp: number;
}>;
topicId: string;
storeConfig?: { eventsDbPath: string; objectsDir: string };
}
async function main() {
// Read all stdin
const chunks: Buffer[] = [];
for await (const chunk of process.stdin) {
chunks.push(chunk as Buffer);
}
const raw = Buffer.concat(chunks).toString('utf-8');
let input: RunnerInput;
try {
input = JSON.parse(raw);
} catch (e) {
process.stdout.write(
JSON.stringify({ ok: false, error: `Invalid JSON input: ${e}` }),
);
process.exit(1);
}
try {
// Dynamic import of role module
const mod = await import(input.rolePath);
const roleFn = mod[input.roleExport];
if (typeof roleFn !== 'function') {
throw new Error(
`Export '${input.roleExport}' from '${input.rolePath}' is not a function (got ${typeof roleFn})`,
);
}
// Create store if config provided
let store: ReturnType<typeof createStore> | undefined;
if (input.storeConfig) {
store = createStore({
eventsDbPath: input.storeConfig.eventsDbPath,
objectsDir: input.storeConfig.objectsDir,
});
}
const result = await roleFn(input.chain, input.topicId, store);
process.stdout.write(
JSON.stringify({
ok: true,
content: result.content,
meta: result.meta ?? null,
}),
);
} catch (err) {
process.stdout.write(
JSON.stringify({
ok: false,
error: err instanceof Error ? err.message : String(err),
}),
);
process.exit(1);
}
}
main();