feat: workflowAsAgent factory

- workflowAsAgent(name) resolves via registry → bundle → child thread
- System-level depth limit (max 3, constant)
- Returns summary string, errors as string (no throw)
- Integration test with nested workflow execution
- 146 tests passing

Fixes #33
This commit is contained in:
2026-05-07 10:52:26 +00:00
parent af69e773a0
commit e95e76c145
17 changed files with 465 additions and 9 deletions
+1 -1
View File
@@ -46,7 +46,7 @@ export async function cmdRun(
threadId,
workflowName: name,
prompt,
options: { maxRounds },
options: { maxRounds, depth: 0 },
},
{ awaitResponseLine: false },
);
@@ -11,6 +11,7 @@ function makeCtx(userContent: string): ThreadContext {
meta: { maxRounds: 10 },
timestamp: 1,
},
depth: 0,
steps: [],
threadId: "01TEST000000000000000000TR",
currentRole: { name: "planner", systemPrompt: "system instructions" },
@@ -104,6 +104,7 @@ function makeCtx(
): ModeratorContext<SolveIssueMeta> {
return {
threadId: "01TEST000000000000000000TR",
depth: 0,
start: makeStart(maxRounds),
steps,
};
@@ -303,7 +304,7 @@ describe("createSolveIssueRun", () => {
const run = createSolveIssueRun({ agent: async () => "" }, stubExtract);
const gen = run(
{ prompt: "task", steps: [] },
{ threadId: "01TEST000000000000000000TR", maxRounds: 20 },
{ threadId: "01TEST000000000000000000TR", maxRounds: 20, depth: 0 },
);
const first = await gen.next();
expect(first.done).toBe(false);
@@ -361,7 +362,7 @@ describe("createSolveIssueRun", () => {
);
const gen = run(
{ prompt: "task", steps: [] },
{ threadId: "01TEST000000000000000000TR", maxRounds: 20 },
{ threadId: "01TEST000000000000000000TR", maxRounds: 20, depth: 0 },
);
await gen.next();
expect(calls).toEqual(["preparer"]);
@@ -16,6 +16,7 @@ describe("buildAgentPrompt", () => {
test("includes system prompt and full task; omits tools when there are no steps", () => {
const ctx: ThreadContext = {
start: startTask("fix the bug"),
depth: 0,
steps: [],
threadId: "01TEST000000000000000000TR",
currentRole: { name: START, systemPrompt: "You are an agent." },
@@ -30,6 +31,7 @@ describe("buildAgentPrompt", () => {
test("single step shows full content and meta, and includes tools", () => {
const ctx: ThreadContext = {
start: startTask("user task"),
depth: 0,
threadId: "01TEST000000000000000000TR",
currentRole: { name: "coder", systemPrompt: "Be helpful." },
steps: [
@@ -55,6 +57,7 @@ describe("buildAgentPrompt", () => {
test("two or more steps: previous steps are meta-only; latest step is full", () => {
const ctx: ThreadContext = {
start: startTask("first message full: task content here"),
depth: 0,
threadId: "01TEST000000000000000000TR",
currentRole: { name: "coder", systemPrompt: "System." },
steps: [
@@ -90,6 +93,7 @@ describe("buildAgentPrompt", () => {
test("middle steps show meta summary only, not full content", () => {
const ctx: ThreadContext = {
start: startTask("start"),
depth: 0,
threadId: "01TEST000000000000000000TR",
currentRole: { name: "c", systemPrompt: "S" },
steps: [
+5 -1
View File
@@ -150,6 +150,7 @@ describe("executeThread", () => {
{ prompt: "Fix the login redirect bug in #3", steps: [] },
{
maxRounds: 5,
depth: 0,
signal: ac.signal,
awaitAfterEachYield: async () => {},
forkSourceThreadId: null,
@@ -178,7 +179,8 @@ describe("executeThread", () => {
expect(params.prompt).toBe("Fix the login redirect bug in #3");
const opts = params.options as Record<string, unknown>;
expect(opts.maxRounds).toBe(5);
expect(Object.keys(opts).sort()).toEqual(["maxRounds"]);
expect(opts.depth).toBe(0);
expect(Object.keys(opts).sort()).toEqual(["depth", "maxRounds"]);
const role1 = JSON.parse(lines[1] ?? "{}") as Record<string, unknown>;
expect(role1.role).toBe("planner");
@@ -238,6 +240,7 @@ describe("executeThread", () => {
},
{
maxRounds: 5,
depth: 0,
signal: ac.signal,
awaitAfterEachYield: async () => {},
forkSourceThreadId: "01SRC1111111111111111111",
@@ -298,6 +301,7 @@ describe("executeThread", () => {
{ prompt: "hello", steps: [] },
{
maxRounds: 0,
depth: 0,
signal: ac.signal,
awaitAfterEachYield: async () => {},
forkSourceThreadId: null,
@@ -24,6 +24,7 @@ describe("fork-thread", () => {
expect(r.value.start.threadId).toBe("01AAA1111111111111111111");
expect(r.value.start.prompt).toBe("hi");
expect(r.value.start.maxRounds).toBe(5);
expect(r.value.start.depth).toBe(0);
expect(r.value.roleSteps.length).toBe(3);
expect(r.value.roleSteps[0]?.role).toBe("planner");
});
@@ -83,6 +84,24 @@ describe("fork-thread", () => {
expect(r.value.workflowName).toBe("demo");
expect(r.value.historicalSteps.length).toBe(1);
expect(r.value.historicalSteps[0]?.timestamp).toBe(101);
expect(r.value.runOptions).toEqual({ maxRounds: 5 });
expect(r.value.runOptions).toEqual({ maxRounds: 5, depth: 0 });
});
test("parseThreadDataJsonl reads explicit depth from start record", () => {
const text = `{"name":"demo","hash":"H","threadId":"01ZZZZZZZZZZZZZZZZZZZZZZ","parameters":{"prompt":"p","options":{"maxRounds":3,"depth":2}},"timestamp":1}
{"role":"planner","content":"x","meta":{},"timestamp":2}
`;
const r = parseThreadDataJsonl(text);
expect(r.ok).toBe(true);
if (!r.ok) {
return;
}
expect(r.value.start.depth).toBe(2);
const plan = buildForkPlan(text, null);
expect(plan.ok).toBe(true);
if (!plan.ok) {
return;
}
expect(plan.value.runOptions).toEqual({ maxRounds: 3, depth: 2 });
});
});
@@ -149,6 +149,7 @@ describe("RoleStep refs tracking", () => {
{ prompt: "task", steps: [] },
{
maxRounds: 5,
depth: 0,
signal: ac.signal,
awaitAfterEachYield: async () => {},
forkSourceThreadId: null,
@@ -10,6 +10,7 @@ describe("RFC-001 thread JSONL shapes", () => {
prompt: "Fix the login redirect bug in #3",
options: {
maxRounds: 5,
depth: 0,
},
},
timestamp: 1714963200000,
@@ -0,0 +1,206 @@
import { afterEach, describe, expect, test } from "bun:test";
import { mkdir, mkdtemp, readdir, readFile, rm, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import * as z from "zod/v4";
import { createWorkflow } from "../src/create-workflow.js";
import { executeThread } from "../src/engine.js";
import { createExtract } from "../src/extract-fn.js";
import { hashWorkflowBundleBytes } from "../src/hash.js";
import { createLogger } from "../src/logger.js";
import {
readWorkflowRegistry,
registerWorkflowVersion,
writeWorkflowRegistry,
} from "../src/registry.js";
import { END } from "../src/types.js";
import { workflowAsAgent } from "../src/workflow-as-agent.js";
const callerMetaSchema = z.object({ done: z.literal(true) });
type ParentMeta = {
caller: z.infer<typeof callerMetaSchema>;
};
function installMockChatCompletions(sequence: ReadonlyArray<Record<string, unknown>>): () => void {
const origFetch = globalThis.fetch;
let i = 0;
const mockFetch = async (
input: Parameters<typeof fetch>[0],
init?: RequestInit,
): Promise<Response> => {
const args = sequence[i] ?? sequence[sequence.length - 1];
if (args === undefined) {
throw new Error("installMockChatCompletions: empty sequence");
}
i += 1;
void input;
const body = init?.body ? (JSON.parse(String(init.body)) as Record<string, unknown>) : {};
const tools = body.tools;
const firstTool =
Array.isArray(tools) && tools.length > 0 && tools[0] !== null && typeof tools[0] === "object"
? (tools[0] as Record<string, unknown>)
: null;
const fn =
firstTool !== null ? (firstTool.function as Record<string, unknown> | undefined) : undefined;
const toolName = typeof fn?.name === "string" ? fn.name : "extract";
return new Response(
JSON.stringify({
choices: [
{
message: {
tool_calls: [
{
type: "function",
function: {
name: toolName,
arguments: JSON.stringify(args),
},
},
],
},
},
],
}),
{ status: 200, headers: { "Content-Type": "application/json" } },
);
};
globalThis.fetch = Object.assign(mockFetch, {
preconnect: origFetch.preconnect.bind(origFetch),
}) as typeof fetch;
return () => {
globalThis.fetch = origFetch;
};
}
const parentExtract = createExtract({
baseUrl: "http://127.0.0.1:9",
apiKey: "test",
model: "test",
});
const childBundleSource = `export const descriptor = {
description: "child-integration",
roles: {
agent: {
description: "agent",
schema: { type: "object", properties: {}, additionalProperties: true },
},
},
};
export async function* run(input) {
yield { role: "agent", content: "child-body", meta: {}, refs: [] };
return { returnCode: 0, summary: "child-done:" + input.prompt };
}
`;
async function installChildWorkflow(storageRoot: string): Promise<{ hash: string }> {
const bytes = new TextEncoder().encode(childBundleSource);
const hash = hashWorkflowBundleBytes(bytes);
await mkdir(join(storageRoot, "bundles"), { recursive: true });
await writeFile(join(storageRoot, "bundles", `${hash}.esm.js`), childBundleSource, "utf8");
const reg = await readWorkflowRegistry(storageRoot);
if (!reg.ok) {
throw reg.error;
}
const next = registerWorkflowVersion(reg.value, "child-wf", hash, Date.now());
const wr = await writeWorkflowRegistry(storageRoot, next);
if (!wr.ok) {
throw wr.error;
}
return { hash };
}
describe("workflowAsAgent integration", () => {
let restoreFetch: (() => void) | null = null;
afterEach(() => {
restoreFetch?.();
restoreFetch = null;
});
test("createWorkflow parent invokes nested workflow via workflowAsAgent", async () => {
restoreFetch = installMockChatCompletions([{ done: true }]);
const root = await mkdtemp(join(tmpdir(), "wf-waa-int-"));
try {
const { hash: childHash } = await installChildWorkflow(root);
const parentWorkflow = createWorkflow<ParentMeta>(
{
roles: {
caller: {
description: "delegates to child workflow",
systemPrompt: "system",
extractPrompt: "extract done flag",
schema: callerMetaSchema,
extractRefs: null,
},
},
moderator: (ctx) => (ctx.steps.length === 0 ? "caller" : END),
},
{ agent: workflowAsAgent("child-wf", { storageRoot: root }) },
parentExtract,
);
const threadId = "01KQXKW18CT8G75T53R8F4G7YG";
const parentHash = "C9NMV6V2TQT81";
const dataPath = join(root, "logs", parentHash, `${threadId}.data.jsonl`);
const infoPath = join(root, "logs", parentHash, `${threadId}.info.jsonl`);
await mkdir(join(root, "logs", parentHash), { recursive: true });
const logger = createLogger({ sink: { kind: "file", path: infoPath } });
const ac = new AbortController();
const result = await executeThread(
parentWorkflow,
"parent-wf",
{ prompt: "from-parent", steps: [] },
{
maxRounds: 5,
depth: 0,
signal: ac.signal,
awaitAfterEachYield: async () => {},
forkSourceThreadId: null,
prefilledDiskSteps: null,
},
{ threadId, hash: parentHash, dataJsonlPath: dataPath, infoJsonlPath: infoPath },
logger,
);
expect(result.returnCode).toBe(0);
const parentText = await readFile(dataPath, "utf8");
const parentLines = parentText
.trim()
.split("\n")
.filter((l) => l !== "");
expect(parentLines.length).toBe(2);
const callerLine = JSON.parse(parentLines[1] ?? "{}") as Record<string, unknown>;
expect(callerLine.role).toBe("caller");
expect(callerLine.content).toBe("child-done:from-parent");
const childDir = join(root, "logs", childHash);
const childFiles = await readdir(childDir);
const childDataName = childFiles.find((n) => n.endsWith(".data.jsonl"));
expect(childDataName).toBeDefined();
const childText = await readFile(join(childDir, childDataName ?? ""), "utf8");
const childStart = JSON.parse(
childText
.trim()
.split("\n")
.filter((l) => l !== "")[0] ?? "{}",
) as Record<string, unknown>;
expect(childStart.forkFrom).toEqual({ threadId });
const childOpts = (childStart.parameters as Record<string, unknown>).options as Record<
string,
unknown
>;
expect(childOpts.depth).toBe(1);
} finally {
await rm(root, { recursive: true, force: true });
}
});
});
@@ -0,0 +1,101 @@
import { describe, expect, test } from "bun:test";
import { mkdir, mkdtemp, rm, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { hashWorkflowBundleBytes } from "../src/hash.js";
import {
readWorkflowRegistry,
registerWorkflowVersion,
writeWorkflowRegistry,
} from "../src/registry.js";
import { type AgentContext, START } from "../src/types.js";
import { workflowAsAgent } from "../src/workflow-as-agent.js";
function makeAgentCtx(params: { depth: number; prompt: string; maxRounds: number }): AgentContext {
const ts = Date.now();
return {
threadId: "01PARENT000000000000000001AA",
depth: params.depth,
start: {
role: START,
content: params.prompt,
meta: { maxRounds: params.maxRounds },
timestamp: ts,
},
steps: [],
currentRole: {
name: "caller",
systemPrompt: "caller",
},
};
}
const childBundleSource = `export const descriptor = {
description: "child-test",
roles: {
agent: {
description: "agent",
schema: { type: "object", properties: {}, additionalProperties: true },
},
},
};
export async function* run(input) {
yield { role: "agent", content: "child-body", meta: {}, refs: [] };
return { returnCode: 0, summary: "child-done:" + input.prompt };
}
`;
async function installChildWorkflow(storageRoot: string): Promise<{ hash: string }> {
const bytes = new TextEncoder().encode(childBundleSource);
const hash = hashWorkflowBundleBytes(bytes);
await mkdir(join(storageRoot, "bundles"), { recursive: true });
await writeFile(join(storageRoot, "bundles", `${hash}.esm.js`), childBundleSource, "utf8");
const reg = await readWorkflowRegistry(storageRoot);
if (!reg.ok) {
throw reg.error;
}
const next = registerWorkflowVersion(reg.value, "child-wf", hash, Date.now());
const wr = await writeWorkflowRegistry(storageRoot, next);
if (!wr.ok) {
throw wr.error;
}
return { hash };
}
describe("workflowAsAgent", () => {
test("returns error when workflow name is not registered", async () => {
const root = await mkdtemp(join(tmpdir(), "wf-waa-missing-"));
try {
const agent = workflowAsAgent("missing-wf", { storageRoot: root });
const out = await agent(makeAgentCtx({ depth: 0, prompt: "x", maxRounds: 5 }));
expect(out).toContain("not found in registry");
expect(out).toContain("missing-wf");
} finally {
await rm(root, { recursive: true, force: true });
}
});
test("runs registered workflow and returns child summary string", async () => {
const root = await mkdtemp(join(tmpdir(), "wf-waa-ok-"));
try {
await installChildWorkflow(root);
const agent = workflowAsAgent("child-wf", { storageRoot: root });
const out = await agent(makeAgentCtx({ depth: 0, prompt: "hello-parent", maxRounds: 5 }));
expect(out).toBe("child-done:hello-parent");
} finally {
await rm(root, { recursive: true, force: true });
}
});
test("enforces depth limit (returns error string, does not throw)", async () => {
const root = await mkdtemp(join(tmpdir(), "wf-waa-depth-"));
try {
const agent = workflowAsAgent("child-wf", { storageRoot: root });
const out = await agent(makeAgentCtx({ depth: 3, prompt: "x", maxRounds: 5 }));
expect(out).toContain("depth limit");
} finally {
await rm(root, { recursive: true, force: true });
}
});
});
+1
View File
@@ -74,6 +74,7 @@ export function createWorkflow<M extends RoleMeta>(
const modCtx: ModeratorContext<M> = {
threadId: options.threadId,
depth: options.depth,
start,
steps,
};
+4
View File
@@ -23,6 +23,8 @@ export type PrefilledDiskStep = {
export type ExecuteThreadOptions = {
maxRounds: number;
/** Passed to the bundle as `WorkflowFnOptions.depth`. */
depth: number;
signal: AbortSignal;
/** Invoked after each successful yield (and outer-loop checks); used for pause/resume. */
awaitAfterEachYield: () => Promise<void>;
@@ -136,6 +138,7 @@ export async function executeThread(
prompt: input.prompt,
options: {
maxRounds: options.maxRounds,
depth: options.depth,
},
},
timestamp: nowMs,
@@ -171,6 +174,7 @@ export async function executeThread(
const bundleOptions: WorkflowFnOptions = {
threadId: io.threadId,
maxRounds: options.maxRounds,
depth: options.depth,
};
return await driveWorkflowGenerator({
+8 -2
View File
@@ -11,6 +11,7 @@ export type ParsedThreadStartRecord = {
threadId: string;
prompt: string;
maxRounds: number;
depth: number;
};
function parseRoleLine(
@@ -78,12 +79,17 @@ function parseStartRecordLine(firstLine: string): Result<ParsedThreadStartRecord
return err("start record missing parameters.options.maxRounds");
}
const depthRaw = optRec.depth;
const depth =
typeof depthRaw === "number" && Number.isFinite(depthRaw) ? Math.trunc(depthRaw) : 0;
return ok({
workflowName: name,
hash,
threadId,
prompt,
maxRounds,
depth,
});
}
@@ -196,7 +202,7 @@ export type ForkPlan = {
hash: string;
sourceThreadId: string;
prompt: string;
runOptions: { maxRounds: number };
runOptions: { maxRounds: number; depth: number };
historicalSteps: ForkHistoricalStep[];
};
@@ -221,7 +227,7 @@ export function buildForkPlan(
hash: start.hash,
sourceThreadId: start.threadId,
prompt: start.prompt,
runOptions: { maxRounds: start.maxRounds },
runOptions: { maxRounds: start.maxRounds, depth: start.depth },
historicalSteps: selected.value,
});
}
+1
View File
@@ -82,6 +82,7 @@ export {
} from "./types.js";
export { generateUlid } from "./ulid.js";
export { getWorkerHostScriptPath } from "./worker-entry-path.js";
export { type WorkflowAsAgentOptions, workflowAsAgent } from "./workflow-as-agent.js";
export {
validateWorkflowDescriptor,
type WorkflowDescriptor,
+4
View File
@@ -39,6 +39,8 @@ export type ThreadInput = {
export type WorkflowFnOptions = {
threadId: string;
maxRounds: number;
/** Nesting depth for workflow-as-agent chains; root threads use `0`. */
depth: number;
};
/** Bundle contract — named export `run` is a function returning an AsyncGenerator. */
@@ -69,6 +71,8 @@ export type RoleStep<M extends RoleMeta> = {
/** Phase 1: Moderator decides next role. */
export type ModeratorContext<M extends RoleMeta = RoleMeta> = {
threadId: string;
/** Same as `WorkflowFnOptions.depth` for the active thread. */
depth: number;
start: StartStep;
steps: RoleStep<M>[];
};
+5 -2
View File
@@ -17,7 +17,7 @@ type RunCommand = {
threadId: string;
workflowName: string;
prompt: string;
options: { maxRounds: number };
options: { maxRounds: number; depth: number };
steps: RoleOutput[];
/** Timestamps aligned with `steps` for `.data.jsonl` replay; length must match `steps` when non-null. */
stepTimestamps: number[] | null;
@@ -124,6 +124,9 @@ function parseRunControlPayload(rec: Record<string, unknown>): RunCommand | null
if (typeof maxRounds !== "number") {
return null;
}
const depthRaw = optRec.depth;
const depth =
typeof depthRaw === "number" && Number.isFinite(depthRaw) ? Math.trunc(depthRaw) : 0;
const parsedSteps = parseRunStepsPayload(rec);
if (parsedSteps === null) {
return null;
@@ -141,7 +144,7 @@ function parseRunControlPayload(rec: Record<string, unknown>): RunCommand | null
threadId,
workflowName,
prompt,
options: { maxRounds },
options: { maxRounds, depth },
steps: parsedSteps.steps,
stepTimestamps: parsedSteps.stepTimestamps,
forkSourceThreadId,
@@ -0,0 +1,99 @@
import { join } from "node:path";
import { type ExecuteThreadIo, executeThread } from "./engine.js";
import { extractBundleExports } from "./extract-bundle-exports.js";
import { createLogger } from "./logger.js";
import { getRegisteredWorkflow, readWorkflowRegistry } from "./registry.js";
import { getDefaultWorkflowStorageRoot } from "./storage-root.js";
import type { AgentContext, AgentFn, ThreadInput } from "./types.js";
import { generateUlid } from "./ulid.js";
/** Maximum `WorkflowFnOptions.depth` allowed for a child spawned via `workflowAsAgent`. */
const WORKFLOW_AS_AGENT_MAX_DEPTH = 3;
export type WorkflowAsAgentOptions = {
/** When `null`, uses `getDefaultWorkflowStorageRoot()`. */
storageRoot: string | null;
};
function resolveWorkflowAsAgentStorageRoot(options: WorkflowAsAgentOptions | null): string {
if (options !== null && options.storageRoot !== null) {
return options.storageRoot;
}
return getDefaultWorkflowStorageRoot();
}
/**
* Returns an {@link AgentFn} that runs another registered workflow in a new thread,
* using the parent thread's initial prompt (`ctx.start.content`) as the child {@link ThreadInput.prompt}.
*/
export function workflowAsAgent(
workflowName: string,
options: WorkflowAsAgentOptions | null = null,
): AgentFn {
return async (ctx: AgentContext): Promise<string> => {
const nextDepth = ctx.depth + 1;
if (nextDepth > WORKFLOW_AS_AGENT_MAX_DEPTH) {
return `ERROR: workflow-as-agent depth limit exceeded (max ${WORKFLOW_AS_AGENT_MAX_DEPTH})`;
}
const storageRoot = resolveWorkflowAsAgentStorageRoot(options);
const registryResult = await readWorkflowRegistry(storageRoot);
if (!registryResult.ok) {
return `ERROR: failed to read workflow registry: ${registryResult.error.message}`;
}
const entry = getRegisteredWorkflow(registryResult.value, workflowName);
if (entry === null) {
return `ERROR: workflow "${workflowName}" not found in registry`;
}
const bundlePath = join(storageRoot, "bundles", `${entry.hash}.esm.js`);
const bundleExportsResult = await extractBundleExports(bundlePath);
if (!bundleExportsResult.ok) {
return `ERROR: ${bundleExportsResult.error}`;
}
const input: ThreadInput = {
prompt: ctx.start.content,
steps: [],
};
const childThreadId = generateUlid(Date.now());
const dataJsonlPath = join(storageRoot, "logs", entry.hash, `${childThreadId}.data.jsonl`);
const infoJsonlPath = join(storageRoot, "logs", entry.hash, `${childThreadId}.info.jsonl`);
const io: ExecuteThreadIo = {
threadId: childThreadId,
hash: entry.hash,
dataJsonlPath,
infoJsonlPath,
};
const logger = createLogger({ sink: { kind: "file", path: infoJsonlPath } });
const signalNever = new AbortController();
try {
const result = await executeThread(
bundleExportsResult.value.run,
workflowName,
input,
{
maxRounds: ctx.start.meta.maxRounds,
depth: nextDepth,
signal: signalNever.signal,
awaitAfterEachYield: async () => {},
forkSourceThreadId: ctx.threadId,
prefilledDiskSteps: null,
},
io,
logger,
);
return result.summary;
} catch (e) {
const message = e instanceof Error ? e.message : String(e);
return `ERROR: ${message}`;
}
};
}