From e95e76c145c990c0cbf349f87b1a9d74192036e2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B0=8F=E6=A9=98?= Date: Thu, 7 May 2026 10:52:26 +0000 Subject: [PATCH] feat: workflowAsAgent factory MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - workflowAsAgent(name) resolves via registry → bundle → child thread - System-level depth limit (max 3, constant) - Returns summary string, errors as string (no throw) - Integration test with nested workflow execution - 146 tests passing Fixes #33 --- packages/cli-workflow/src/cmd-run.ts | 2 +- .../__tests__/create-llm-adapter.test.ts | 1 + .../__tests__/solve-issue-template.test.ts | 5 +- .../__tests__/build-agent-prompt.test.ts | 4 + packages/workflow/__tests__/engine.test.ts | 6 +- .../workflow/__tests__/fork-thread.test.ts | 21 +- .../workflow/__tests__/refs-tracking.test.ts | 1 + .../__tests__/thread-jsonl-format.test.ts | 1 + .../workflow-as-agent-integration.test.ts | 206 ++++++++++++++++++ .../__tests__/workflow-as-agent.test.ts | 101 +++++++++ packages/workflow/src/create-workflow.ts | 1 + packages/workflow/src/engine.ts | 4 + packages/workflow/src/fork-thread.ts | 10 +- packages/workflow/src/index.ts | 1 + packages/workflow/src/types.ts | 4 + packages/workflow/src/worker.ts | 7 +- packages/workflow/src/workflow-as-agent.ts | 99 +++++++++ 17 files changed, 465 insertions(+), 9 deletions(-) create mode 100644 packages/workflow/__tests__/workflow-as-agent-integration.test.ts create mode 100644 packages/workflow/__tests__/workflow-as-agent.test.ts create mode 100644 packages/workflow/src/workflow-as-agent.ts diff --git a/packages/cli-workflow/src/cmd-run.ts b/packages/cli-workflow/src/cmd-run.ts index ace7e13..12be277 100644 --- a/packages/cli-workflow/src/cmd-run.ts +++ b/packages/cli-workflow/src/cmd-run.ts @@ -46,7 +46,7 @@ export async function cmdRun( threadId, workflowName: name, prompt, - options: { maxRounds }, + options: { maxRounds, depth: 0 }, }, { awaitResponseLine: false }, ); diff --git a/packages/workflow-agent-llm/__tests__/create-llm-adapter.test.ts b/packages/workflow-agent-llm/__tests__/create-llm-adapter.test.ts index f80e189..fadce70 100644 --- a/packages/workflow-agent-llm/__tests__/create-llm-adapter.test.ts +++ b/packages/workflow-agent-llm/__tests__/create-llm-adapter.test.ts @@ -11,6 +11,7 @@ function makeCtx(userContent: string): ThreadContext { meta: { maxRounds: 10 }, timestamp: 1, }, + depth: 0, steps: [], threadId: "01TEST000000000000000000TR", currentRole: { name: "planner", systemPrompt: "system instructions" }, diff --git a/packages/workflow-template-solve-issue/__tests__/solve-issue-template.test.ts b/packages/workflow-template-solve-issue/__tests__/solve-issue-template.test.ts index 143942f..4e63275 100644 --- a/packages/workflow-template-solve-issue/__tests__/solve-issue-template.test.ts +++ b/packages/workflow-template-solve-issue/__tests__/solve-issue-template.test.ts @@ -104,6 +104,7 @@ function makeCtx( ): ModeratorContext { return { threadId: "01TEST000000000000000000TR", + depth: 0, start: makeStart(maxRounds), steps, }; @@ -303,7 +304,7 @@ describe("createSolveIssueRun", () => { const run = createSolveIssueRun({ agent: async () => "" }, stubExtract); const gen = run( { prompt: "task", steps: [] }, - { threadId: "01TEST000000000000000000TR", maxRounds: 20 }, + { threadId: "01TEST000000000000000000TR", maxRounds: 20, depth: 0 }, ); const first = await gen.next(); expect(first.done).toBe(false); @@ -361,7 +362,7 @@ describe("createSolveIssueRun", () => { ); const gen = run( { prompt: "task", steps: [] }, - { threadId: "01TEST000000000000000000TR", maxRounds: 20 }, + { threadId: "01TEST000000000000000000TR", maxRounds: 20, depth: 0 }, ); await gen.next(); expect(calls).toEqual(["preparer"]); diff --git a/packages/workflow-util-agent/__tests__/build-agent-prompt.test.ts b/packages/workflow-util-agent/__tests__/build-agent-prompt.test.ts index af469fd..349ec81 100644 --- a/packages/workflow-util-agent/__tests__/build-agent-prompt.test.ts +++ b/packages/workflow-util-agent/__tests__/build-agent-prompt.test.ts @@ -16,6 +16,7 @@ describe("buildAgentPrompt", () => { test("includes system prompt and full task; omits tools when there are no steps", () => { const ctx: ThreadContext = { start: startTask("fix the bug"), + depth: 0, steps: [], threadId: "01TEST000000000000000000TR", currentRole: { name: START, systemPrompt: "You are an agent." }, @@ -30,6 +31,7 @@ describe("buildAgentPrompt", () => { test("single step shows full content and meta, and includes tools", () => { const ctx: ThreadContext = { start: startTask("user task"), + depth: 0, threadId: "01TEST000000000000000000TR", currentRole: { name: "coder", systemPrompt: "Be helpful." }, steps: [ @@ -55,6 +57,7 @@ describe("buildAgentPrompt", () => { test("two or more steps: previous steps are meta-only; latest step is full", () => { const ctx: ThreadContext = { start: startTask("first message full: task content here"), + depth: 0, threadId: "01TEST000000000000000000TR", currentRole: { name: "coder", systemPrompt: "System." }, steps: [ @@ -90,6 +93,7 @@ describe("buildAgentPrompt", () => { test("middle steps show meta summary only, not full content", () => { const ctx: ThreadContext = { start: startTask("start"), + depth: 0, threadId: "01TEST000000000000000000TR", currentRole: { name: "c", systemPrompt: "S" }, steps: [ diff --git a/packages/workflow/__tests__/engine.test.ts b/packages/workflow/__tests__/engine.test.ts index 0072696..4e13c38 100644 --- a/packages/workflow/__tests__/engine.test.ts +++ b/packages/workflow/__tests__/engine.test.ts @@ -150,6 +150,7 @@ describe("executeThread", () => { { prompt: "Fix the login redirect bug in #3", steps: [] }, { maxRounds: 5, + depth: 0, signal: ac.signal, awaitAfterEachYield: async () => {}, forkSourceThreadId: null, @@ -178,7 +179,8 @@ describe("executeThread", () => { expect(params.prompt).toBe("Fix the login redirect bug in #3"); const opts = params.options as Record; expect(opts.maxRounds).toBe(5); - expect(Object.keys(opts).sort()).toEqual(["maxRounds"]); + expect(opts.depth).toBe(0); + expect(Object.keys(opts).sort()).toEqual(["depth", "maxRounds"]); const role1 = JSON.parse(lines[1] ?? "{}") as Record; expect(role1.role).toBe("planner"); @@ -238,6 +240,7 @@ describe("executeThread", () => { }, { maxRounds: 5, + depth: 0, signal: ac.signal, awaitAfterEachYield: async () => {}, forkSourceThreadId: "01SRC1111111111111111111", @@ -298,6 +301,7 @@ describe("executeThread", () => { { prompt: "hello", steps: [] }, { maxRounds: 0, + depth: 0, signal: ac.signal, awaitAfterEachYield: async () => {}, forkSourceThreadId: null, diff --git a/packages/workflow/__tests__/fork-thread.test.ts b/packages/workflow/__tests__/fork-thread.test.ts index 1d8ccb1..d1dd70b 100644 --- a/packages/workflow/__tests__/fork-thread.test.ts +++ b/packages/workflow/__tests__/fork-thread.test.ts @@ -24,6 +24,7 @@ describe("fork-thread", () => { expect(r.value.start.threadId).toBe("01AAA1111111111111111111"); expect(r.value.start.prompt).toBe("hi"); expect(r.value.start.maxRounds).toBe(5); + expect(r.value.start.depth).toBe(0); expect(r.value.roleSteps.length).toBe(3); expect(r.value.roleSteps[0]?.role).toBe("planner"); }); @@ -83,6 +84,24 @@ describe("fork-thread", () => { expect(r.value.workflowName).toBe("demo"); expect(r.value.historicalSteps.length).toBe(1); expect(r.value.historicalSteps[0]?.timestamp).toBe(101); - expect(r.value.runOptions).toEqual({ maxRounds: 5 }); + expect(r.value.runOptions).toEqual({ maxRounds: 5, depth: 0 }); + }); + + test("parseThreadDataJsonl reads explicit depth from start record", () => { + const text = `{"name":"demo","hash":"H","threadId":"01ZZZZZZZZZZZZZZZZZZZZZZ","parameters":{"prompt":"p","options":{"maxRounds":3,"depth":2}},"timestamp":1} +{"role":"planner","content":"x","meta":{},"timestamp":2} +`; + const r = parseThreadDataJsonl(text); + expect(r.ok).toBe(true); + if (!r.ok) { + return; + } + expect(r.value.start.depth).toBe(2); + const plan = buildForkPlan(text, null); + expect(plan.ok).toBe(true); + if (!plan.ok) { + return; + } + expect(plan.value.runOptions).toEqual({ maxRounds: 3, depth: 2 }); }); }); diff --git a/packages/workflow/__tests__/refs-tracking.test.ts b/packages/workflow/__tests__/refs-tracking.test.ts index 585ae9b..6b6ad6a 100644 --- a/packages/workflow/__tests__/refs-tracking.test.ts +++ b/packages/workflow/__tests__/refs-tracking.test.ts @@ -149,6 +149,7 @@ describe("RoleStep refs tracking", () => { { prompt: "task", steps: [] }, { maxRounds: 5, + depth: 0, signal: ac.signal, awaitAfterEachYield: async () => {}, forkSourceThreadId: null, diff --git a/packages/workflow/__tests__/thread-jsonl-format.test.ts b/packages/workflow/__tests__/thread-jsonl-format.test.ts index a8d0a32..4602926 100644 --- a/packages/workflow/__tests__/thread-jsonl-format.test.ts +++ b/packages/workflow/__tests__/thread-jsonl-format.test.ts @@ -10,6 +10,7 @@ describe("RFC-001 thread JSONL shapes", () => { prompt: "Fix the login redirect bug in #3", options: { maxRounds: 5, + depth: 0, }, }, timestamp: 1714963200000, diff --git a/packages/workflow/__tests__/workflow-as-agent-integration.test.ts b/packages/workflow/__tests__/workflow-as-agent-integration.test.ts new file mode 100644 index 0000000..b4386f4 --- /dev/null +++ b/packages/workflow/__tests__/workflow-as-agent-integration.test.ts @@ -0,0 +1,206 @@ +import { afterEach, describe, expect, test } from "bun:test"; +import { mkdir, mkdtemp, readdir, readFile, rm, writeFile } from "node:fs/promises"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import * as z from "zod/v4"; + +import { createWorkflow } from "../src/create-workflow.js"; +import { executeThread } from "../src/engine.js"; +import { createExtract } from "../src/extract-fn.js"; +import { hashWorkflowBundleBytes } from "../src/hash.js"; +import { createLogger } from "../src/logger.js"; +import { + readWorkflowRegistry, + registerWorkflowVersion, + writeWorkflowRegistry, +} from "../src/registry.js"; +import { END } from "../src/types.js"; +import { workflowAsAgent } from "../src/workflow-as-agent.js"; + +const callerMetaSchema = z.object({ done: z.literal(true) }); + +type ParentMeta = { + caller: z.infer; +}; + +function installMockChatCompletions(sequence: ReadonlyArray>): () => void { + const origFetch = globalThis.fetch; + let i = 0; + const mockFetch = async ( + input: Parameters[0], + init?: RequestInit, + ): Promise => { + const args = sequence[i] ?? sequence[sequence.length - 1]; + if (args === undefined) { + throw new Error("installMockChatCompletions: empty sequence"); + } + i += 1; + void input; + const body = init?.body ? (JSON.parse(String(init.body)) as Record) : {}; + const tools = body.tools; + const firstTool = + Array.isArray(tools) && tools.length > 0 && tools[0] !== null && typeof tools[0] === "object" + ? (tools[0] as Record) + : null; + const fn = + firstTool !== null ? (firstTool.function as Record | undefined) : undefined; + const toolName = typeof fn?.name === "string" ? fn.name : "extract"; + return new Response( + JSON.stringify({ + choices: [ + { + message: { + tool_calls: [ + { + type: "function", + function: { + name: toolName, + arguments: JSON.stringify(args), + }, + }, + ], + }, + }, + ], + }), + { status: 200, headers: { "Content-Type": "application/json" } }, + ); + }; + globalThis.fetch = Object.assign(mockFetch, { + preconnect: origFetch.preconnect.bind(origFetch), + }) as typeof fetch; + return () => { + globalThis.fetch = origFetch; + }; +} + +const parentExtract = createExtract({ + baseUrl: "http://127.0.0.1:9", + apiKey: "test", + model: "test", +}); + +const childBundleSource = `export const descriptor = { + description: "child-integration", + roles: { + agent: { + description: "agent", + schema: { type: "object", properties: {}, additionalProperties: true }, + }, + }, +}; +export async function* run(input) { + yield { role: "agent", content: "child-body", meta: {}, refs: [] }; + return { returnCode: 0, summary: "child-done:" + input.prompt }; +} +`; + +async function installChildWorkflow(storageRoot: string): Promise<{ hash: string }> { + const bytes = new TextEncoder().encode(childBundleSource); + const hash = hashWorkflowBundleBytes(bytes); + await mkdir(join(storageRoot, "bundles"), { recursive: true }); + await writeFile(join(storageRoot, "bundles", `${hash}.esm.js`), childBundleSource, "utf8"); + const reg = await readWorkflowRegistry(storageRoot); + if (!reg.ok) { + throw reg.error; + } + const next = registerWorkflowVersion(reg.value, "child-wf", hash, Date.now()); + const wr = await writeWorkflowRegistry(storageRoot, next); + if (!wr.ok) { + throw wr.error; + } + return { hash }; +} + +describe("workflowAsAgent integration", () => { + let restoreFetch: (() => void) | null = null; + + afterEach(() => { + restoreFetch?.(); + restoreFetch = null; + }); + + test("createWorkflow parent invokes nested workflow via workflowAsAgent", async () => { + restoreFetch = installMockChatCompletions([{ done: true }]); + + const root = await mkdtemp(join(tmpdir(), "wf-waa-int-")); + try { + const { hash: childHash } = await installChildWorkflow(root); + + const parentWorkflow = createWorkflow( + { + roles: { + caller: { + description: "delegates to child workflow", + systemPrompt: "system", + extractPrompt: "extract done flag", + schema: callerMetaSchema, + extractRefs: null, + }, + }, + moderator: (ctx) => (ctx.steps.length === 0 ? "caller" : END), + }, + { agent: workflowAsAgent("child-wf", { storageRoot: root }) }, + parentExtract, + ); + + const threadId = "01KQXKW18CT8G75T53R8F4G7YG"; + const parentHash = "C9NMV6V2TQT81"; + const dataPath = join(root, "logs", parentHash, `${threadId}.data.jsonl`); + const infoPath = join(root, "logs", parentHash, `${threadId}.info.jsonl`); + await mkdir(join(root, "logs", parentHash), { recursive: true }); + + const logger = createLogger({ sink: { kind: "file", path: infoPath } }); + const ac = new AbortController(); + + const result = await executeThread( + parentWorkflow, + "parent-wf", + { prompt: "from-parent", steps: [] }, + { + maxRounds: 5, + depth: 0, + signal: ac.signal, + awaitAfterEachYield: async () => {}, + forkSourceThreadId: null, + prefilledDiskSteps: null, + }, + { threadId, hash: parentHash, dataJsonlPath: dataPath, infoJsonlPath: infoPath }, + logger, + ); + + expect(result.returnCode).toBe(0); + + const parentText = await readFile(dataPath, "utf8"); + const parentLines = parentText + .trim() + .split("\n") + .filter((l) => l !== ""); + expect(parentLines.length).toBe(2); + const callerLine = JSON.parse(parentLines[1] ?? "{}") as Record; + expect(callerLine.role).toBe("caller"); + expect(callerLine.content).toBe("child-done:from-parent"); + + const childDir = join(root, "logs", childHash); + const childFiles = await readdir(childDir); + const childDataName = childFiles.find((n) => n.endsWith(".data.jsonl")); + expect(childDataName).toBeDefined(); + + const childText = await readFile(join(childDir, childDataName ?? ""), "utf8"); + const childStart = JSON.parse( + childText + .trim() + .split("\n") + .filter((l) => l !== "")[0] ?? "{}", + ) as Record; + expect(childStart.forkFrom).toEqual({ threadId }); + const childOpts = (childStart.parameters as Record).options as Record< + string, + unknown + >; + expect(childOpts.depth).toBe(1); + } finally { + await rm(root, { recursive: true, force: true }); + } + }); +}); diff --git a/packages/workflow/__tests__/workflow-as-agent.test.ts b/packages/workflow/__tests__/workflow-as-agent.test.ts new file mode 100644 index 0000000..d232f6e --- /dev/null +++ b/packages/workflow/__tests__/workflow-as-agent.test.ts @@ -0,0 +1,101 @@ +import { describe, expect, test } from "bun:test"; +import { mkdir, mkdtemp, rm, writeFile } from "node:fs/promises"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; + +import { hashWorkflowBundleBytes } from "../src/hash.js"; +import { + readWorkflowRegistry, + registerWorkflowVersion, + writeWorkflowRegistry, +} from "../src/registry.js"; +import { type AgentContext, START } from "../src/types.js"; +import { workflowAsAgent } from "../src/workflow-as-agent.js"; + +function makeAgentCtx(params: { depth: number; prompt: string; maxRounds: number }): AgentContext { + const ts = Date.now(); + return { + threadId: "01PARENT000000000000000001AA", + depth: params.depth, + start: { + role: START, + content: params.prompt, + meta: { maxRounds: params.maxRounds }, + timestamp: ts, + }, + steps: [], + currentRole: { + name: "caller", + systemPrompt: "caller", + }, + }; +} + +const childBundleSource = `export const descriptor = { + description: "child-test", + roles: { + agent: { + description: "agent", + schema: { type: "object", properties: {}, additionalProperties: true }, + }, + }, +}; +export async function* run(input) { + yield { role: "agent", content: "child-body", meta: {}, refs: [] }; + return { returnCode: 0, summary: "child-done:" + input.prompt }; +} +`; + +async function installChildWorkflow(storageRoot: string): Promise<{ hash: string }> { + const bytes = new TextEncoder().encode(childBundleSource); + const hash = hashWorkflowBundleBytes(bytes); + await mkdir(join(storageRoot, "bundles"), { recursive: true }); + await writeFile(join(storageRoot, "bundles", `${hash}.esm.js`), childBundleSource, "utf8"); + const reg = await readWorkflowRegistry(storageRoot); + if (!reg.ok) { + throw reg.error; + } + const next = registerWorkflowVersion(reg.value, "child-wf", hash, Date.now()); + const wr = await writeWorkflowRegistry(storageRoot, next); + if (!wr.ok) { + throw wr.error; + } + return { hash }; +} + +describe("workflowAsAgent", () => { + test("returns error when workflow name is not registered", async () => { + const root = await mkdtemp(join(tmpdir(), "wf-waa-missing-")); + try { + const agent = workflowAsAgent("missing-wf", { storageRoot: root }); + const out = await agent(makeAgentCtx({ depth: 0, prompt: "x", maxRounds: 5 })); + expect(out).toContain("not found in registry"); + expect(out).toContain("missing-wf"); + } finally { + await rm(root, { recursive: true, force: true }); + } + }); + + test("runs registered workflow and returns child summary string", async () => { + const root = await mkdtemp(join(tmpdir(), "wf-waa-ok-")); + try { + await installChildWorkflow(root); + const agent = workflowAsAgent("child-wf", { storageRoot: root }); + const out = await agent(makeAgentCtx({ depth: 0, prompt: "hello-parent", maxRounds: 5 })); + expect(out).toBe("child-done:hello-parent"); + } finally { + await rm(root, { recursive: true, force: true }); + } + }); + + test("enforces depth limit (returns error string, does not throw)", async () => { + const root = await mkdtemp(join(tmpdir(), "wf-waa-depth-")); + try { + const agent = workflowAsAgent("child-wf", { storageRoot: root }); + const out = await agent(makeAgentCtx({ depth: 3, prompt: "x", maxRounds: 5 })); + expect(out).toContain("depth limit"); + } finally { + await rm(root, { recursive: true, force: true }); + } + }); +}); diff --git a/packages/workflow/src/create-workflow.ts b/packages/workflow/src/create-workflow.ts index 954448d..5cfd008 100644 --- a/packages/workflow/src/create-workflow.ts +++ b/packages/workflow/src/create-workflow.ts @@ -74,6 +74,7 @@ export function createWorkflow( const modCtx: ModeratorContext = { threadId: options.threadId, + depth: options.depth, start, steps, }; diff --git a/packages/workflow/src/engine.ts b/packages/workflow/src/engine.ts index 5fc45c0..2e3abdb 100644 --- a/packages/workflow/src/engine.ts +++ b/packages/workflow/src/engine.ts @@ -23,6 +23,8 @@ export type PrefilledDiskStep = { export type ExecuteThreadOptions = { maxRounds: number; + /** Passed to the bundle as `WorkflowFnOptions.depth`. */ + depth: number; signal: AbortSignal; /** Invoked after each successful yield (and outer-loop checks); used for pause/resume. */ awaitAfterEachYield: () => Promise; @@ -136,6 +138,7 @@ export async function executeThread( prompt: input.prompt, options: { maxRounds: options.maxRounds, + depth: options.depth, }, }, timestamp: nowMs, @@ -171,6 +174,7 @@ export async function executeThread( const bundleOptions: WorkflowFnOptions = { threadId: io.threadId, maxRounds: options.maxRounds, + depth: options.depth, }; return await driveWorkflowGenerator({ diff --git a/packages/workflow/src/fork-thread.ts b/packages/workflow/src/fork-thread.ts index e3a43dd..b058a64 100644 --- a/packages/workflow/src/fork-thread.ts +++ b/packages/workflow/src/fork-thread.ts @@ -11,6 +11,7 @@ export type ParsedThreadStartRecord = { threadId: string; prompt: string; maxRounds: number; + depth: number; }; function parseRoleLine( @@ -78,12 +79,17 @@ function parseStartRecordLine(firstLine: string): Result = { /** Phase 1: Moderator decides next role. */ export type ModeratorContext = { threadId: string; + /** Same as `WorkflowFnOptions.depth` for the active thread. */ + depth: number; start: StartStep; steps: RoleStep[]; }; diff --git a/packages/workflow/src/worker.ts b/packages/workflow/src/worker.ts index cf5200e..85de2cd 100644 --- a/packages/workflow/src/worker.ts +++ b/packages/workflow/src/worker.ts @@ -17,7 +17,7 @@ type RunCommand = { threadId: string; workflowName: string; prompt: string; - options: { maxRounds: number }; + options: { maxRounds: number; depth: number }; steps: RoleOutput[]; /** Timestamps aligned with `steps` for `.data.jsonl` replay; length must match `steps` when non-null. */ stepTimestamps: number[] | null; @@ -124,6 +124,9 @@ function parseRunControlPayload(rec: Record): RunCommand | null if (typeof maxRounds !== "number") { return null; } + const depthRaw = optRec.depth; + const depth = + typeof depthRaw === "number" && Number.isFinite(depthRaw) ? Math.trunc(depthRaw) : 0; const parsedSteps = parseRunStepsPayload(rec); if (parsedSteps === null) { return null; @@ -141,7 +144,7 @@ function parseRunControlPayload(rec: Record): RunCommand | null threadId, workflowName, prompt, - options: { maxRounds }, + options: { maxRounds, depth }, steps: parsedSteps.steps, stepTimestamps: parsedSteps.stepTimestamps, forkSourceThreadId, diff --git a/packages/workflow/src/workflow-as-agent.ts b/packages/workflow/src/workflow-as-agent.ts new file mode 100644 index 0000000..bbe6ce0 --- /dev/null +++ b/packages/workflow/src/workflow-as-agent.ts @@ -0,0 +1,99 @@ +import { join } from "node:path"; + +import { type ExecuteThreadIo, executeThread } from "./engine.js"; +import { extractBundleExports } from "./extract-bundle-exports.js"; +import { createLogger } from "./logger.js"; +import { getRegisteredWorkflow, readWorkflowRegistry } from "./registry.js"; +import { getDefaultWorkflowStorageRoot } from "./storage-root.js"; +import type { AgentContext, AgentFn, ThreadInput } from "./types.js"; +import { generateUlid } from "./ulid.js"; + +/** Maximum `WorkflowFnOptions.depth` allowed for a child spawned via `workflowAsAgent`. */ +const WORKFLOW_AS_AGENT_MAX_DEPTH = 3; + +export type WorkflowAsAgentOptions = { + /** When `null`, uses `getDefaultWorkflowStorageRoot()`. */ + storageRoot: string | null; +}; + +function resolveWorkflowAsAgentStorageRoot(options: WorkflowAsAgentOptions | null): string { + if (options !== null && options.storageRoot !== null) { + return options.storageRoot; + } + return getDefaultWorkflowStorageRoot(); +} + +/** + * Returns an {@link AgentFn} that runs another registered workflow in a new thread, + * using the parent thread's initial prompt (`ctx.start.content`) as the child {@link ThreadInput.prompt}. + */ +export function workflowAsAgent( + workflowName: string, + options: WorkflowAsAgentOptions | null = null, +): AgentFn { + return async (ctx: AgentContext): Promise => { + const nextDepth = ctx.depth + 1; + if (nextDepth > WORKFLOW_AS_AGENT_MAX_DEPTH) { + return `ERROR: workflow-as-agent depth limit exceeded (max ${WORKFLOW_AS_AGENT_MAX_DEPTH})`; + } + + const storageRoot = resolveWorkflowAsAgentStorageRoot(options); + + const registryResult = await readWorkflowRegistry(storageRoot); + if (!registryResult.ok) { + return `ERROR: failed to read workflow registry: ${registryResult.error.message}`; + } + + const entry = getRegisteredWorkflow(registryResult.value, workflowName); + if (entry === null) { + return `ERROR: workflow "${workflowName}" not found in registry`; + } + + const bundlePath = join(storageRoot, "bundles", `${entry.hash}.esm.js`); + const bundleExportsResult = await extractBundleExports(bundlePath); + if (!bundleExportsResult.ok) { + return `ERROR: ${bundleExportsResult.error}`; + } + + const input: ThreadInput = { + prompt: ctx.start.content, + steps: [], + }; + + const childThreadId = generateUlid(Date.now()); + const dataJsonlPath = join(storageRoot, "logs", entry.hash, `${childThreadId}.data.jsonl`); + const infoJsonlPath = join(storageRoot, "logs", entry.hash, `${childThreadId}.info.jsonl`); + + const io: ExecuteThreadIo = { + threadId: childThreadId, + hash: entry.hash, + dataJsonlPath, + infoJsonlPath, + }; + + const logger = createLogger({ sink: { kind: "file", path: infoJsonlPath } }); + const signalNever = new AbortController(); + + try { + const result = await executeThread( + bundleExportsResult.value.run, + workflowName, + input, + { + maxRounds: ctx.start.meta.maxRounds, + depth: nextDepth, + signal: signalNever.signal, + awaitAfterEachYield: async () => {}, + forkSourceThreadId: ctx.threadId, + prefilledDiskSteps: null, + }, + io, + logger, + ); + return result.summary; + } catch (e) { + const message = e instanceof Error ? e.message : String(e); + return `ERROR: ${message}`; + } + }; +}