From b94a9046ac3c0c83406f0ea4f506f9bb9c62d900 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B0=8F=E6=A9=98?= Date: Sat, 18 Apr 2026 11:36:43 +0000 Subject: [PATCH] feat(workflows): subprocess role isolation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add createSubprocessRole() to run workflow roles in isolated child processes. Roles that crash/hang/OOM no longer affect the daemon. - subprocess-runner.ts: child process entry, reads stdin JSON, executes role - subprocess-role.ts: createSubprocessRole() wrapper matching Role - 4 test fixtures + 4 passing tests (echo, error, timeout, crash) - Exported from workflows/index.ts and main index.ts 小橘 🍊 (NEKO Team) --- packages/pulse/src/index.ts | 4 + .../src/workflows/__fixtures__/crash-role.ts | 4 + .../src/workflows/__fixtures__/echo-role.ts | 5 + .../src/workflows/__fixtures__/error-role.ts | 4 + .../src/workflows/__fixtures__/hang-role.ts | 6 + packages/pulse/src/workflows/index.ts | 4 + .../src/workflows/subprocess-role.test.ts | 59 +++++++++ .../pulse/src/workflows/subprocess-role.ts | 114 ++++++++++++++++++ .../pulse/src/workflows/subprocess-runner.ts | 82 +++++++++++++ 9 files changed, 282 insertions(+) create mode 100644 packages/pulse/src/workflows/__fixtures__/crash-role.ts create mode 100644 packages/pulse/src/workflows/__fixtures__/echo-role.ts create mode 100644 packages/pulse/src/workflows/__fixtures__/error-role.ts create mode 100644 packages/pulse/src/workflows/__fixtures__/hang-role.ts create mode 100644 packages/pulse/src/workflows/subprocess-role.test.ts create mode 100644 packages/pulse/src/workflows/subprocess-role.ts create mode 100644 packages/pulse/src/workflows/subprocess-runner.ts diff --git a/packages/pulse/src/index.ts b/packages/pulse/src/index.ts index ca9cb49..a4858f4 100644 --- a/packages/pulse/src/index.ts +++ b/packages/pulse/src/index.ts @@ -969,6 +969,10 @@ export type { WorkflowTickResult, } from './workflows/workflow-rule-adapter.js'; export { createWorkflowRule } from './workflows/workflow-rule-adapter.js'; +export { + type SubprocessRoleConfig, + createSubprocessRole, +} from './workflows/subprocess-role.js'; export { END, type MetaOf, diff --git a/packages/pulse/src/workflows/__fixtures__/crash-role.ts b/packages/pulse/src/workflows/__fixtures__/crash-role.ts new file mode 100644 index 0000000..12e4a15 --- /dev/null +++ b/packages/pulse/src/workflows/__fixtures__/crash-role.ts @@ -0,0 +1,4 @@ +/** Crash role — exits process immediately */ +export async function crashRole() { + process.exit(1); +} diff --git a/packages/pulse/src/workflows/__fixtures__/echo-role.ts b/packages/pulse/src/workflows/__fixtures__/echo-role.ts new file mode 100644 index 0000000..8862f69 --- /dev/null +++ b/packages/pulse/src/workflows/__fixtures__/echo-role.ts @@ -0,0 +1,5 @@ +/** Echo role — returns first message content or 'empty' */ +export async function echoRole(chain: any[], topicId: string) { + const content = chain.length > 0 ? chain[chain.length - 1].content : 'empty'; + return { content: `echo:${content}:${topicId}`, meta: { echoed: true } }; +} diff --git a/packages/pulse/src/workflows/__fixtures__/error-role.ts b/packages/pulse/src/workflows/__fixtures__/error-role.ts new file mode 100644 index 0000000..24dce12 --- /dev/null +++ b/packages/pulse/src/workflows/__fixtures__/error-role.ts @@ -0,0 +1,4 @@ +/** Error role — always throws */ +export async function errorRole() { + throw new Error('deliberate test error'); +} diff --git a/packages/pulse/src/workflows/__fixtures__/hang-role.ts b/packages/pulse/src/workflows/__fixtures__/hang-role.ts new file mode 100644 index 0000000..03bfcb8 --- /dev/null +++ b/packages/pulse/src/workflows/__fixtures__/hang-role.ts @@ -0,0 +1,6 @@ +/** Hang role — infinite loop to test timeout */ +export async function hangRole() { + while (true) { + await new Promise((r) => setTimeout(r, 100)); + } +} diff --git a/packages/pulse/src/workflows/index.ts b/packages/pulse/src/workflows/index.ts index e7f7fa3..4879976 100644 --- a/packages/pulse/src/workflows/index.ts +++ b/packages/pulse/src/workflows/index.ts @@ -24,6 +24,10 @@ export { createLlmRole, createToolRole } from './roles/llm-role-factory.js'; export { createMetaCoderRole } from './roles/meta-coder-cursor.js'; export { createMetaTesterRole } from './roles/meta-tester.js'; export { createMetaCheckerRole } from './roles/meta-checker.js'; +export { + type SubprocessRoleConfig, + createSubprocessRole, +} from './subprocess-role.js'; export { type ScaffoldOptions, scaffoldWorkflow } from './scaffold.js'; export { createWorkflowRule, diff --git a/packages/pulse/src/workflows/subprocess-role.test.ts b/packages/pulse/src/workflows/subprocess-role.test.ts new file mode 100644 index 0000000..787030f --- /dev/null +++ b/packages/pulse/src/workflows/subprocess-role.test.ts @@ -0,0 +1,59 @@ +/** + * Tests for subprocess role execution isolation. + * 小橘 🍊 (NEKO Team) + */ + +import { describe, expect, test } from 'bun:test'; +import { join } from 'node:path'; +import { createSubprocessRole } from './subprocess-role.js'; + +const FIXTURES = join(import.meta.dir, '__fixtures__'); + +describe('createSubprocessRole', () => { + test('executes echo role successfully', async () => { + const role = createSubprocessRole({ + rolePath: join(FIXTURES, 'echo-role.ts'), + roleExport: 'echoRole', + }); + + const chain = [ + { role: '__start__', content: 'hello', meta: null, timestamp: Date.now() }, + ]; + const result = await role(chain, 'topic-1', null as any); + + expect(result.content).toBe('echo:hello:topic-1'); + expect(result.meta).toEqual({ echoed: true }); + }); + + test('propagates role errors', async () => { + const role = createSubprocessRole({ + rolePath: join(FIXTURES, 'error-role.ts'), + roleExport: 'errorRole', + }); + + await expect(role([], 'topic-err', null as any)).rejects.toThrow( + 'deliberate test error', + ); + }); + + test('kills on timeout', async () => { + const role = createSubprocessRole({ + rolePath: join(FIXTURES, 'hang-role.ts'), + roleExport: 'hangRole', + timeoutMs: 1_000, + }); + + await expect(role([], 'topic-hang', null as any)).rejects.toThrow( + /timed out/, + ); + }, 10_000); + + test('handles subprocess crash', async () => { + const role = createSubprocessRole({ + rolePath: join(FIXTURES, 'crash-role.ts'), + roleExport: 'crashRole', + }); + + await expect(role([], 'topic-crash', null as any)).rejects.toThrow(); + }); +}); diff --git a/packages/pulse/src/workflows/subprocess-role.ts b/packages/pulse/src/workflows/subprocess-role.ts new file mode 100644 index 0000000..a451e64 --- /dev/null +++ b/packages/pulse/src/workflows/subprocess-role.ts @@ -0,0 +1,114 @@ +/** + * Subprocess role wrapper — runs a Role in an isolated child process. + * + * Usage: + * const role = createSubprocessRole({ + * rolePath: '/abs/path/to/my-role.ts', + * roleExport: 'myRole', + * timeoutMs: 60_000, + * }); + * // role has the same signature as Role + * + * 小橘 🍊 (NEKO Team) + */ + +import { join } from 'node:path'; +import type { Role, RoleResult, WorkflowMessage } from './workflow-type.js'; + +export interface SubprocessRoleConfig { + /** Absolute path to the module containing the role function */ + rolePath: string; + /** Export name of the role function (e.g. 'myRole') */ + roleExport: string; + /** Timeout in ms, default 300_000 (5 min) */ + timeoutMs?: number; + /** Store config passed to subprocess if role needs a store */ + storeConfig?: { eventsDbPath: string; objectsDir: string }; +} + +const RUNNER_PATH = join(import.meta.dir, 'subprocess-runner.ts'); + +/** + * Wrap a Role as a subprocess execution. + * Returns a function matching the Role signature. + */ +export function createSubprocessRole(config: SubprocessRoleConfig): Role { + const { + rolePath, + roleExport, + timeoutMs = 300_000, + storeConfig, + } = config; + + return async ( + chain: WorkflowMessage[], + topicId: string, + ): Promise> => { + const input = JSON.stringify({ + rolePath, + roleExport, + chain, + topicId, + storeConfig, + }); + + const proc = Bun.spawn(['bun', 'run', RUNNER_PATH], { + stdin: 'pipe', + stdout: 'pipe', + stderr: 'pipe', + }); + + // Write input and close stdin + proc.stdin.write(input); + proc.stdin.end(); + + // Timeout handling + let killed = false; + const timer = setTimeout(() => { + killed = true; + proc.kill(); + }, timeoutMs); + + try { + // Wait for exit + const exitCode = await proc.exited; + clearTimeout(timer); + + if (killed) { + throw new Error( + `Subprocess role '${roleExport}' timed out after ${timeoutMs}ms`, + ); + } + + // Read stdout + const stdout = await new Response(proc.stdout).text(); + + if (exitCode !== 0 && !stdout.trim()) { + const stderr = await new Response(proc.stderr).text(); + throw new Error( + `Subprocess role '${roleExport}' exited with code ${exitCode}: ${stderr.slice(0, 500)}`, + ); + } + + let result: any; + try { + result = JSON.parse(stdout); + } catch { + throw new Error( + `Subprocess role '${roleExport}' returned invalid JSON: ${stdout.slice(0, 500)}`, + ); + } + + if (!result.ok) { + throw new Error( + `Subprocess role '${roleExport}' failed: ${result.error}`, + ); + } + + return { content: result.content, meta: result.meta ?? null }; + } catch (err) { + clearTimeout(timer); + throw err; + } + }; +} diff --git a/packages/pulse/src/workflows/subprocess-runner.ts b/packages/pulse/src/workflows/subprocess-runner.ts new file mode 100644 index 0000000..8e35662 --- /dev/null +++ b/packages/pulse/src/workflows/subprocess-runner.ts @@ -0,0 +1,82 @@ +/** + * Subprocess runner — child process entry point for isolated role execution. + * + * Reads JSON from stdin, dynamically imports the role module, executes it, + * and writes JSON result to stdout. + * + * 小橘 🍊 (NEKO Team) + */ + +import { createStore } from '../store.js'; + +interface RunnerInput { + rolePath: string; + roleExport: string; + chain: Array<{ + role: string; + content: string; + meta: Record | null; + timestamp: number; + }>; + topicId: string; + storeConfig?: { eventsDbPath: string; objectsDir: string }; +} + +async function main() { + // Read all stdin + const chunks: Buffer[] = []; + for await (const chunk of process.stdin) { + chunks.push(chunk as Buffer); + } + const raw = Buffer.concat(chunks).toString('utf-8'); + + let input: RunnerInput; + try { + input = JSON.parse(raw); + } catch (e) { + process.stdout.write( + JSON.stringify({ ok: false, error: `Invalid JSON input: ${e}` }), + ); + process.exit(1); + } + + try { + // Dynamic import of role module + const mod = await import(input.rolePath); + const roleFn = mod[input.roleExport]; + if (typeof roleFn !== 'function') { + throw new Error( + `Export '${input.roleExport}' from '${input.rolePath}' is not a function (got ${typeof roleFn})`, + ); + } + + // Create store if config provided + let store: ReturnType | undefined; + if (input.storeConfig) { + store = createStore({ + eventsDbPath: input.storeConfig.eventsDbPath, + objectsDir: input.storeConfig.objectsDir, + }); + } + + const result = await roleFn(input.chain, input.topicId, store); + + process.stdout.write( + JSON.stringify({ + ok: true, + content: result.content, + meta: result.meta ?? null, + }), + ); + } catch (err) { + process.stdout.write( + JSON.stringify({ + ok: false, + error: err instanceof Error ? err.message : String(err), + }), + ); + process.exit(1); + } +} + +main();