Compare commits

...

6 Commits

Author SHA1 Message Date
xiaoju e95e76c145 feat: workflowAsAgent factory
- workflowAsAgent(name) resolves via registry → bundle → child thread
- System-level depth limit (max 3, constant)
- Returns summary string, errors as string (no throw)
- Integration test with nested workflow execution
- 146 tests passing

Fixes #33
2026-05-07 10:52:26 +00:00
xiaoju af69e773a0 Merge pull request 'feat: CAS garbage collection' (#38) from feat/32-cas-gc into main 2026-05-07 10:48:07 +00:00
xiaoju 6488b7bbb4 feat: CAS garbage collection
- garbageCollectCas() mark-and-sweep: scan .data.jsonl refs, delete orphans
- 'uncaged-workflow gc' CLI command
- thread rm triggers GC automatically
- 141 tests passing

Fixes #32
2026-05-07 10:47:52 +00:00
xiaoju 15d39c96a7 Merge pull request 'feat: add refs tracking to RoleStep' (#35) from feat/31-refs-tracking into main 2026-05-07 10:44:44 +00:00
xiaoju 30e4e99908 feat: add refs tracking to RoleStep
- RoleOutput gains refs: string[] for CAS reference tracking
- RoleDefinition gains extractRefs: ((meta) => string[]) | null
- planner: phases.map(p => p.hash), coder: [completedPhase]
- Engine persists refs, fork preserves refs
- Backward compat: missing refs normalized to []
- 137 tests passing

Fixes #31
2026-05-07 10:44:25 +00:00
xiaoju a3c70a5041 Merge pull request 'feat: migrate CAS to global storage' (#34) from feat/30-global-cas into main 2026-05-07 10:40:41 +00:00
31 changed files with 1049 additions and 16 deletions
+1
View File
@@ -28,6 +28,7 @@ const greeter: RoleDefinition<Roles["greeter"]> = {
systemPrompt: "You greet the user briefly.",
extractPrompt: "Extract the greeting string produced for the user.",
schema: greeterMetaSchema,
extractRefs: null,
};
const extract = createExtract({
@@ -0,0 +1,144 @@
import { afterEach, beforeEach, describe, expect, test } from "bun:test";
import { spawnSync } from "node:child_process";
import { mkdir, mkdtemp, rm, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { fileURLToPath } from "node:url";
import { createCasStore, garbageCollectCas, getGlobalCasDir } from "@uncaged/workflow";
import { cmdThreadRemove } from "../src/cmd-thread.js";
import { pathExists } from "../src/fs-utils.js";
const cliEntryPath = fileURLToPath(new URL("../src/cli.ts", import.meta.url));
/** Minimal valid `.data.jsonl` with one role step referencing `activeHash` in `refs`. */
function makeDataJsonl(threadId: string, bundleHash: string, activeHash: string): string {
return [
JSON.stringify({
name: "demo",
hash: bundleHash,
threadId,
parameters: { prompt: "hi", options: { maxRounds: 5 } },
timestamp: 100,
}),
JSON.stringify({
role: "planner",
content: "p",
meta: {},
refs: [activeHash],
timestamp: 101,
}),
"",
].join("\n");
}
describe("gc cli and garbageCollectCas", () => {
let prevEnv: string | undefined;
let storageRoot: string;
beforeEach(async () => {
prevEnv = process.env.UNCAGED_WORKFLOW_STORAGE_ROOT;
storageRoot = await mkdtemp(join(tmpdir(), "uncaged-wf-gc-"));
process.env.UNCAGED_WORKFLOW_STORAGE_ROOT = storageRoot;
});
afterEach(async () => {
if (prevEnv === undefined) {
delete process.env.UNCAGED_WORKFLOW_STORAGE_ROOT;
} else {
process.env.UNCAGED_WORKFLOW_STORAGE_ROOT = prevEnv;
}
await rm(storageRoot, { recursive: true, force: true });
});
test("garbageCollectCas keeps CAS entries referenced by thread refs", async () => {
const bundleHash = "C9NMV6V2TQT81";
const threadId = "01AAA1111111111111111111";
const logsDir = join(storageRoot, "logs", bundleHash);
await mkdir(logsDir, { recursive: true });
const cas = createCasStore(getGlobalCasDir(storageRoot));
const activeHash = await cas.put("active-blob");
const orphanHash = await cas.put("orphan-blob");
await writeFile(
join(logsDir, `${threadId}.data.jsonl`),
makeDataJsonl(threadId, bundleHash, activeHash),
"utf8",
);
const gc = await garbageCollectCas(storageRoot);
expect(gc.ok).toBe(true);
if (!gc.ok) {
return;
}
expect(gc.value.scannedThreads).toBe(1);
expect(gc.value.activeRefs).toBe(1);
expect(gc.value.deletedEntries).toBe(1);
expect(gc.value.deletedHashes).toEqual([orphanHash]);
expect(await pathExists(join(getGlobalCasDir(storageRoot), `${activeHash}.txt`))).toBe(true);
expect(await pathExists(join(getGlobalCasDir(storageRoot), `${orphanHash}.txt`))).toBe(false);
});
test("garbageCollectCas deletes orphaned CAS when no threads reference them", async () => {
const cas = createCasStore(getGlobalCasDir(storageRoot));
const orphanHash = await cas.put("lonely");
const gc = await garbageCollectCas(storageRoot);
expect(gc.ok).toBe(true);
if (!gc.ok) {
return;
}
expect(gc.value.scannedThreads).toBe(0);
expect(gc.value.activeRefs).toBe(0);
expect(gc.value.deletedEntries).toBe(1);
expect(gc.value.deletedHashes).toEqual([orphanHash]);
expect(await pathExists(join(getGlobalCasDir(storageRoot), `${orphanHash}.txt`))).toBe(false);
});
test("cli gc prints stats", async () => {
const bundleHash = "C9NMV6V2TQT81";
const threadId = "01BBB2222222222222222222";
const logsDir = join(storageRoot, "logs", bundleHash);
await mkdir(logsDir, { recursive: true });
const cas = createCasStore(getGlobalCasDir(storageRoot));
const activeHash = await cas.put("keep-me");
await cas.put("drop-me");
await writeFile(
join(logsDir, `${threadId}.data.jsonl`),
makeDataJsonl(threadId, bundleHash, activeHash),
"utf8",
);
const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot };
const proc = spawnSync(process.execPath, [cliEntryPath, "gc"], { env, encoding: "utf8" });
expect(proc.status).toBe(0);
expect(String(proc.stdout).trim()).toBe("scanned 1 threads, 1 active refs, deleted 1 entries");
});
test("thread rm triggers gc so unreferenced CAS is removed", async () => {
const bundleHash = "C9NMV6V2TQT81";
const threadId = "01CCC3333333333333333333";
const logsDir = join(storageRoot, "logs", bundleHash);
await mkdir(logsDir, { recursive: true });
const cas = createCasStore(getGlobalCasDir(storageRoot));
const activeHash = await cas.put("pinned-by-ref");
await writeFile(
join(logsDir, `${threadId}.data.jsonl`),
makeDataJsonl(threadId, bundleHash, activeHash),
"utf8",
);
const orphanHash = await cas.put("orphan-after-rm");
const orphanPath = join(getGlobalCasDir(storageRoot), `${orphanHash}.txt`);
const removed = await cmdThreadRemove(storageRoot, threadId);
expect(removed.ok).toBe(true);
expect(await pathExists(orphanPath)).toBe(false);
expect(await pathExists(join(getGlobalCasDir(storageRoot), `${activeHash}.txt`))).toBe(false);
});
});
@@ -177,7 +177,7 @@ describe("cli thread commands", () => {
expect(await pathExists(dataPath)).toBe(false);
});
test("thread rm does not delete global cas blobs for that thread id", async () => {
test("thread rm runs GC and removes CAS blobs not referenced by any remaining thread", async () => {
const bundleDir = join(storageRoot, "src");
await mkdir(bundleDir, { recursive: true });
const bundlePath = join(bundleDir, "demo.esm.js");
@@ -223,7 +223,7 @@ describe("cli thread commands", () => {
expect(removed.ok).toBe(true);
const stillThere = await readTextFileIfExists(casBlob);
expect(stillThere).toBe("keep-after-thread-rm");
expect(stillThere).toBeNull();
});
test("cli entrypoint dispatches threads / ps (spawn)", () => {
+20
View File
@@ -2,6 +2,7 @@ import { printCliError, printCliLine, printCliWarn } from "./cli-output.js";
import { cmdAdd, formatAddSuccess, parseAddArgv } from "./cmd-add.js";
import { cmdCasGet, cmdCasList, cmdCasPut, cmdCasRm } from "./cmd-cas.js";
import { cmdFork, parseForkArgv } from "./cmd-fork.js";
import { cmdGc } from "./cmd-gc.js";
import { cmdHistory } from "./cmd-history.js";
import { cmdKill } from "./cmd-kill.js";
import { cmdList, formatListLines } from "./cmd-list.js";
@@ -34,6 +35,7 @@ function usage(): string {
" uncaged-workflow thread <id>",
" uncaged-workflow thread rm <id>",
" uncaged-workflow fork <thread-id> [--from-role <role>]",
" uncaged-workflow gc",
" uncaged-workflow cas get <thread-id> <hash>",
" uncaged-workflow cas put <thread-id> <content>",
" uncaged-workflow cas list <thread-id>",
@@ -266,6 +268,23 @@ async function dispatchThreadBranch(storageRoot: string, rest: string[]): Promis
return dispatchThread(storageRoot, rest);
}
async function dispatchGc(storageRoot: string, argv: string[]): Promise<number> {
if (argv.length > 0) {
printCliError(`${usage()}\n\nerror: gc takes no arguments`);
return 1;
}
const result = await cmdGc(storageRoot);
if (!result.ok) {
printCliError(result.error);
return 1;
}
const stats = result.value;
printCliLine(
`scanned ${stats.scannedThreads} threads, ${stats.activeRefs} active refs, deleted ${stats.deletedEntries} entries`,
);
return 0;
}
async function dispatchFork(storageRoot: string, argv: string[]): Promise<number> {
const parsed = parseForkArgv(argv);
if (!parsed.ok) {
@@ -387,6 +406,7 @@ const COMMAND_TABLE: Record<string, DispatchFn> = {
threads: dispatchThreads,
thread: dispatchThreadBranch,
fork: dispatchFork,
gc: dispatchGc,
cas: dispatchCas,
};
+5
View File
@@ -0,0 +1,5 @@
import { type GcResult, garbageCollectCas, type Result } from "@uncaged/workflow";
export async function cmdGc(storageRoot: string): Promise<Result<GcResult, string>> {
return garbageCollectCas(storageRoot);
}
+1 -1
View File
@@ -46,7 +46,7 @@ export async function cmdRun(
threadId,
workflowName: name,
prompt,
options: { maxRounds },
options: { maxRounds, depth: 0 },
},
{ awaitResponseLine: false },
);
+3 -1
View File
@@ -1,7 +1,7 @@
import { unlink } from "node:fs/promises";
import { dirname, join } from "node:path";
import { err, ok, type Result } from "@uncaged/workflow";
import { err, garbageCollectCas, ok, type Result } from "@uncaged/workflow";
import { readTextFileIfExists } from "./fs-utils.js";
import { resolveThreadDataPath } from "./thread-scan.js";
@@ -38,5 +38,7 @@ export async function cmdThreadRemove(
await unlink(infoPath).catch(() => {});
await unlink(runningPath).catch(() => {});
await garbageCollectCas(storageRoot);
return ok(undefined);
}
@@ -11,6 +11,7 @@ function makeCtx(userContent: string): ThreadContext {
meta: { maxRounds: 10 },
timestamp: 1,
},
depth: 0,
steps: [],
threadId: "01TEST000000000000000000TR",
currentRole: { name: "planner", systemPrompt: "system instructions" },
@@ -38,4 +38,5 @@ export const coderRole: RoleDefinition<CoderMeta> = {
extractPrompt:
"Extract completedPhase: the planner phase hash finished this round (exact hash string from the plan). If multiple phases were finished in one round, use the last finished phase hash. Extract filesChanged and a summary of the work.",
schema: coderMetaSchema,
extractRefs: (meta) => [meta.completedPhase],
};
@@ -31,4 +31,5 @@ export const committerRole: RoleDefinition<CommitterMeta> = {
extractPrompt:
"Extract the commit result: committed (with branch and SHA), recoverable failure, or unrecoverable failure. Include error details and log references if applicable.",
schema: committerMetaSchema,
extractRefs: null,
};
@@ -49,4 +49,5 @@ export const plannerRole: RoleDefinition<PlannerMeta> = {
extractPrompt:
"Extract the implementation phases from the agent's output. Each phase has a hash (the CAS content-hash returned by the cas put command) and a title (one-line summary).",
schema: plannerMetaSchema,
extractRefs: (meta) => meta.phases.map((p) => p.hash),
};
@@ -47,4 +47,5 @@ export const preparerRole: RoleDefinition<PreparerMeta> = {
extractPrompt:
"Extract repoPath (absolute path), defaultBranch, conventions (summary string or null), and toolchain (packageManager, testCommand, lintCommand, buildCommand — each string or null).",
schema: preparerMetaSchema,
extractRefs: null,
};
@@ -21,4 +21,5 @@ export const reviewerRole: RoleDefinition<ReviewerMeta> = {
extractPrompt:
"Extract the review verdict: approved or rejected. If rejected, list the blocking issues.",
schema: reviewerMetaSchema,
extractRefs: null,
};
@@ -104,6 +104,7 @@ function makeCtx(
): ModeratorContext<SolveIssueMeta> {
return {
threadId: "01TEST000000000000000000TR",
depth: 0,
start: makeStart(maxRounds),
steps,
};
@@ -124,6 +125,7 @@ function preparerStep(): RoleStep<SolveIssueMeta> {
buildCommand: "bun run build",
},
},
refs: [],
timestamp: 0,
};
}
@@ -133,6 +135,7 @@ function plannerStep(phases: PlannerMeta["phases"] = DEFAULT_PHASES): RoleStep<S
role: "planner",
content: "plan",
meta: { phases },
refs: phases.map((p) => p.hash),
timestamp: 1,
};
}
@@ -142,6 +145,7 @@ function coderStep(completedPhase = "4KNMR2PX"): RoleStep<SolveIssueMeta> {
role: "coder",
content: "code",
meta: { completedPhase, filesChanged: ["a.ts"], summary: "fixed" },
refs: [completedPhase],
timestamp: 2,
};
}
@@ -153,6 +157,7 @@ function reviewerStep(approved: boolean): RoleStep<SolveIssueMeta> {
meta: approved
? { status: "approved" as const }
: { status: "rejected" as const, issues: ["needs fix"] },
refs: [],
timestamp: 3,
};
}
@@ -162,6 +167,7 @@ function committerStep(): RoleStep<SolveIssueMeta> {
role: "committer",
content: "commit",
meta: { status: "committed", branch: "feat/issue-1", commitSha: "abc1234" },
refs: [],
timestamp: 4,
};
}
@@ -298,7 +304,7 @@ describe("createSolveIssueRun", () => {
const run = createSolveIssueRun({ agent: async () => "" }, stubExtract);
const gen = run(
{ prompt: "task", steps: [] },
{ threadId: "01TEST000000000000000000TR", maxRounds: 20 },
{ threadId: "01TEST000000000000000000TR", maxRounds: 20, depth: 0 },
);
const first = await gen.next();
expect(first.done).toBe(false);
@@ -356,7 +362,7 @@ describe("createSolveIssueRun", () => {
);
const gen = run(
{ prompt: "task", steps: [] },
{ threadId: "01TEST000000000000000000TR", maxRounds: 20 },
{ threadId: "01TEST000000000000000000TR", maxRounds: 20, depth: 0 },
);
await gen.next();
expect(calls).toEqual(["preparer"]);
@@ -16,6 +16,7 @@ describe("buildAgentPrompt", () => {
test("includes system prompt and full task; omits tools when there are no steps", () => {
const ctx: ThreadContext = {
start: startTask("fix the bug"),
depth: 0,
steps: [],
threadId: "01TEST000000000000000000TR",
currentRole: { name: START, systemPrompt: "You are an agent." },
@@ -30,6 +31,7 @@ describe("buildAgentPrompt", () => {
test("single step shows full content and meta, and includes tools", () => {
const ctx: ThreadContext = {
start: startTask("user task"),
depth: 0,
threadId: "01TEST000000000000000000TR",
currentRole: { name: "coder", systemPrompt: "Be helpful." },
steps: [
@@ -37,6 +39,7 @@ describe("buildAgentPrompt", () => {
role: "coder",
content: "only step full body",
meta: { files: ["a.ts"] },
refs: [],
timestamp: 2,
},
],
@@ -54,6 +57,7 @@ describe("buildAgentPrompt", () => {
test("two or more steps: previous steps are meta-only; latest step is full", () => {
const ctx: ThreadContext = {
start: startTask("first message full: task content here"),
depth: 0,
threadId: "01TEST000000000000000000TR",
currentRole: { name: "coder", systemPrompt: "System." },
steps: [
@@ -61,12 +65,14 @@ describe("buildAgentPrompt", () => {
role: "planner",
content: "PLANNER_SECRET_FULL_TEXT",
meta: { plan: "short" },
refs: [],
timestamp: 2,
},
{
role: "coder",
content: "last step full content",
meta: { done: true },
refs: [],
timestamp: 3,
},
],
@@ -87,6 +93,7 @@ describe("buildAgentPrompt", () => {
test("middle steps show meta summary only, not full content", () => {
const ctx: ThreadContext = {
start: startTask("start"),
depth: 0,
threadId: "01TEST000000000000000000TR",
currentRole: { name: "c", systemPrompt: "S" },
steps: [
@@ -94,18 +101,21 @@ describe("buildAgentPrompt", () => {
role: "a",
content: "HIDDEN_A",
meta: { n: 1 },
refs: [],
timestamp: 2,
},
{
role: "b",
content: "HIDDEN_B_MIDDLE",
meta: { n: 2 },
refs: [],
timestamp: 3,
},
{
role: "c",
content: "VISIBLE_LAST",
meta: { n: 3 },
refs: [],
timestamp: 4,
},
],
@@ -22,6 +22,7 @@ describe("buildDescriptor", () => {
systemPrompt: "You are an analyst.",
extractPrompt: "Extract title and count from the analysis.",
schema,
extractRefs: null,
},
},
moderator: () => END,
+12 -1
View File
@@ -89,12 +89,14 @@ const demoWorkflow = createWorkflow<DemoMeta>(
systemPrompt: "You are a planner.",
extractPrompt: "Extract plan text and affected files list.",
schema: plannerMetaSchema,
extractRefs: null,
},
coder: {
description: "Demo coder",
systemPrompt: "You are a coder.",
extractPrompt: "Extract the code diff summary.",
schema: coderMetaSchema,
extractRefs: null,
},
},
moderator: (ctx) => {
@@ -148,6 +150,7 @@ describe("executeThread", () => {
{ prompt: "Fix the login redirect bug in #3", steps: [] },
{
maxRounds: 5,
depth: 0,
signal: ac.signal,
awaitAfterEachYield: async () => {},
forkSourceThreadId: null,
@@ -176,16 +179,19 @@ describe("executeThread", () => {
expect(params.prompt).toBe("Fix the login redirect bug in #3");
const opts = params.options as Record<string, unknown>;
expect(opts.maxRounds).toBe(5);
expect(Object.keys(opts).sort()).toEqual(["maxRounds"]);
expect(opts.depth).toBe(0);
expect(Object.keys(opts).sort()).toEqual(["depth", "maxRounds"]);
const role1 = JSON.parse(lines[1] ?? "{}") as Record<string, unknown>;
expect(role1.role).toBe("planner");
expect(role1.content).toBe("plan-body");
expect(role1.meta).toEqual({ plan: "do-it", files: ["a.ts"] });
expect(role1.refs).toEqual([]);
expect(typeof role1.timestamp).toBe("number");
const role2 = JSON.parse(lines[2] ?? "{}") as Record<string, unknown>;
expect(role2.role).toBe("coder");
expect(role2.refs).toEqual([]);
const infoText = await readFile(infoPath, "utf8");
const infoLines = infoText
@@ -228,11 +234,13 @@ describe("executeThread", () => {
role: "planner",
content: "plan-body",
meta: { plan: "do-it", files: ["a.ts"] },
refs: ["CAS111AAAAAAA"],
},
],
},
{
maxRounds: 5,
depth: 0,
signal: ac.signal,
awaitAfterEachYield: async () => {},
forkSourceThreadId: "01SRC1111111111111111111",
@@ -241,6 +249,7 @@ describe("executeThread", () => {
role: "planner",
content: "plan-body",
meta: { plan: "do-it", files: ["a.ts"] },
refs: ["CAS111AAAAAAA"],
timestamp: histTs,
},
],
@@ -264,6 +273,7 @@ describe("executeThread", () => {
const role0 = JSON.parse(lines[1] ?? "{}") as Record<string, unknown>;
expect(role0.role).toBe("planner");
expect(role0.timestamp).toBe(histTs);
expect(role0.refs).toEqual(["CAS111AAAAAAA"]);
const role1 = JSON.parse(lines[2] ?? "{}") as Record<string, unknown>;
expect(role1.role).toBe("coder");
@@ -291,6 +301,7 @@ describe("executeThread", () => {
{ prompt: "hello", steps: [] },
{
maxRounds: 0,
depth: 0,
signal: ac.signal,
awaitAfterEachYield: async () => {},
forkSourceThreadId: null,
@@ -24,6 +24,7 @@ describe("fork-thread", () => {
expect(r.value.start.threadId).toBe("01AAA1111111111111111111");
expect(r.value.start.prompt).toBe("hi");
expect(r.value.start.maxRounds).toBe(5);
expect(r.value.start.depth).toBe(0);
expect(r.value.roleSteps.length).toBe(3);
expect(r.value.roleSteps[0]?.role).toBe("planner");
});
@@ -83,6 +84,24 @@ describe("fork-thread", () => {
expect(r.value.workflowName).toBe("demo");
expect(r.value.historicalSteps.length).toBe(1);
expect(r.value.historicalSteps[0]?.timestamp).toBe(101);
expect(r.value.runOptions).toEqual({ maxRounds: 5 });
expect(r.value.runOptions).toEqual({ maxRounds: 5, depth: 0 });
});
test("parseThreadDataJsonl reads explicit depth from start record", () => {
const text = `{"name":"demo","hash":"H","threadId":"01ZZZZZZZZZZZZZZZZZZZZZZ","parameters":{"prompt":"p","options":{"maxRounds":3,"depth":2}},"timestamp":1}
{"role":"planner","content":"x","meta":{},"timestamp":2}
`;
const r = parseThreadDataJsonl(text);
expect(r.ok).toBe(true);
if (!r.ok) {
return;
}
expect(r.value.start.depth).toBe(2);
const plan = buildForkPlan(text, null);
expect(plan.ok).toBe(true);
if (!plan.ok) {
return;
}
expect(plan.value.runOptions).toEqual({ maxRounds: 3, depth: 2 });
});
});
@@ -0,0 +1,192 @@
import { afterEach, describe, expect, test } from "bun:test";
import { mkdir, mkdtemp, readFile, rm } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import * as z from "zod/v4";
import { createWorkflow } from "../src/create-workflow.js";
import { executeThread } from "../src/engine.js";
import { createExtract } from "../src/extract-fn.js";
import { buildForkPlan, parseThreadDataJsonl } from "../src/fork-thread.js";
import { createLogger } from "../src/logger.js";
import { END } from "../src/types.js";
const phaseSchema = z.object({
hash: z.string(),
title: z.string(),
});
const plannerMetaSchema = z.object({
phases: z.array(phaseSchema),
});
type RefsDemoMeta = {
planner: z.infer<typeof plannerMetaSchema>;
};
function installMockChatCompletions(sequence: ReadonlyArray<Record<string, unknown>>): () => void {
const origFetch = globalThis.fetch;
let i = 0;
const mockFetch = async (
input: Parameters<typeof fetch>[0],
init?: RequestInit,
): Promise<Response> => {
const args = sequence[i] ?? sequence[sequence.length - 1];
if (args === undefined) {
throw new Error("installMockChatCompletions: empty sequence");
}
i += 1;
void input;
const body = init?.body ? (JSON.parse(String(init.body)) as Record<string, unknown>) : {};
const tools = body.tools;
const firstTool =
Array.isArray(tools) && tools.length > 0 && tools[0] !== null && typeof tools[0] === "object"
? (tools[0] as Record<string, unknown>)
: null;
const fn =
firstTool !== null ? (firstTool.function as Record<string, unknown> | undefined) : undefined;
const toolName = typeof fn?.name === "string" ? fn.name : "extract";
return new Response(
JSON.stringify({
choices: [
{
message: {
tool_calls: [
{
type: "function",
function: {
name: toolName,
arguments: JSON.stringify(args),
},
},
],
},
},
],
}),
{ status: 200, headers: { "Content-Type": "application/json" } },
);
};
globalThis.fetch = Object.assign(mockFetch, {
preconnect: origFetch.preconnect.bind(origFetch),
}) as typeof fetch;
return () => {
globalThis.fetch = origFetch;
};
}
const refsDemoExtract = createExtract({
baseUrl: "http://127.0.0.1:9",
apiKey: "test",
model: "test",
});
const refsDemoWorkflow = createWorkflow<RefsDemoMeta>(
{
roles: {
planner: {
description: "Planner with phase hashes",
systemPrompt: "Plan.",
extractPrompt: "Extract phases with CAS hashes.",
schema: plannerMetaSchema,
extractRefs: (meta) => meta.phases.map((p) => p.hash),
},
},
moderator: (ctx) => (ctx.steps.length === 0 ? "planner" : END),
},
{
agent: async () => "plan-output",
},
refsDemoExtract,
);
describe("RoleStep refs tracking", () => {
let restoreFetch: (() => void) | null = null;
afterEach(() => {
restoreFetch?.();
restoreFetch = null;
});
test("parseThreadDataJsonl reads refs and defaults missing refs to []", () => {
const text = `{"name":"demo","hash":"C9NMV6V2TQT81","threadId":"01AAA1111111111111111111","parameters":{"prompt":"hi","options":{"maxRounds":5}},"timestamp":100}
{"role":"planner","content":"p","meta":{},"refs":["H111AAAAAAAAA","H222AAAAAAAAA"],"timestamp":101}
{"role":"coder","content":"c","meta":{},"timestamp":102}
`;
const r = parseThreadDataJsonl(text);
expect(r.ok).toBe(true);
if (!r.ok) {
return;
}
expect(r.value.roleSteps[0]?.refs).toEqual(["H111AAAAAAAAA", "H222AAAAAAAAA"]);
expect(r.value.roleSteps[1]?.refs).toEqual([]);
});
test("executeThread persists refs from extractRefs on role yields", async () => {
restoreFetch = installMockChatCompletions([
{
phases: [
{ hash: "C9NMV6V2TQT81", title: "phase-a" },
{ hash: "C9NMV6V2TQT82", title: "phase-b" },
],
},
]);
const root = await mkdtemp(join(tmpdir(), "wf-refs-"));
try {
const threadId = "01KQXKW18CT8G75T53R8F4G7YG";
const hash = "C9NMV6V2TQT81";
const dataPath = join(root, "logs", hash, `${threadId}.data.jsonl`);
const infoPath = join(root, "logs", hash, `${threadId}.info.jsonl`);
await mkdir(join(root, "logs", hash), { recursive: true });
const logger = createLogger({ sink: { kind: "file", path: infoPath } });
const ac = new AbortController();
const result = await executeThread(
refsDemoWorkflow,
"refs-demo",
{ prompt: "task", steps: [] },
{
maxRounds: 5,
depth: 0,
signal: ac.signal,
awaitAfterEachYield: async () => {},
forkSourceThreadId: null,
prefilledDiskSteps: null,
},
{ threadId, hash, dataJsonlPath: dataPath, infoJsonlPath: infoPath },
logger,
);
expect(result.returnCode).toBe(0);
const dataText = await readFile(dataPath, "utf8");
const lines = dataText
.trim()
.split("\n")
.filter((l) => l !== "");
expect(lines.length).toBe(2);
const role1 = JSON.parse(lines[1] ?? "{}") as Record<string, unknown>;
expect(role1.role).toBe("planner");
expect(role1.refs).toEqual(["C9NMV6V2TQT81", "C9NMV6V2TQT82"]);
} finally {
await rm(root, { recursive: true, force: true });
}
});
test("buildForkPlan carries refs on historical steps", () => {
const text = `{"name":"demo","hash":"C9NMV6V2TQT81","threadId":"01AAA1111111111111111111","parameters":{"prompt":"hi","options":{"maxRounds":5}},"timestamp":100}
{"role":"planner","content":"p","meta":{},"refs":["KEEPREFAAAAAA"],"timestamp":101}
{"role":"coder","content":"c","meta":{},"refs":["CODERHASHAAAA"],"timestamp":102}
`;
const plan = buildForkPlan(text, null);
expect(plan.ok).toBe(true);
if (!plan.ok) {
return;
}
expect(plan.value.historicalSteps.length).toBe(1);
expect(plan.value.historicalSteps[0]?.refs).toEqual(["KEEPREFAAAAAA"]);
});
});
@@ -10,6 +10,7 @@ describe("RFC-001 thread JSONL shapes", () => {
prompt: "Fix the login redirect bug in #3",
options: {
maxRounds: 5,
depth: 0,
},
},
timestamp: 1714963200000,
@@ -19,13 +20,16 @@ describe("RFC-001 thread JSONL shapes", () => {
role: "planner",
content: "Plan: modify auth middleware...",
meta: { plan: "...", files: ["src/auth.ts"] },
refs: [] as string[],
timestamp: 1714963201000,
};
expect(Object.keys(startRecord).sort()).toEqual(
["hash", "name", "parameters", "threadId", "timestamp"].sort(),
);
expect(Object.keys(roleRecord).sort()).toEqual(["content", "meta", "role", "timestamp"].sort());
expect(Object.keys(roleRecord).sort()).toEqual(
["content", "meta", "refs", "role", "timestamp"].sort(),
);
});
test("documents the `.info.jsonl` debug record keys", () => {
@@ -0,0 +1,206 @@
import { afterEach, describe, expect, test } from "bun:test";
import { mkdir, mkdtemp, readdir, readFile, rm, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import * as z from "zod/v4";
import { createWorkflow } from "../src/create-workflow.js";
import { executeThread } from "../src/engine.js";
import { createExtract } from "../src/extract-fn.js";
import { hashWorkflowBundleBytes } from "../src/hash.js";
import { createLogger } from "../src/logger.js";
import {
readWorkflowRegistry,
registerWorkflowVersion,
writeWorkflowRegistry,
} from "../src/registry.js";
import { END } from "../src/types.js";
import { workflowAsAgent } from "../src/workflow-as-agent.js";
const callerMetaSchema = z.object({ done: z.literal(true) });
type ParentMeta = {
caller: z.infer<typeof callerMetaSchema>;
};
function installMockChatCompletions(sequence: ReadonlyArray<Record<string, unknown>>): () => void {
const origFetch = globalThis.fetch;
let i = 0;
const mockFetch = async (
input: Parameters<typeof fetch>[0],
init?: RequestInit,
): Promise<Response> => {
const args = sequence[i] ?? sequence[sequence.length - 1];
if (args === undefined) {
throw new Error("installMockChatCompletions: empty sequence");
}
i += 1;
void input;
const body = init?.body ? (JSON.parse(String(init.body)) as Record<string, unknown>) : {};
const tools = body.tools;
const firstTool =
Array.isArray(tools) && tools.length > 0 && tools[0] !== null && typeof tools[0] === "object"
? (tools[0] as Record<string, unknown>)
: null;
const fn =
firstTool !== null ? (firstTool.function as Record<string, unknown> | undefined) : undefined;
const toolName = typeof fn?.name === "string" ? fn.name : "extract";
return new Response(
JSON.stringify({
choices: [
{
message: {
tool_calls: [
{
type: "function",
function: {
name: toolName,
arguments: JSON.stringify(args),
},
},
],
},
},
],
}),
{ status: 200, headers: { "Content-Type": "application/json" } },
);
};
globalThis.fetch = Object.assign(mockFetch, {
preconnect: origFetch.preconnect.bind(origFetch),
}) as typeof fetch;
return () => {
globalThis.fetch = origFetch;
};
}
const parentExtract = createExtract({
baseUrl: "http://127.0.0.1:9",
apiKey: "test",
model: "test",
});
const childBundleSource = `export const descriptor = {
description: "child-integration",
roles: {
agent: {
description: "agent",
schema: { type: "object", properties: {}, additionalProperties: true },
},
},
};
export async function* run(input) {
yield { role: "agent", content: "child-body", meta: {}, refs: [] };
return { returnCode: 0, summary: "child-done:" + input.prompt };
}
`;
async function installChildWorkflow(storageRoot: string): Promise<{ hash: string }> {
const bytes = new TextEncoder().encode(childBundleSource);
const hash = hashWorkflowBundleBytes(bytes);
await mkdir(join(storageRoot, "bundles"), { recursive: true });
await writeFile(join(storageRoot, "bundles", `${hash}.esm.js`), childBundleSource, "utf8");
const reg = await readWorkflowRegistry(storageRoot);
if (!reg.ok) {
throw reg.error;
}
const next = registerWorkflowVersion(reg.value, "child-wf", hash, Date.now());
const wr = await writeWorkflowRegistry(storageRoot, next);
if (!wr.ok) {
throw wr.error;
}
return { hash };
}
describe("workflowAsAgent integration", () => {
let restoreFetch: (() => void) | null = null;
afterEach(() => {
restoreFetch?.();
restoreFetch = null;
});
test("createWorkflow parent invokes nested workflow via workflowAsAgent", async () => {
restoreFetch = installMockChatCompletions([{ done: true }]);
const root = await mkdtemp(join(tmpdir(), "wf-waa-int-"));
try {
const { hash: childHash } = await installChildWorkflow(root);
const parentWorkflow = createWorkflow<ParentMeta>(
{
roles: {
caller: {
description: "delegates to child workflow",
systemPrompt: "system",
extractPrompt: "extract done flag",
schema: callerMetaSchema,
extractRefs: null,
},
},
moderator: (ctx) => (ctx.steps.length === 0 ? "caller" : END),
},
{ agent: workflowAsAgent("child-wf", { storageRoot: root }) },
parentExtract,
);
const threadId = "01KQXKW18CT8G75T53R8F4G7YG";
const parentHash = "C9NMV6V2TQT81";
const dataPath = join(root, "logs", parentHash, `${threadId}.data.jsonl`);
const infoPath = join(root, "logs", parentHash, `${threadId}.info.jsonl`);
await mkdir(join(root, "logs", parentHash), { recursive: true });
const logger = createLogger({ sink: { kind: "file", path: infoPath } });
const ac = new AbortController();
const result = await executeThread(
parentWorkflow,
"parent-wf",
{ prompt: "from-parent", steps: [] },
{
maxRounds: 5,
depth: 0,
signal: ac.signal,
awaitAfterEachYield: async () => {},
forkSourceThreadId: null,
prefilledDiskSteps: null,
},
{ threadId, hash: parentHash, dataJsonlPath: dataPath, infoJsonlPath: infoPath },
logger,
);
expect(result.returnCode).toBe(0);
const parentText = await readFile(dataPath, "utf8");
const parentLines = parentText
.trim()
.split("\n")
.filter((l) => l !== "");
expect(parentLines.length).toBe(2);
const callerLine = JSON.parse(parentLines[1] ?? "{}") as Record<string, unknown>;
expect(callerLine.role).toBe("caller");
expect(callerLine.content).toBe("child-done:from-parent");
const childDir = join(root, "logs", childHash);
const childFiles = await readdir(childDir);
const childDataName = childFiles.find((n) => n.endsWith(".data.jsonl"));
expect(childDataName).toBeDefined();
const childText = await readFile(join(childDir, childDataName ?? ""), "utf8");
const childStart = JSON.parse(
childText
.trim()
.split("\n")
.filter((l) => l !== "")[0] ?? "{}",
) as Record<string, unknown>;
expect(childStart.forkFrom).toEqual({ threadId });
const childOpts = (childStart.parameters as Record<string, unknown>).options as Record<
string,
unknown
>;
expect(childOpts.depth).toBe(1);
} finally {
await rm(root, { recursive: true, force: true });
}
});
});
@@ -0,0 +1,101 @@
import { describe, expect, test } from "bun:test";
import { mkdir, mkdtemp, rm, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { hashWorkflowBundleBytes } from "../src/hash.js";
import {
readWorkflowRegistry,
registerWorkflowVersion,
writeWorkflowRegistry,
} from "../src/registry.js";
import { type AgentContext, START } from "../src/types.js";
import { workflowAsAgent } from "../src/workflow-as-agent.js";
function makeAgentCtx(params: { depth: number; prompt: string; maxRounds: number }): AgentContext {
const ts = Date.now();
return {
threadId: "01PARENT000000000000000001AA",
depth: params.depth,
start: {
role: START,
content: params.prompt,
meta: { maxRounds: params.maxRounds },
timestamp: ts,
},
steps: [],
currentRole: {
name: "caller",
systemPrompt: "caller",
},
};
}
const childBundleSource = `export const descriptor = {
description: "child-test",
roles: {
agent: {
description: "agent",
schema: { type: "object", properties: {}, additionalProperties: true },
},
},
};
export async function* run(input) {
yield { role: "agent", content: "child-body", meta: {}, refs: [] };
return { returnCode: 0, summary: "child-done:" + input.prompt };
}
`;
async function installChildWorkflow(storageRoot: string): Promise<{ hash: string }> {
const bytes = new TextEncoder().encode(childBundleSource);
const hash = hashWorkflowBundleBytes(bytes);
await mkdir(join(storageRoot, "bundles"), { recursive: true });
await writeFile(join(storageRoot, "bundles", `${hash}.esm.js`), childBundleSource, "utf8");
const reg = await readWorkflowRegistry(storageRoot);
if (!reg.ok) {
throw reg.error;
}
const next = registerWorkflowVersion(reg.value, "child-wf", hash, Date.now());
const wr = await writeWorkflowRegistry(storageRoot, next);
if (!wr.ok) {
throw wr.error;
}
return { hash };
}
describe("workflowAsAgent", () => {
test("returns error when workflow name is not registered", async () => {
const root = await mkdtemp(join(tmpdir(), "wf-waa-missing-"));
try {
const agent = workflowAsAgent("missing-wf", { storageRoot: root });
const out = await agent(makeAgentCtx({ depth: 0, prompt: "x", maxRounds: 5 }));
expect(out).toContain("not found in registry");
expect(out).toContain("missing-wf");
} finally {
await rm(root, { recursive: true, force: true });
}
});
test("runs registered workflow and returns child summary string", async () => {
const root = await mkdtemp(join(tmpdir(), "wf-waa-ok-"));
try {
await installChildWorkflow(root);
const agent = workflowAsAgent("child-wf", { storageRoot: root });
const out = await agent(makeAgentCtx({ depth: 0, prompt: "hello-parent", maxRounds: 5 }));
expect(out).toBe("child-done:hello-parent");
} finally {
await rm(root, { recursive: true, force: true });
}
});
test("enforces depth limit (returns error string, does not throw)", async () => {
const root = await mkdtemp(join(tmpdir(), "wf-waa-depth-"));
try {
const agent = workflowAsAgent("child-wf", { storageRoot: root });
const out = await agent(makeAgentCtx({ depth: 3, prompt: "x", maxRounds: 5 }));
expect(out).toContain("depth limit");
} finally {
await rm(root, { recursive: true, force: true });
}
});
});
+21 -1
View File
@@ -5,6 +5,7 @@ import {
END,
type ExtractContext,
type ModeratorContext,
type RoleDefinition,
type RoleMeta,
type RoleOutput,
type RoleStep,
@@ -22,6 +23,17 @@ function isRoleNext<M extends RoleMeta>(
return next !== END;
}
function resolveExtractedRefs(
roleDef: RoleDefinition<Record<string, unknown>>,
meta: unknown,
): string[] {
const extractRefsFn = roleDef.extractRefs;
if (extractRefsFn === null || typeof extractRefsFn !== "function") {
return [];
}
return extractRefsFn(meta as Record<string, unknown>);
}
/**
* Binds pure role definitions + moderator to runtime agents and structured extraction.
* Assign with `export const run = createWorkflow(def, binding, extract)`.
@@ -48,6 +60,7 @@ export function createWorkflow<M extends RoleMeta>(
role: out.role,
content: out.content,
meta: out.meta,
refs: out.refs,
timestamp: baseTs + i,
})) as RoleStep<M>[];
@@ -61,6 +74,7 @@ export function createWorkflow<M extends RoleMeta>(
const modCtx: ModeratorContext<M> = {
threadId: options.threadId,
depth: options.depth,
start,
steps,
};
@@ -96,15 +110,21 @@ export function createWorkflow<M extends RoleMeta>(
extractCtx as unknown as ExtractContext,
);
const refs = resolveExtractedRefs(
roleDef as unknown as RoleDefinition<Record<string, unknown>>,
meta,
);
const ts = Date.now();
const step = {
role: next,
content: raw,
meta,
refs,
timestamp: ts,
} as RoleStep<M>;
yield { role: step.role, content: step.content, meta: step.meta };
yield { role: step.role, content: step.content, meta: step.meta, refs: step.refs };
steps = [...steps, step];
}
+8
View File
@@ -2,6 +2,7 @@ import { appendFile, mkdir } from "node:fs/promises";
import { dirname } from "node:path";
import type { LogFn } from "./logger.js";
import { normalizeRefsField } from "./refs-field.js";
import type { ThreadInput, WorkflowFn, WorkflowFnOptions, WorkflowResult } from "./types.js";
export type ExecuteThreadIo = {
@@ -16,11 +17,14 @@ export type PrefilledDiskStep = {
role: string;
content: string;
meta: Record<string, unknown>;
refs: string[];
timestamp: number;
};
export type ExecuteThreadOptions = {
maxRounds: number;
/** Passed to the bundle as `WorkflowFnOptions.depth`. */
depth: number;
signal: AbortSignal;
/** Invoked after each successful yield (and outer-loop checks); used for pause/resume. */
awaitAfterEachYield: () => Promise<void>;
@@ -79,6 +83,7 @@ async function driveWorkflowGenerator(params: {
role: step.role,
content: step.content,
meta: step.meta,
refs: normalizeRefsField(step.refs),
timestamp: ts,
});
@@ -133,6 +138,7 @@ export async function executeThread(
prompt: input.prompt,
options: {
maxRounds: options.maxRounds,
depth: options.depth,
},
},
timestamp: nowMs,
@@ -151,6 +157,7 @@ export async function executeThread(
role: row.role,
content: row.content,
meta: row.meta,
refs: normalizeRefsField(row.refs),
timestamp: row.timestamp,
});
}
@@ -167,6 +174,7 @@ export async function executeThread(
const bundleOptions: WorkflowFnOptions = {
threadId: io.threadId,
maxRounds: options.maxRounds,
depth: options.depth,
};
return await driveWorkflowGenerator({
+10 -2
View File
@@ -1,3 +1,4 @@
import { normalizeRefsField } from "./refs-field.js";
import { err, ok, type Result } from "./result.js";
import type { RoleOutput } from "./types.js";
@@ -10,6 +11,7 @@ export type ParsedThreadStartRecord = {
threadId: string;
prompt: string;
maxRounds: number;
depth: number;
};
function parseRoleLine(
@@ -36,6 +38,7 @@ function parseRoleLine(
role,
content,
meta: meta as Record<string, unknown>,
refs: normalizeRefsField(obj.refs),
timestamp,
});
}
@@ -76,12 +79,17 @@ function parseStartRecordLine(firstLine: string): Result<ParsedThreadStartRecord
return err("start record missing parameters.options.maxRounds");
}
const depthRaw = optRec.depth;
const depth =
typeof depthRaw === "number" && Number.isFinite(depthRaw) ? Math.trunc(depthRaw) : 0;
return ok({
workflowName: name,
hash,
threadId,
prompt,
maxRounds,
depth,
});
}
@@ -194,7 +202,7 @@ export type ForkPlan = {
hash: string;
sourceThreadId: string;
prompt: string;
runOptions: { maxRounds: number };
runOptions: { maxRounds: number; depth: number };
historicalSteps: ForkHistoricalStep[];
};
@@ -219,7 +227,7 @@ export function buildForkPlan(
hash: start.hash,
sourceThreadId: start.threadId,
prompt: start.prompt,
runOptions: { maxRounds: start.maxRounds },
runOptions: { maxRounds: start.maxRounds, depth: start.depth },
historicalSteps: selected.value,
});
}
+131
View File
@@ -0,0 +1,131 @@
import { readdir, readFile } from "node:fs/promises";
import { join } from "node:path";
import { type CasStore, createCasStore } from "./cas.js";
import { parseThreadDataJsonl } from "./fork-thread.js";
import { err, ok, type Result } from "./result.js";
import { getGlobalCasDir } from "./storage-root.js";
export type GcResult = {
scannedThreads: number;
activeRefs: number;
deletedEntries: number;
deletedHashes: string[];
};
async function listThreadDataJsonlPaths(storageRoot: string): Promise<Result<string[], string>> {
const logsRoot = join(storageRoot, "logs");
const paths: string[] = [];
let hashes: string[];
try {
hashes = await readdir(logsRoot);
} catch (e) {
const errObj = e as NodeJS.ErrnoException;
if (errObj.code === "ENOENT") {
return ok([]);
}
return err(`failed to read logs directory: ${String(e)}`);
}
for (const hash of hashes) {
const dir = join(logsRoot, hash);
let entries: string[];
try {
entries = await readdir(dir);
} catch {
continue;
}
for (const fileName of entries) {
if (fileName.endsWith(".data.jsonl")) {
paths.push(join(dir, fileName));
}
}
}
paths.sort();
return ok(paths);
}
async function collectActiveRefsFromDataPaths(
dataPaths: string[],
): Promise<Result<Set<string>, string>> {
const activeRefs = new Set<string>();
for (const dataPath of dataPaths) {
let text: string;
try {
text = await readFile(dataPath, "utf8");
} catch (e) {
return err(`failed to read ${dataPath}: ${String(e)}`);
}
const parsed = parseThreadDataJsonl(text);
if (!parsed.ok) {
return err(`${dataPath}: ${parsed.error}`);
}
for (const step of parsed.value.roleSteps) {
for (const ref of step.refs) {
activeRefs.add(ref);
}
}
}
return ok(activeRefs);
}
async function deleteCasNotInSet(
cas: CasStore,
activeRefs: Set<string>,
): Promise<Result<string[], string>> {
let listed: string[];
try {
listed = await cas.list();
} catch (e) {
return err(`failed to list cas entries: ${String(e)}`);
}
const deletedHashes: string[] = [];
for (const hash of listed) {
if (activeRefs.has(hash)) {
continue;
}
try {
await cas.delete(hash);
} catch (e) {
return err(`failed to delete cas ${hash}: ${String(e)}`);
}
deletedHashes.push(hash);
}
deletedHashes.sort();
return ok(deletedHashes);
}
/**
* Mark-and-sweep CAS GC: collect `refs` from all thread `.data.jsonl` files under `storageRoot`,
* then delete CAS blobs not referenced by any surviving thread data.
*/
export async function garbageCollectCas(storageRoot: string): Promise<Result<GcResult, string>> {
const pathsResult = await listThreadDataJsonlPaths(storageRoot);
if (!pathsResult.ok) {
return pathsResult;
}
const paths = pathsResult.value;
const refsResult = await collectActiveRefsFromDataPaths(paths);
if (!refsResult.ok) {
return refsResult;
}
const activeRefs = refsResult.value;
const cas = createCasStore(getGlobalCasDir(storageRoot));
const deletedResult = await deleteCasNotInSet(cas, activeRefs);
if (!deletedResult.ok) {
return deletedResult;
}
const deletedHashes = deletedResult.value;
return ok({
scannedThreads: paths.length,
activeRefs: activeRefs.size,
deletedEntries: deletedHashes.length,
deletedHashes,
});
}
+2
View File
@@ -25,6 +25,7 @@ export {
parseThreadDataJsonl,
selectForkHistoricalSteps,
} from "./fork-thread.js";
export { type GcResult, garbageCollectCas } from "./gc.js";
export { stringifyWorkflowDescriptor } from "./generate-descriptor.js";
export { hashString, hashWorkflowBundleBytes } from "./hash.js";
export {
@@ -81,6 +82,7 @@ export {
} from "./types.js";
export { generateUlid } from "./ulid.js";
export { getWorkerHostScriptPath } from "./worker-entry-path.js";
export { type WorkflowAsAgentOptions, workflowAsAgent } from "./workflow-as-agent.js";
export {
validateWorkflowDescriptor,
type WorkflowDescriptor,
+13
View File
@@ -0,0 +1,13 @@
/** Normalize `refs` from persisted JSONL or IPC payloads (missing or invalid → []). */
export function normalizeRefsField(value: unknown): string[] {
if (!Array.isArray(value)) {
return [];
}
const out: string[] = [];
for (const x of value) {
if (typeof x === "string") {
out.push(x);
}
}
return out;
}
+15 -1
View File
@@ -19,6 +19,8 @@ export type RoleOutput = {
role: string;
content: string;
meta: Record<string, unknown>;
/** CAS hashes produced or consumed by this step (for GC traceability). */
refs: string[];
};
/** What the workflow AsyncGenerator returns when done. */
@@ -37,6 +39,8 @@ export type ThreadInput = {
export type WorkflowFnOptions = {
threadId: string;
maxRounds: number;
/** Nesting depth for workflow-as-agent chains; root threads use `0`. */
depth: number;
};
/** Bundle contract — named export `run` is a function returning an AsyncGenerator. */
@@ -55,12 +59,20 @@ export type StartStep = {
/** A completed role step in the thread. */
export type RoleStep<M extends RoleMeta> = {
[K in keyof M & string]: { role: K; meta: M[K]; content: string; timestamp: number };
[K in keyof M & string]: {
role: K;
meta: M[K];
content: string;
refs: string[];
timestamp: number;
};
}[keyof M & string];
/** Phase 1: Moderator decides next role. */
export type ModeratorContext<M extends RoleMeta = RoleMeta> = {
threadId: string;
/** Same as `WorkflowFnOptions.depth` for the active thread. */
depth: number;
start: StartStep;
steps: RoleStep<M>[];
};
@@ -96,6 +108,8 @@ export type RoleDefinition<Meta extends Record<string, unknown>> = {
systemPrompt: string;
extractPrompt: string;
schema: z.ZodType<Meta>;
/** When non-null, produces CAS hashes to persist on this role's steps (see `RoleOutput.refs`). */
extractRefs: ((meta: Meta) => string[]) | null;
};
/**
+13 -3
View File
@@ -5,6 +5,7 @@ import { pathToFileURL } from "node:url";
import type { PrefilledDiskStep } from "./engine.js";
import { type ExecuteThreadIo, executeThread } from "./engine.js";
import { createLogger } from "./logger.js";
import { normalizeRefsField } from "./refs-field.js";
import { err, ok, type Result } from "./result.js";
import { createThreadPauseGate, type ThreadPauseGate } from "./thread-pause-gate.js";
import type { RoleOutput, WorkflowFn } from "./types.js";
@@ -16,7 +17,7 @@ type RunCommand = {
threadId: string;
workflowName: string;
prompt: string;
options: { maxRounds: number };
options: { maxRounds: number; depth: number };
steps: RoleOutput[];
/** Timestamps aligned with `steps` for `.data.jsonl` replay; length must match `steps` when non-null. */
stepTimestamps: number[] | null;
@@ -55,7 +56,12 @@ function parseRoleOutputRecord(obj: Record<string, unknown>): RoleOutput | null
if (meta === null || typeof meta !== "object") {
return null;
}
return { role, content, meta: meta as Record<string, unknown> };
return {
role,
content,
meta: meta as Record<string, unknown>,
refs: normalizeRefsField(obj.refs),
};
}
function parseRunStepsPayload(rec: Record<string, unknown>): {
@@ -118,6 +124,9 @@ function parseRunControlPayload(rec: Record<string, unknown>): RunCommand | null
if (typeof maxRounds !== "number") {
return null;
}
const depthRaw = optRec.depth;
const depth =
typeof depthRaw === "number" && Number.isFinite(depthRaw) ? Math.trunc(depthRaw) : 0;
const parsedSteps = parseRunStepsPayload(rec);
if (parsedSteps === null) {
return null;
@@ -135,7 +144,7 @@ function parseRunControlPayload(rec: Record<string, unknown>): RunCommand | null
threadId,
workflowName,
prompt,
options: { maxRounds },
options: { maxRounds, depth },
steps: parsedSteps.steps,
stepTimestamps: parsedSteps.stepTimestamps,
forkSourceThreadId,
@@ -382,6 +391,7 @@ async function main(): Promise<void> {
role: step.role,
content: step.content,
meta: step.meta,
refs: normalizeRefsField(step.refs),
timestamp: typeof ts === "number" && ts > 0 ? ts : baseTs + i,
};
});
@@ -0,0 +1,99 @@
import { join } from "node:path";
import { type ExecuteThreadIo, executeThread } from "./engine.js";
import { extractBundleExports } from "./extract-bundle-exports.js";
import { createLogger } from "./logger.js";
import { getRegisteredWorkflow, readWorkflowRegistry } from "./registry.js";
import { getDefaultWorkflowStorageRoot } from "./storage-root.js";
import type { AgentContext, AgentFn, ThreadInput } from "./types.js";
import { generateUlid } from "./ulid.js";
/** Maximum `WorkflowFnOptions.depth` allowed for a child spawned via `workflowAsAgent`. */
const WORKFLOW_AS_AGENT_MAX_DEPTH = 3;
export type WorkflowAsAgentOptions = {
/** When `null`, uses `getDefaultWorkflowStorageRoot()`. */
storageRoot: string | null;
};
function resolveWorkflowAsAgentStorageRoot(options: WorkflowAsAgentOptions | null): string {
if (options !== null && options.storageRoot !== null) {
return options.storageRoot;
}
return getDefaultWorkflowStorageRoot();
}
/**
* Returns an {@link AgentFn} that runs another registered workflow in a new thread,
* using the parent thread's initial prompt (`ctx.start.content`) as the child {@link ThreadInput.prompt}.
*/
export function workflowAsAgent(
workflowName: string,
options: WorkflowAsAgentOptions | null = null,
): AgentFn {
return async (ctx: AgentContext): Promise<string> => {
const nextDepth = ctx.depth + 1;
if (nextDepth > WORKFLOW_AS_AGENT_MAX_DEPTH) {
return `ERROR: workflow-as-agent depth limit exceeded (max ${WORKFLOW_AS_AGENT_MAX_DEPTH})`;
}
const storageRoot = resolveWorkflowAsAgentStorageRoot(options);
const registryResult = await readWorkflowRegistry(storageRoot);
if (!registryResult.ok) {
return `ERROR: failed to read workflow registry: ${registryResult.error.message}`;
}
const entry = getRegisteredWorkflow(registryResult.value, workflowName);
if (entry === null) {
return `ERROR: workflow "${workflowName}" not found in registry`;
}
const bundlePath = join(storageRoot, "bundles", `${entry.hash}.esm.js`);
const bundleExportsResult = await extractBundleExports(bundlePath);
if (!bundleExportsResult.ok) {
return `ERROR: ${bundleExportsResult.error}`;
}
const input: ThreadInput = {
prompt: ctx.start.content,
steps: [],
};
const childThreadId = generateUlid(Date.now());
const dataJsonlPath = join(storageRoot, "logs", entry.hash, `${childThreadId}.data.jsonl`);
const infoJsonlPath = join(storageRoot, "logs", entry.hash, `${childThreadId}.info.jsonl`);
const io: ExecuteThreadIo = {
threadId: childThreadId,
hash: entry.hash,
dataJsonlPath,
infoJsonlPath,
};
const logger = createLogger({ sink: { kind: "file", path: infoJsonlPath } });
const signalNever = new AbortController();
try {
const result = await executeThread(
bundleExportsResult.value.run,
workflowName,
input,
{
maxRounds: ctx.start.meta.maxRounds,
depth: nextDepth,
signal: signalNever.signal,
awaitAfterEachYield: async () => {},
forkSourceThreadId: ctx.threadId,
prefilledDiskSteps: null,
},
io,
logger,
);
return result.summary;
} catch (e) {
const message = e instanceof Error ? e.message : String(e);
return `ERROR: ${message}`;
}
};
}