diff --git a/docs/rfc-001-workflow-engine.md b/docs/rfc-001-workflow-engine.md index 3f95bcc..6b7f892 100644 --- a/docs/rfc-001-workflow-engine.md +++ b/docs/rfc-001-workflow-engine.md @@ -35,9 +35,15 @@ type WorkflowResult = { summary: string; }; +/** Input to a workflow — prompt + optional historical steps for fork/resume. */ +type ThreadInput = { + prompt: string; + steps: RoleOutput[]; // [] for new thread, pre-filled for fork/resume +}; + /** The bundle contract — an AsyncGenerator, not a Promise. */ type WorkflowFn = ( - prompt: string, + input: ThreadInput, options: { isDryRun: boolean; maxRounds: number } ) => AsyncGenerator; ``` @@ -49,8 +55,8 @@ exporting a framework-specific shape: ```typescript // Example bundle — zero framework dependency -export default async function* (prompt, options) { - const plan = await callLLM("plan: " + prompt); +export default async function* (input, options) { + const plan = await callLLM("plan: " + input.prompt); yield { role: "planner", content: plan, meta: { files: ["src/auth.ts"] } }; const code = await callLLM("implement: " + plan); @@ -63,9 +69,27 @@ export default async function* (prompt, options) { **Engine controls the loop**, not the bundle: - Each `yield` → engine writes to `.data.jsonl`, checks `AbortSignal`, handles pause/resume - `return` → engine writes the final result, marks thread complete -- **Fork** = replay the first N yields from persisted `.data.jsonl`, then resume iteration +- **Fork** = read historical steps from `.data.jsonl`, pass as `input.steps` to a new generator - **Zero injection** — the bundle doesn't import or receive anything from the engine +### Fork/Resume via ThreadInput + +When using the `createRoleModerator` helper, fork is **naturally handled**: + +```typescript +// The moderator receives ThreadContext with historical steps +// It sees planner already ran → routes to coder automatically +const gen = workflow( + { prompt: "fix bug #3", steps: [{ role: "planner", content: "...", meta: {} }] }, + { isDryRun: false, maxRounds: 10 } +); +// First yield will be coder's output, not planner's +``` + +No special replay logic needed — the moderator/role pattern inherently supports +resuming from any snapshot, because moderator routing is a pure function of the +accumulated steps. + This follows the **Dependency Inversion Principle**: the engine depends on the generator protocol (a language primitive), not on a framework-specific `WorkflowDefinition`. Bundles remain pure functions with no coupling to `@uncaged/workflow`. @@ -262,7 +286,7 @@ routing function. It lives in `@uncaged/workflow` as an optional utility. ```typescript function createRoleModerator( def: { roles: { [K in keyof M & string]: Role }; moderator: Moderator } -): (prompt: string, options: { isDryRun: boolean; maxRounds: number }) => AsyncGenerator; +): WorkflowFn; // returns (input: ThreadInput, options) => AsyncGenerator ``` Usage in a bundle: @@ -274,6 +298,7 @@ export default createRoleModerator({ roles: { planner, coder }, moderator(ctx) { return ctx.steps.length === 0 ? "planner" : END; }, }); +// Accepts ThreadInput — fork with pre-filled steps works automatically ``` ### Supporting Types diff --git a/packages/cli-workflow/__tests__/commands.test.ts b/packages/cli-workflow/__tests__/commands.test.ts index 366877f..6ce819d 100644 --- a/packages/cli-workflow/__tests__/commands.test.ts +++ b/packages/cli-workflow/__tests__/commands.test.ts @@ -35,21 +35,11 @@ describe("cli workflow commands", () => { bundlePath, `import fs from "node:fs"; -export default { - name: "solve-issue", - roles: { - noop: async () => { - fs.existsSync("."); - return { content: "ok", meta: { done: true } }; - }, - }, - moderator(ctx) { - if (ctx.steps.length === 0) { - return "noop"; - } - return "__end__"; - }, -}; +export default async function* () { + fs.existsSync("."); + yield { role: "noop", content: "ok", meta: { done: true } }; + return { returnCode: 0, summary: "done" }; +} `, "utf8", ); @@ -91,7 +81,7 @@ export default { const bundlePath = join(storageRoot, "bad.esm.js"); await writeFile( bundlePath, - 'import x from "./local";\nexport default async function run() { return { returnCode: 0, summary: "" }; }\n', + 'import x from "./local";\nexport default async function* run() { return { returnCode: 0, summary: "" }; }\n', "utf8", ); const r = await cmdAdd(storageRoot, "solve-issue", bundlePath); diff --git a/packages/cli-workflow/__tests__/thread-cli.test.ts b/packages/cli-workflow/__tests__/thread-cli.test.ts index 7a2255d..3fa147e 100644 --- a/packages/cli-workflow/__tests__/thread-cli.test.ts +++ b/packages/cli-workflow/__tests__/thread-cli.test.ts @@ -13,54 +13,29 @@ import { cmdThreadRemove, cmdThreadShow } from "../src/cmd-thread.js"; import { cmdThreads } from "../src/cmd-threads.js"; import { pathExists } from "../src/fs-utils.js"; -const fastBundleSource = `export default { - name: "solve-issue", - roles: { - planner: async () => ({ content: "plan", meta: { plan: "x" } }), - coder: async () => ({ content: "code", meta: { diff: "y" } }), - }, - moderator(ctx) { - if (ctx.steps.length === 0) return "planner"; - if (ctx.steps.length === 1) return "coder"; - return "__end__"; - }, -}; +const fastBundleSource = `export default async function* () { + yield { role: "planner", content: "plan", meta: { plan: "x" } }; + yield { role: "coder", content: "code", meta: { diff: "y" } }; + return { returnCode: 0, summary: "done" }; +} `; -const slowPlannerBundleSource = `export default { - name: "solve-issue", - roles: { - planner: async () => { - await new Promise((r) => setTimeout(r, 400)); - return { content: "plan", meta: { plan: "x" } }; - }, - coder: async () => ({ content: "code", meta: { diff: "y" } }), - }, - moderator(ctx) { - if (ctx.steps.length === 0) return "planner"; - if (ctx.steps.length === 1) return "coder"; - return "__end__"; - }, -}; +const slowPlannerBundleSource = `export default async function* () { + await new Promise((r) => setTimeout(r, 400)); + yield { role: "planner", content: "plan", meta: { plan: "x" } }; + yield { role: "coder", content: "code", meta: { diff: "y" } }; + return { returnCode: 0, summary: "done" }; +} `; const cliEntryPath = fileURLToPath(new URL("../src/cli.ts", import.meta.url)); -const abortablePlannerBundleSource = `export default { - name: "solve-issue", - roles: { - planner: async () => { - await new Promise((r) => setTimeout(r, 600)); - return { content: "plan", meta: { plan: "x" } }; - }, - coder: async () => ({ content: "code", meta: { diff: "y" } }), - }, - moderator(ctx) { - if (ctx.steps.length === 0) return "planner"; - if (ctx.steps.length === 1) return "coder"; - return "__end__"; - }, -}; +const abortablePlannerBundleSource = `export default async function* () { + await new Promise((r) => setTimeout(r, 600)); + yield { role: "planner", content: "plan", meta: { plan: "x" } }; + yield { role: "coder", content: "code", meta: { diff: "y" } }; + return { returnCode: 0, summary: "done" }; +} `; describe("cli thread commands", () => { diff --git a/packages/cli-workflow/src/cmd-run.ts b/packages/cli-workflow/src/cmd-run.ts index 0113ddd..7e25924 100644 --- a/packages/cli-workflow/src/cmd-run.ts +++ b/packages/cli-workflow/src/cmd-run.ts @@ -43,6 +43,7 @@ export async function cmdRun( const sent = await sendWorkerTcpCommand(worker.value.port, { type: "run", threadId, + workflowName: name, prompt, options: { isDryRun, maxRounds }, }); diff --git a/packages/workflow/__tests__/bundle-validator.test.ts b/packages/workflow/__tests__/bundle-validator.test.ts index 278c411..f9b6afa 100644 --- a/packages/workflow/__tests__/bundle-validator.test.ts +++ b/packages/workflow/__tests__/bundle-validator.test.ts @@ -6,7 +6,7 @@ describe("validateWorkflowBundle", () => { test("accepts minimal valid builtin-only bundle", () => { const source = `import fs from "node:fs"; -export default async function run() { +export default async function* run() { fs.existsSync("."); return { returnCode: 0, summary: "ok" }; } @@ -18,11 +18,22 @@ export default async function run() { test("rejects wrong filename suffix", () => { const r = validateWorkflowBundle({ filePath: "/tmp/w.js", - source: "export default async function run() { return { returnCode: 0, summary: '' }; }\n", + source: "export default async function* run() { return { returnCode: 0, summary: '' }; }\n", }); expect(r.ok).toBe(false); }); + test("rejects default export that is not a callable bundle shape", () => { + const r = validateWorkflowBundle({ + filePath: "/tmp/w.esm.js", + source: 'export default { name: "x", roles: {}, moderator() { return "__end__"; } };\n', + }); + expect(r.ok).toBe(false); + if (!r.ok) { + expect(r.error).toContain("default export must be a function"); + } + }); + test("rejects missing default export", () => { const r = validateWorkflowBundle({ filePath: "/tmp/w.esm.js", @@ -38,7 +49,7 @@ export default async function run() { const r = validateWorkflowBundle({ filePath: "/tmp/w.esm.js", source: - 'import x from "some-package";\nexport default async function run() { return { returnCode: 0, summary: "" }; }\n', + 'import x from "some-package";\nexport default async function* run() { return { returnCode: 0, summary: "" }; }\n', }); expect(r.ok).toBe(false); }); @@ -47,7 +58,7 @@ export default async function run() { const r = validateWorkflowBundle({ filePath: "/tmp/w.esm.js", source: - 'export default async function run() { await import("fs"); return { returnCode: 0, summary: "" }; }\n', + 'export default async function* run() { await import("fs"); return { returnCode: 0, summary: "" }; }\n', }); expect(r.ok).toBe(false); if (!r.ok) { @@ -59,7 +70,7 @@ export default async function run() { const r = validateWorkflowBundle({ filePath: "/tmp/w.esm.js", source: - 'export default async function run() { require("fs"); return { returnCode: 0, summary: "" }; }\n', + 'export default async function* run() { require("fs"); return { returnCode: 0, summary: "" }; }\n', }); expect(r.ok).toBe(false); }); diff --git a/packages/workflow/__tests__/engine.test.ts b/packages/workflow/__tests__/engine.test.ts index b48efa0..6ec61fa 100644 --- a/packages/workflow/__tests__/engine.test.ts +++ b/packages/workflow/__tests__/engine.test.ts @@ -3,17 +3,17 @@ import { mkdir, mkdtemp, readFile, rm } from "node:fs/promises"; import { tmpdir } from "node:os"; import { join } from "node:path"; +import { createRoleModerator } from "../src/create-role-moderator.js"; import { executeThread } from "../src/engine.js"; import { createLogger } from "../src/logger.js"; -import { END, type WorkflowDefinition } from "../src/types.js"; +import { END } from "../src/types.js"; type DemoMeta = { planner: Record; coder: Record; }; -const demoWorkflow: WorkflowDefinition = { - name: "demo-flow", +const demoWorkflow = createRoleModerator({ roles: { planner: async () => ({ content: "plan-body", @@ -33,7 +33,7 @@ const demoWorkflow: WorkflowDefinition = { } return END; }, -}; +}); describe("executeThread", () => { test("writes RFC-001 `.data.jsonl` start + role records and `.info.jsonl` logs", async () => { @@ -50,6 +50,7 @@ describe("executeThread", () => { const result = await executeThread( demoWorkflow, + "demo-flow", "Fix the login redirect bug in #3", { isDryRun: false, maxRounds: 5, signal: ac.signal }, { threadId, hash, dataJsonlPath: dataPath, infoJsonlPath: infoPath }, @@ -116,6 +117,7 @@ describe("executeThread", () => { const result = await executeThread( demoWorkflow, + "demo-flow", "hello", { isDryRun: false, maxRounds: 0, signal: ac.signal }, { threadId, hash, dataJsonlPath: dataPath, infoJsonlPath: infoPath }, diff --git a/packages/workflow/__tests__/hash.test.ts b/packages/workflow/__tests__/hash.test.ts index 0c44e52..dca18e8 100644 --- a/packages/workflow/__tests__/hash.test.ts +++ b/packages/workflow/__tests__/hash.test.ts @@ -17,7 +17,7 @@ describe("hashWorkflowBundleBytes", () => { test("stable for identical content", () => { const encoder = new TextEncoder(); const data = encoder.encode( - "export default async function run() { return { returnCode: 0, summary: '' }; }\n", + "export default async function* run() { return { returnCode: 0, summary: '' }; }\n", ); expect(hashWorkflowBundleBytes(data)).toBe(hashWorkflowBundleBytes(data)); }); diff --git a/packages/workflow/__tests__/worker.test.ts b/packages/workflow/__tests__/worker.test.ts index 5f13817..599b97c 100644 --- a/packages/workflow/__tests__/worker.test.ts +++ b/packages/workflow/__tests__/worker.test.ts @@ -7,18 +7,11 @@ import { join } from "node:path"; import { getWorkerHostScriptPath } from "../src/worker-entry-path.js"; -const bundleSource = `export default { - name: "demo-flow", - roles: { - planner: async () => ({ content: "p", meta: { plan: "x" } }), - coder: async () => ({ content: "c", meta: { diff: "y" } }), - }, - moderator(ctx) { - if (ctx.steps.length === 0) return "planner"; - if (ctx.steps.length === 1) return "coder"; - return "__end__"; - }, -}; +const bundleSource = `export default async function* () { + yield { role: "planner", content: "p", meta: { plan: "x" } }; + yield { role: "coder", content: "c", meta: { diff: "y" } }; + return { returnCode: 0, summary: "completed: moderator returned END" }; +} `; async function readReadyPort(child: import("node:child_process").ChildProcess): Promise { @@ -95,6 +88,7 @@ describe("worker process", () => { await sendJson(port, { type: "run", threadId, + workflowName: "demo-flow", prompt: "hello", options: { isDryRun: false, maxRounds: 5 }, }); diff --git a/packages/workflow/src/bundle-validator.ts b/packages/workflow/src/bundle-validator.ts index 849ac76..d3b7b76 100644 --- a/packages/workflow/src/bundle-validator.ts +++ b/packages/workflow/src/bundle-validator.ts @@ -2,6 +2,7 @@ import { isBuiltin } from "node:module"; import type { CallExpression, ExportAllDeclaration, + ExportDefaultDeclaration, ExportNamedDeclaration, ImportDeclaration, Node, @@ -74,6 +75,27 @@ function programHasDefaultExport(body: readonly Node[]): boolean { return false; } +function defaultExportDeclarationIsCallable(program: Program): boolean { + for (const stmt of program.body) { + if (stmt.type !== "ExportDefaultDeclaration") { + continue; + } + const decl = (stmt as ExportDefaultDeclaration).declaration; + if ( + decl.type === "FunctionDeclaration" || + decl.type === "FunctionExpression" || + decl.type === "ArrowFunctionExpression" + ) { + return true; + } + if (decl.type === "CallExpression") { + return true; + } + return false; + } + return false; +} + function stringLiteralModuleSpecifier(src: Node): string | null { if (src.type !== "Literal" || typeof src.value !== "string") { return null; @@ -183,6 +205,12 @@ export function validateWorkflowBundle(input: WorkflowBundleValidationInput): Re return err("workflow bundle must have a default export"); } + if (!defaultExportDeclarationIsCallable(program)) { + return err( + "workflow bundle default export must be a function (e.g. async function*) or a call expression that returns one", + ); + } + let violation: string | null = null; walkAst(ast, (node) => { if (violation !== null) { diff --git a/packages/workflow/src/create-role-moderator.ts b/packages/workflow/src/create-role-moderator.ts new file mode 100644 index 0000000..b36db96 --- /dev/null +++ b/packages/workflow/src/create-role-moderator.ts @@ -0,0 +1,83 @@ +import { + END, + type RoleMeta, + type RoleStep, + START, + type ThreadContext, + type WorkflowDefinition, + type WorkflowFn, + type WorkflowFnOptions, + type WorkflowResult, +} from "./types.js"; + +function isRoleNext( + next: (keyof M & string) | typeof END, +): next is keyof M & string { + return next !== END; +} + +/** + * Role + Moderator pattern as an optional helper: returns a {@link WorkflowFn} that runs the + * moderator loop and yields each {@link RoleOutput}. + */ +export function createRoleModerator( + def: Pick, "roles" | "moderator">, +): WorkflowFn { + return async function* roleModeratorWorkflow( + prompt: string, + options: WorkflowFnOptions, + ): AsyncGenerator { + const nowMs = Date.now(); + const start: ThreadContext["start"] = { + role: START, + content: prompt, + meta: { maxRounds: options.maxRounds, threadId: options.threadId }, + timestamp: nowMs, + }; + + let steps: RoleStep[] = []; + + while (true) { + if (steps.length >= options.maxRounds) { + return { + returnCode: 0, + summary: `completed: reached maxRounds (${options.maxRounds})`, + }; + } + + const ctx: ThreadContext = { + threadId: options.threadId, + start, + steps, + }; + + const next = def.moderator(ctx); + + if (!isRoleNext(next)) { + return { returnCode: 0, summary: "completed: moderator returned END" }; + } + + const roleFn = def.roles[next]; + if (roleFn === undefined) { + return { returnCode: 1, summary: `unknown role: ${next}` }; + } + + const result = await roleFn(ctx); + const ts = Date.now(); + const step = { + role: next, + content: result.content, + meta: result.meta, + timestamp: ts, + } as RoleStep; + + yield { + role: step.role, + content: step.content, + meta: step.meta, + }; + + steps = [...steps, step]; + } + }; +} diff --git a/packages/workflow/src/engine.ts b/packages/workflow/src/engine.ts index 7855f4d..68de1f3 100644 --- a/packages/workflow/src/engine.ts +++ b/packages/workflow/src/engine.ts @@ -2,14 +2,7 @@ import { appendFile, mkdir } from "node:fs/promises"; import { dirname } from "node:path"; import type { LogFn } from "./logger.js"; -import { - END, - type RoleMeta, - type RoleStep, - START, - type ThreadContext, - type WorkflowDefinition, -} from "./types.js"; +import type { WorkflowFn, WorkflowResult } from "./types.js"; export type ExecuteThreadIo = { threadId: string; @@ -24,41 +17,29 @@ export type ExecuteThreadOptions = { signal: AbortSignal; }; -function isRoleNext( - next: (keyof M & string) | typeof END, -): next is keyof M & string { - return next !== END; -} - async function appendDataLine(path: string, record: unknown): Promise { const line = `${JSON.stringify(record)}\n`; await appendFile(path, line, "utf8"); } /** - * Execute a workflow thread: moderator loop, role steps, RFC-001 `.data.jsonl` records, + * Execute a workflow thread: drive the bundle's AsyncGenerator, RFC-001 `.data.jsonl` records, * debug lines via `logger` to `.info.jsonl`. */ -export async function executeThread( - def: WorkflowDefinition, +export async function executeThread( + fn: WorkflowFn, + workflowName: string, prompt: string, options: ExecuteThreadOptions, io: ExecuteThreadIo, logger: LogFn, -): Promise<{ returnCode: number; summary: string }> { +): Promise { await mkdir(dirname(io.dataJsonlPath), { recursive: true }); await mkdir(dirname(io.infoJsonlPath), { recursive: true }); const nowMs = Date.now(); - const start: ThreadContext["start"] = { - role: START, - content: prompt, - meta: { maxRounds: options.maxRounds, threadId: io.threadId }, - timestamp: nowMs, - }; - const startRecord = { - name: def.name, + name: workflowName, hash: io.hash, threadId: io.threadId, parameters: { @@ -73,9 +54,23 @@ export async function executeThread( await appendDataLine(io.dataJsonlPath, startRecord); - let steps: RoleStep[] = []; + logger("T9HQ2KHM", `thread ${io.threadId} started for workflow ${workflowName}`); - logger("T9HQ2KHM", `thread ${io.threadId} started for workflow ${def.name}`); + if (options.maxRounds <= 0) { + logger("R3CW7YBQ", `thread ${io.threadId} stopped at maxRounds=${options.maxRounds}`); + return { + returnCode: 0, + summary: `completed: reached maxRounds (${options.maxRounds})`, + }; + } + + const gen = fn(prompt, { + isDryRun: options.isDryRun, + maxRounds: options.maxRounds, + threadId: io.threadId, + }); + + let written = 0; while (true) { if (options.signal.aborted) { @@ -83,7 +78,7 @@ export async function executeThread( return { returnCode: 130, summary: "thread aborted" }; } - if (steps.length >= options.maxRounds) { + if (written >= options.maxRounds) { logger("R3CW7YBQ", `thread ${io.threadId} stopped at maxRounds=${options.maxRounds}`); return { returnCode: 0, @@ -91,49 +86,24 @@ export async function executeThread( }; } - const ctx: ThreadContext = { - threadId: io.threadId, - start, - steps, - }; + const iterResult = await gen.next(); - const next = def.moderator(ctx); - - if (!isRoleNext(next)) { - logger("M5FZ2K8H", `thread ${io.threadId} moderator returned END`); - return { returnCode: 0, summary: "completed: moderator returned END" }; + if (iterResult.done) { + logger("F3HN8QKP", `thread ${io.threadId} generator finished`); + return iterResult.value; } - const roleFn = def.roles[next]; - if (roleFn === undefined) { - logger("K2P8QX9W", `thread ${io.threadId} unknown role ${next}`); - return { returnCode: 1, summary: `unknown role: ${next}` }; - } - - if (options.signal.aborted) { - logger("V8JX4NP3", `thread ${io.threadId} aborted`); - return { returnCode: 130, summary: "thread aborted" }; - } - - const result = await roleFn(ctx); - + written++; + const step = iterResult.value; const ts = Date.now(); - const step: RoleStep = { - role: next, - content: result.content, - meta: result.meta, - timestamp: ts, - } as RoleStep; - await appendDataLine(io.dataJsonlPath, { role: step.role, content: step.content, meta: step.meta, - timestamp: step.timestamp, + timestamp: ts, }); - steps = [...steps, step]; - logger("N7BW4YHQ", `thread ${io.threadId} completed role ${next}`); + logger("N7BW4YHQ", `thread ${io.threadId} wrote role ${step.role}`); if (options.signal.aborted) { logger("V8JX4NP4", `thread ${io.threadId} aborted`); diff --git a/packages/workflow/src/index.ts b/packages/workflow/src/index.ts index b9c365d..a7c33cf 100644 --- a/packages/workflow/src/index.ts +++ b/packages/workflow/src/index.ts @@ -6,6 +6,7 @@ export { encodeUint64AsCrockford, } from "./base32.js"; export { validateWorkflowBundle, type WorkflowBundleValidationInput } from "./bundle-validator.js"; +export { createRoleModerator } from "./create-role-moderator.js"; export { type ExecuteThreadIo, type ExecuteThreadOptions, @@ -40,12 +41,16 @@ export { type Moderator, type Role, type RoleMeta, + type RoleOutput, type RoleResult, type RoleStep, START, type StartStep, type ThreadContext, type WorkflowDefinition, + type WorkflowFn, + type WorkflowFnOptions, + type WorkflowResult, } from "./types.js"; export { generateUlid } from "./ulid.js"; export { getWorkerHostScriptPath } from "./worker-entry-path.js"; diff --git a/packages/workflow/src/types.ts b/packages/workflow/src/types.ts index 66aa229..37e1b5f 100644 --- a/packages/workflow/src/types.ts +++ b/packages/workflow/src/types.ts @@ -5,6 +5,32 @@ export const END = "__end__" as const; /** Maps role names → their meta types. Single generic drives all inference. */ export type RoleMeta = Record>; +/** What each generator yield produces — one role's output (engine adds `timestamp` when persisting). */ +export type RoleOutput = { + role: string; + content: string; + meta: Record; +}; + +/** What the workflow AsyncGenerator returns when done. */ +export type WorkflowResult = { + returnCode: number; + summary: string; +}; + +/** Options passed to a workflow bundle's default-export function (engine-provided). */ +export type WorkflowFnOptions = { + isDryRun: boolean; + maxRounds: number; + threadId: string; +}; + +/** Bundle contract — default export is a function returning an AsyncGenerator. */ +export type WorkflowFn = ( + prompt: string, + options: WorkflowFnOptions, +) => AsyncGenerator; + /** Typed output of a Role execution. */ export type RoleResult> = { content: string; diff --git a/packages/workflow/src/worker.ts b/packages/workflow/src/worker.ts index 23e2356..1e7f60b 100644 --- a/packages/workflow/src/worker.ts +++ b/packages/workflow/src/worker.ts @@ -5,13 +5,14 @@ import { pathToFileURL } from "node:url"; import { type ExecuteThreadIo, executeThread } from "./engine.js"; import { createLogger } from "./logger.js"; -import type { RoleMeta, WorkflowDefinition } from "./types.js"; +import type { WorkflowFn } from "./types.js"; const bootLog = createLogger({ sink: { kind: "stderr" } }); type RunCommand = { type: "run"; threadId: string; + workflowName: string; prompt: string; options: { isDryRun: boolean; maxRounds: number }; }; @@ -38,9 +39,14 @@ function parseControlPayload(payload: unknown): ControlCommand | null { } if (type === "run") { const threadId = rec.threadId; + const workflowName = rec.workflowName; const prompt = rec.prompt; const options = rec.options; - if (typeof threadId !== "string" || typeof prompt !== "string") { + if ( + typeof threadId !== "string" || + typeof workflowName !== "string" || + typeof prompt !== "string" + ) { return null; } if (options === null || typeof options !== "object") { @@ -55,6 +61,7 @@ function parseControlPayload(payload: unknown): ControlCommand | null { return { type: "run", threadId, + workflowName, prompt, options: { isDryRun, maxRounds }, }; @@ -77,21 +84,8 @@ function parseCommandLine(line: string): ControlCommand | null { return parseControlPayload(parsed); } -function isWorkflowDefinitionLike(value: unknown): value is WorkflowDefinition { - if (value === null || typeof value !== "object") { - return false; - } - const rec = value as Record; - if (typeof rec.name !== "string") { - return false; - } - if (rec.roles === null || typeof rec.roles !== "object") { - return false; - } - if (typeof rec.moderator !== "function") { - return false; - } - return true; +function isWorkflowFnLike(value: unknown): value is WorkflowFn { + return typeof value === "function"; } async function readLineFromSocket(socket: Socket): Promise { @@ -146,15 +140,15 @@ async function main(): Promise { const modUnknown: unknown = await import(pathToFileURL(bundlePath).href); const modRec = modUnknown as Record; const defaultExport = modRec.default; - if (!isWorkflowDefinitionLike(defaultExport)) { + if (!isWorkflowFnLike(defaultExport)) { bootLog( "T4BW9YJX", - "workflow bundle default export must be a WorkflowDefinition { name, roles, moderator }", + "workflow bundle default export must be a function (AsyncGenerator workflow)", ); process.exit(2); return; } - const def = defaultExport; + const workflowFn = defaultExport; const controllers = new Map(); let activeThreads = 0; @@ -231,7 +225,14 @@ async function main(): Promise { const logger = createLogger({ sink: { kind: "file", path: infoJsonlPath } }); - await executeThread(def, cmd.prompt, { ...cmd.options, signal: ac.signal }, io, logger); + await executeThread( + workflowFn, + cmd.workflowName, + cmd.prompt, + { ...cmd.options, signal: ac.signal }, + io, + logger, + ); } catch (e) { const message = e instanceof Error ? e.message : String(e); bootLog("Q3MN8YKW", `thread ${threadId} failed: ${message}`);