Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| e95e76c145 | |||
| af69e773a0 | |||
| 6488b7bbb4 | |||
| 15d39c96a7 | |||
| 30e4e99908 | |||
| a3c70a5041 |
@@ -28,6 +28,7 @@ const greeter: RoleDefinition<Roles["greeter"]> = {
|
||||
systemPrompt: "You greet the user briefly.",
|
||||
extractPrompt: "Extract the greeting string produced for the user.",
|
||||
schema: greeterMetaSchema,
|
||||
extractRefs: null,
|
||||
};
|
||||
|
||||
const extract = createExtract({
|
||||
|
||||
@@ -0,0 +1,144 @@
|
||||
import { afterEach, beforeEach, describe, expect, test } from "bun:test";
|
||||
import { spawnSync } from "node:child_process";
|
||||
import { mkdir, mkdtemp, rm, writeFile } from "node:fs/promises";
|
||||
import { tmpdir } from "node:os";
|
||||
import { join } from "node:path";
|
||||
import { fileURLToPath } from "node:url";
|
||||
import { createCasStore, garbageCollectCas, getGlobalCasDir } from "@uncaged/workflow";
|
||||
import { cmdThreadRemove } from "../src/cmd-thread.js";
|
||||
import { pathExists } from "../src/fs-utils.js";
|
||||
|
||||
const cliEntryPath = fileURLToPath(new URL("../src/cli.ts", import.meta.url));
|
||||
|
||||
/** Minimal valid `.data.jsonl` with one role step referencing `activeHash` in `refs`. */
|
||||
function makeDataJsonl(threadId: string, bundleHash: string, activeHash: string): string {
|
||||
return [
|
||||
JSON.stringify({
|
||||
name: "demo",
|
||||
hash: bundleHash,
|
||||
threadId,
|
||||
parameters: { prompt: "hi", options: { maxRounds: 5 } },
|
||||
timestamp: 100,
|
||||
}),
|
||||
JSON.stringify({
|
||||
role: "planner",
|
||||
content: "p",
|
||||
meta: {},
|
||||
refs: [activeHash],
|
||||
timestamp: 101,
|
||||
}),
|
||||
"",
|
||||
].join("\n");
|
||||
}
|
||||
|
||||
describe("gc cli and garbageCollectCas", () => {
|
||||
let prevEnv: string | undefined;
|
||||
let storageRoot: string;
|
||||
|
||||
beforeEach(async () => {
|
||||
prevEnv = process.env.UNCAGED_WORKFLOW_STORAGE_ROOT;
|
||||
storageRoot = await mkdtemp(join(tmpdir(), "uncaged-wf-gc-"));
|
||||
process.env.UNCAGED_WORKFLOW_STORAGE_ROOT = storageRoot;
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
if (prevEnv === undefined) {
|
||||
delete process.env.UNCAGED_WORKFLOW_STORAGE_ROOT;
|
||||
} else {
|
||||
process.env.UNCAGED_WORKFLOW_STORAGE_ROOT = prevEnv;
|
||||
}
|
||||
await rm(storageRoot, { recursive: true, force: true });
|
||||
});
|
||||
|
||||
test("garbageCollectCas keeps CAS entries referenced by thread refs", async () => {
|
||||
const bundleHash = "C9NMV6V2TQT81";
|
||||
const threadId = "01AAA1111111111111111111";
|
||||
const logsDir = join(storageRoot, "logs", bundleHash);
|
||||
await mkdir(logsDir, { recursive: true });
|
||||
|
||||
const cas = createCasStore(getGlobalCasDir(storageRoot));
|
||||
const activeHash = await cas.put("active-blob");
|
||||
const orphanHash = await cas.put("orphan-blob");
|
||||
|
||||
await writeFile(
|
||||
join(logsDir, `${threadId}.data.jsonl`),
|
||||
makeDataJsonl(threadId, bundleHash, activeHash),
|
||||
"utf8",
|
||||
);
|
||||
|
||||
const gc = await garbageCollectCas(storageRoot);
|
||||
expect(gc.ok).toBe(true);
|
||||
if (!gc.ok) {
|
||||
return;
|
||||
}
|
||||
expect(gc.value.scannedThreads).toBe(1);
|
||||
expect(gc.value.activeRefs).toBe(1);
|
||||
expect(gc.value.deletedEntries).toBe(1);
|
||||
expect(gc.value.deletedHashes).toEqual([orphanHash]);
|
||||
|
||||
expect(await pathExists(join(getGlobalCasDir(storageRoot), `${activeHash}.txt`))).toBe(true);
|
||||
expect(await pathExists(join(getGlobalCasDir(storageRoot), `${orphanHash}.txt`))).toBe(false);
|
||||
});
|
||||
|
||||
test("garbageCollectCas deletes orphaned CAS when no threads reference them", async () => {
|
||||
const cas = createCasStore(getGlobalCasDir(storageRoot));
|
||||
const orphanHash = await cas.put("lonely");
|
||||
|
||||
const gc = await garbageCollectCas(storageRoot);
|
||||
expect(gc.ok).toBe(true);
|
||||
if (!gc.ok) {
|
||||
return;
|
||||
}
|
||||
expect(gc.value.scannedThreads).toBe(0);
|
||||
expect(gc.value.activeRefs).toBe(0);
|
||||
expect(gc.value.deletedEntries).toBe(1);
|
||||
expect(gc.value.deletedHashes).toEqual([orphanHash]);
|
||||
expect(await pathExists(join(getGlobalCasDir(storageRoot), `${orphanHash}.txt`))).toBe(false);
|
||||
});
|
||||
|
||||
test("cli gc prints stats", async () => {
|
||||
const bundleHash = "C9NMV6V2TQT81";
|
||||
const threadId = "01BBB2222222222222222222";
|
||||
const logsDir = join(storageRoot, "logs", bundleHash);
|
||||
await mkdir(logsDir, { recursive: true });
|
||||
|
||||
const cas = createCasStore(getGlobalCasDir(storageRoot));
|
||||
const activeHash = await cas.put("keep-me");
|
||||
await cas.put("drop-me");
|
||||
|
||||
await writeFile(
|
||||
join(logsDir, `${threadId}.data.jsonl`),
|
||||
makeDataJsonl(threadId, bundleHash, activeHash),
|
||||
"utf8",
|
||||
);
|
||||
|
||||
const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot };
|
||||
const proc = spawnSync(process.execPath, [cliEntryPath, "gc"], { env, encoding: "utf8" });
|
||||
expect(proc.status).toBe(0);
|
||||
expect(String(proc.stdout).trim()).toBe("scanned 1 threads, 1 active refs, deleted 1 entries");
|
||||
});
|
||||
|
||||
test("thread rm triggers gc so unreferenced CAS is removed", async () => {
|
||||
const bundleHash = "C9NMV6V2TQT81";
|
||||
const threadId = "01CCC3333333333333333333";
|
||||
const logsDir = join(storageRoot, "logs", bundleHash);
|
||||
await mkdir(logsDir, { recursive: true });
|
||||
|
||||
const cas = createCasStore(getGlobalCasDir(storageRoot));
|
||||
const activeHash = await cas.put("pinned-by-ref");
|
||||
await writeFile(
|
||||
join(logsDir, `${threadId}.data.jsonl`),
|
||||
makeDataJsonl(threadId, bundleHash, activeHash),
|
||||
"utf8",
|
||||
);
|
||||
|
||||
const orphanHash = await cas.put("orphan-after-rm");
|
||||
const orphanPath = join(getGlobalCasDir(storageRoot), `${orphanHash}.txt`);
|
||||
|
||||
const removed = await cmdThreadRemove(storageRoot, threadId);
|
||||
expect(removed.ok).toBe(true);
|
||||
|
||||
expect(await pathExists(orphanPath)).toBe(false);
|
||||
expect(await pathExists(join(getGlobalCasDir(storageRoot), `${activeHash}.txt`))).toBe(false);
|
||||
});
|
||||
});
|
||||
@@ -177,7 +177,7 @@ describe("cli thread commands", () => {
|
||||
expect(await pathExists(dataPath)).toBe(false);
|
||||
});
|
||||
|
||||
test("thread rm does not delete global cas blobs for that thread id", async () => {
|
||||
test("thread rm runs GC and removes CAS blobs not referenced by any remaining thread", async () => {
|
||||
const bundleDir = join(storageRoot, "src");
|
||||
await mkdir(bundleDir, { recursive: true });
|
||||
const bundlePath = join(bundleDir, "demo.esm.js");
|
||||
@@ -223,7 +223,7 @@ describe("cli thread commands", () => {
|
||||
expect(removed.ok).toBe(true);
|
||||
|
||||
const stillThere = await readTextFileIfExists(casBlob);
|
||||
expect(stillThere).toBe("keep-after-thread-rm");
|
||||
expect(stillThere).toBeNull();
|
||||
});
|
||||
|
||||
test("cli entrypoint dispatches threads / ps (spawn)", () => {
|
||||
|
||||
@@ -2,6 +2,7 @@ import { printCliError, printCliLine, printCliWarn } from "./cli-output.js";
|
||||
import { cmdAdd, formatAddSuccess, parseAddArgv } from "./cmd-add.js";
|
||||
import { cmdCasGet, cmdCasList, cmdCasPut, cmdCasRm } from "./cmd-cas.js";
|
||||
import { cmdFork, parseForkArgv } from "./cmd-fork.js";
|
||||
import { cmdGc } from "./cmd-gc.js";
|
||||
import { cmdHistory } from "./cmd-history.js";
|
||||
import { cmdKill } from "./cmd-kill.js";
|
||||
import { cmdList, formatListLines } from "./cmd-list.js";
|
||||
@@ -34,6 +35,7 @@ function usage(): string {
|
||||
" uncaged-workflow thread <id>",
|
||||
" uncaged-workflow thread rm <id>",
|
||||
" uncaged-workflow fork <thread-id> [--from-role <role>]",
|
||||
" uncaged-workflow gc",
|
||||
" uncaged-workflow cas get <thread-id> <hash>",
|
||||
" uncaged-workflow cas put <thread-id> <content>",
|
||||
" uncaged-workflow cas list <thread-id>",
|
||||
@@ -266,6 +268,23 @@ async function dispatchThreadBranch(storageRoot: string, rest: string[]): Promis
|
||||
return dispatchThread(storageRoot, rest);
|
||||
}
|
||||
|
||||
async function dispatchGc(storageRoot: string, argv: string[]): Promise<number> {
|
||||
if (argv.length > 0) {
|
||||
printCliError(`${usage()}\n\nerror: gc takes no arguments`);
|
||||
return 1;
|
||||
}
|
||||
const result = await cmdGc(storageRoot);
|
||||
if (!result.ok) {
|
||||
printCliError(result.error);
|
||||
return 1;
|
||||
}
|
||||
const stats = result.value;
|
||||
printCliLine(
|
||||
`scanned ${stats.scannedThreads} threads, ${stats.activeRefs} active refs, deleted ${stats.deletedEntries} entries`,
|
||||
);
|
||||
return 0;
|
||||
}
|
||||
|
||||
async function dispatchFork(storageRoot: string, argv: string[]): Promise<number> {
|
||||
const parsed = parseForkArgv(argv);
|
||||
if (!parsed.ok) {
|
||||
@@ -387,6 +406,7 @@ const COMMAND_TABLE: Record<string, DispatchFn> = {
|
||||
threads: dispatchThreads,
|
||||
thread: dispatchThreadBranch,
|
||||
fork: dispatchFork,
|
||||
gc: dispatchGc,
|
||||
cas: dispatchCas,
|
||||
};
|
||||
|
||||
|
||||
@@ -0,0 +1,5 @@
|
||||
import { type GcResult, garbageCollectCas, type Result } from "@uncaged/workflow";
|
||||
|
||||
export async function cmdGc(storageRoot: string): Promise<Result<GcResult, string>> {
|
||||
return garbageCollectCas(storageRoot);
|
||||
}
|
||||
@@ -46,7 +46,7 @@ export async function cmdRun(
|
||||
threadId,
|
||||
workflowName: name,
|
||||
prompt,
|
||||
options: { maxRounds },
|
||||
options: { maxRounds, depth: 0 },
|
||||
},
|
||||
{ awaitResponseLine: false },
|
||||
);
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import { unlink } from "node:fs/promises";
|
||||
import { dirname, join } from "node:path";
|
||||
|
||||
import { err, ok, type Result } from "@uncaged/workflow";
|
||||
import { err, garbageCollectCas, ok, type Result } from "@uncaged/workflow";
|
||||
|
||||
import { readTextFileIfExists } from "./fs-utils.js";
|
||||
import { resolveThreadDataPath } from "./thread-scan.js";
|
||||
@@ -38,5 +38,7 @@ export async function cmdThreadRemove(
|
||||
await unlink(infoPath).catch(() => {});
|
||||
await unlink(runningPath).catch(() => {});
|
||||
|
||||
await garbageCollectCas(storageRoot);
|
||||
|
||||
return ok(undefined);
|
||||
}
|
||||
|
||||
@@ -11,6 +11,7 @@ function makeCtx(userContent: string): ThreadContext {
|
||||
meta: { maxRounds: 10 },
|
||||
timestamp: 1,
|
||||
},
|
||||
depth: 0,
|
||||
steps: [],
|
||||
threadId: "01TEST000000000000000000TR",
|
||||
currentRole: { name: "planner", systemPrompt: "system instructions" },
|
||||
|
||||
@@ -38,4 +38,5 @@ export const coderRole: RoleDefinition<CoderMeta> = {
|
||||
extractPrompt:
|
||||
"Extract completedPhase: the planner phase hash finished this round (exact hash string from the plan). If multiple phases were finished in one round, use the last finished phase hash. Extract filesChanged and a summary of the work.",
|
||||
schema: coderMetaSchema,
|
||||
extractRefs: (meta) => [meta.completedPhase],
|
||||
};
|
||||
|
||||
@@ -31,4 +31,5 @@ export const committerRole: RoleDefinition<CommitterMeta> = {
|
||||
extractPrompt:
|
||||
"Extract the commit result: committed (with branch and SHA), recoverable failure, or unrecoverable failure. Include error details and log references if applicable.",
|
||||
schema: committerMetaSchema,
|
||||
extractRefs: null,
|
||||
};
|
||||
|
||||
@@ -49,4 +49,5 @@ export const plannerRole: RoleDefinition<PlannerMeta> = {
|
||||
extractPrompt:
|
||||
"Extract the implementation phases from the agent's output. Each phase has a hash (the CAS content-hash returned by the cas put command) and a title (one-line summary).",
|
||||
schema: plannerMetaSchema,
|
||||
extractRefs: (meta) => meta.phases.map((p) => p.hash),
|
||||
};
|
||||
|
||||
@@ -47,4 +47,5 @@ export const preparerRole: RoleDefinition<PreparerMeta> = {
|
||||
extractPrompt:
|
||||
"Extract repoPath (absolute path), defaultBranch, conventions (summary string or null), and toolchain (packageManager, testCommand, lintCommand, buildCommand — each string or null).",
|
||||
schema: preparerMetaSchema,
|
||||
extractRefs: null,
|
||||
};
|
||||
|
||||
@@ -21,4 +21,5 @@ export const reviewerRole: RoleDefinition<ReviewerMeta> = {
|
||||
extractPrompt:
|
||||
"Extract the review verdict: approved or rejected. If rejected, list the blocking issues.",
|
||||
schema: reviewerMetaSchema,
|
||||
extractRefs: null,
|
||||
};
|
||||
|
||||
@@ -104,6 +104,7 @@ function makeCtx(
|
||||
): ModeratorContext<SolveIssueMeta> {
|
||||
return {
|
||||
threadId: "01TEST000000000000000000TR",
|
||||
depth: 0,
|
||||
start: makeStart(maxRounds),
|
||||
steps,
|
||||
};
|
||||
@@ -124,6 +125,7 @@ function preparerStep(): RoleStep<SolveIssueMeta> {
|
||||
buildCommand: "bun run build",
|
||||
},
|
||||
},
|
||||
refs: [],
|
||||
timestamp: 0,
|
||||
};
|
||||
}
|
||||
@@ -133,6 +135,7 @@ function plannerStep(phases: PlannerMeta["phases"] = DEFAULT_PHASES): RoleStep<S
|
||||
role: "planner",
|
||||
content: "plan",
|
||||
meta: { phases },
|
||||
refs: phases.map((p) => p.hash),
|
||||
timestamp: 1,
|
||||
};
|
||||
}
|
||||
@@ -142,6 +145,7 @@ function coderStep(completedPhase = "4KNMR2PX"): RoleStep<SolveIssueMeta> {
|
||||
role: "coder",
|
||||
content: "code",
|
||||
meta: { completedPhase, filesChanged: ["a.ts"], summary: "fixed" },
|
||||
refs: [completedPhase],
|
||||
timestamp: 2,
|
||||
};
|
||||
}
|
||||
@@ -153,6 +157,7 @@ function reviewerStep(approved: boolean): RoleStep<SolveIssueMeta> {
|
||||
meta: approved
|
||||
? { status: "approved" as const }
|
||||
: { status: "rejected" as const, issues: ["needs fix"] },
|
||||
refs: [],
|
||||
timestamp: 3,
|
||||
};
|
||||
}
|
||||
@@ -162,6 +167,7 @@ function committerStep(): RoleStep<SolveIssueMeta> {
|
||||
role: "committer",
|
||||
content: "commit",
|
||||
meta: { status: "committed", branch: "feat/issue-1", commitSha: "abc1234" },
|
||||
refs: [],
|
||||
timestamp: 4,
|
||||
};
|
||||
}
|
||||
@@ -298,7 +304,7 @@ describe("createSolveIssueRun", () => {
|
||||
const run = createSolveIssueRun({ agent: async () => "" }, stubExtract);
|
||||
const gen = run(
|
||||
{ prompt: "task", steps: [] },
|
||||
{ threadId: "01TEST000000000000000000TR", maxRounds: 20 },
|
||||
{ threadId: "01TEST000000000000000000TR", maxRounds: 20, depth: 0 },
|
||||
);
|
||||
const first = await gen.next();
|
||||
expect(first.done).toBe(false);
|
||||
@@ -356,7 +362,7 @@ describe("createSolveIssueRun", () => {
|
||||
);
|
||||
const gen = run(
|
||||
{ prompt: "task", steps: [] },
|
||||
{ threadId: "01TEST000000000000000000TR", maxRounds: 20 },
|
||||
{ threadId: "01TEST000000000000000000TR", maxRounds: 20, depth: 0 },
|
||||
);
|
||||
await gen.next();
|
||||
expect(calls).toEqual(["preparer"]);
|
||||
|
||||
@@ -16,6 +16,7 @@ describe("buildAgentPrompt", () => {
|
||||
test("includes system prompt and full task; omits tools when there are no steps", () => {
|
||||
const ctx: ThreadContext = {
|
||||
start: startTask("fix the bug"),
|
||||
depth: 0,
|
||||
steps: [],
|
||||
threadId: "01TEST000000000000000000TR",
|
||||
currentRole: { name: START, systemPrompt: "You are an agent." },
|
||||
@@ -30,6 +31,7 @@ describe("buildAgentPrompt", () => {
|
||||
test("single step shows full content and meta, and includes tools", () => {
|
||||
const ctx: ThreadContext = {
|
||||
start: startTask("user task"),
|
||||
depth: 0,
|
||||
threadId: "01TEST000000000000000000TR",
|
||||
currentRole: { name: "coder", systemPrompt: "Be helpful." },
|
||||
steps: [
|
||||
@@ -37,6 +39,7 @@ describe("buildAgentPrompt", () => {
|
||||
role: "coder",
|
||||
content: "only step full body",
|
||||
meta: { files: ["a.ts"] },
|
||||
refs: [],
|
||||
timestamp: 2,
|
||||
},
|
||||
],
|
||||
@@ -54,6 +57,7 @@ describe("buildAgentPrompt", () => {
|
||||
test("two or more steps: previous steps are meta-only; latest step is full", () => {
|
||||
const ctx: ThreadContext = {
|
||||
start: startTask("first message full: task content here"),
|
||||
depth: 0,
|
||||
threadId: "01TEST000000000000000000TR",
|
||||
currentRole: { name: "coder", systemPrompt: "System." },
|
||||
steps: [
|
||||
@@ -61,12 +65,14 @@ describe("buildAgentPrompt", () => {
|
||||
role: "planner",
|
||||
content: "PLANNER_SECRET_FULL_TEXT",
|
||||
meta: { plan: "short" },
|
||||
refs: [],
|
||||
timestamp: 2,
|
||||
},
|
||||
{
|
||||
role: "coder",
|
||||
content: "last step full content",
|
||||
meta: { done: true },
|
||||
refs: [],
|
||||
timestamp: 3,
|
||||
},
|
||||
],
|
||||
@@ -87,6 +93,7 @@ describe("buildAgentPrompt", () => {
|
||||
test("middle steps show meta summary only, not full content", () => {
|
||||
const ctx: ThreadContext = {
|
||||
start: startTask("start"),
|
||||
depth: 0,
|
||||
threadId: "01TEST000000000000000000TR",
|
||||
currentRole: { name: "c", systemPrompt: "S" },
|
||||
steps: [
|
||||
@@ -94,18 +101,21 @@ describe("buildAgentPrompt", () => {
|
||||
role: "a",
|
||||
content: "HIDDEN_A",
|
||||
meta: { n: 1 },
|
||||
refs: [],
|
||||
timestamp: 2,
|
||||
},
|
||||
{
|
||||
role: "b",
|
||||
content: "HIDDEN_B_MIDDLE",
|
||||
meta: { n: 2 },
|
||||
refs: [],
|
||||
timestamp: 3,
|
||||
},
|
||||
{
|
||||
role: "c",
|
||||
content: "VISIBLE_LAST",
|
||||
meta: { n: 3 },
|
||||
refs: [],
|
||||
timestamp: 4,
|
||||
},
|
||||
],
|
||||
|
||||
@@ -22,6 +22,7 @@ describe("buildDescriptor", () => {
|
||||
systemPrompt: "You are an analyst.",
|
||||
extractPrompt: "Extract title and count from the analysis.",
|
||||
schema,
|
||||
extractRefs: null,
|
||||
},
|
||||
},
|
||||
moderator: () => END,
|
||||
|
||||
@@ -89,12 +89,14 @@ const demoWorkflow = createWorkflow<DemoMeta>(
|
||||
systemPrompt: "You are a planner.",
|
||||
extractPrompt: "Extract plan text and affected files list.",
|
||||
schema: plannerMetaSchema,
|
||||
extractRefs: null,
|
||||
},
|
||||
coder: {
|
||||
description: "Demo coder",
|
||||
systemPrompt: "You are a coder.",
|
||||
extractPrompt: "Extract the code diff summary.",
|
||||
schema: coderMetaSchema,
|
||||
extractRefs: null,
|
||||
},
|
||||
},
|
||||
moderator: (ctx) => {
|
||||
@@ -148,6 +150,7 @@ describe("executeThread", () => {
|
||||
{ prompt: "Fix the login redirect bug in #3", steps: [] },
|
||||
{
|
||||
maxRounds: 5,
|
||||
depth: 0,
|
||||
signal: ac.signal,
|
||||
awaitAfterEachYield: async () => {},
|
||||
forkSourceThreadId: null,
|
||||
@@ -176,16 +179,19 @@ describe("executeThread", () => {
|
||||
expect(params.prompt).toBe("Fix the login redirect bug in #3");
|
||||
const opts = params.options as Record<string, unknown>;
|
||||
expect(opts.maxRounds).toBe(5);
|
||||
expect(Object.keys(opts).sort()).toEqual(["maxRounds"]);
|
||||
expect(opts.depth).toBe(0);
|
||||
expect(Object.keys(opts).sort()).toEqual(["depth", "maxRounds"]);
|
||||
|
||||
const role1 = JSON.parse(lines[1] ?? "{}") as Record<string, unknown>;
|
||||
expect(role1.role).toBe("planner");
|
||||
expect(role1.content).toBe("plan-body");
|
||||
expect(role1.meta).toEqual({ plan: "do-it", files: ["a.ts"] });
|
||||
expect(role1.refs).toEqual([]);
|
||||
expect(typeof role1.timestamp).toBe("number");
|
||||
|
||||
const role2 = JSON.parse(lines[2] ?? "{}") as Record<string, unknown>;
|
||||
expect(role2.role).toBe("coder");
|
||||
expect(role2.refs).toEqual([]);
|
||||
|
||||
const infoText = await readFile(infoPath, "utf8");
|
||||
const infoLines = infoText
|
||||
@@ -228,11 +234,13 @@ describe("executeThread", () => {
|
||||
role: "planner",
|
||||
content: "plan-body",
|
||||
meta: { plan: "do-it", files: ["a.ts"] },
|
||||
refs: ["CAS111AAAAAAA"],
|
||||
},
|
||||
],
|
||||
},
|
||||
{
|
||||
maxRounds: 5,
|
||||
depth: 0,
|
||||
signal: ac.signal,
|
||||
awaitAfterEachYield: async () => {},
|
||||
forkSourceThreadId: "01SRC1111111111111111111",
|
||||
@@ -241,6 +249,7 @@ describe("executeThread", () => {
|
||||
role: "planner",
|
||||
content: "plan-body",
|
||||
meta: { plan: "do-it", files: ["a.ts"] },
|
||||
refs: ["CAS111AAAAAAA"],
|
||||
timestamp: histTs,
|
||||
},
|
||||
],
|
||||
@@ -264,6 +273,7 @@ describe("executeThread", () => {
|
||||
const role0 = JSON.parse(lines[1] ?? "{}") as Record<string, unknown>;
|
||||
expect(role0.role).toBe("planner");
|
||||
expect(role0.timestamp).toBe(histTs);
|
||||
expect(role0.refs).toEqual(["CAS111AAAAAAA"]);
|
||||
|
||||
const role1 = JSON.parse(lines[2] ?? "{}") as Record<string, unknown>;
|
||||
expect(role1.role).toBe("coder");
|
||||
@@ -291,6 +301,7 @@ describe("executeThread", () => {
|
||||
{ prompt: "hello", steps: [] },
|
||||
{
|
||||
maxRounds: 0,
|
||||
depth: 0,
|
||||
signal: ac.signal,
|
||||
awaitAfterEachYield: async () => {},
|
||||
forkSourceThreadId: null,
|
||||
|
||||
@@ -24,6 +24,7 @@ describe("fork-thread", () => {
|
||||
expect(r.value.start.threadId).toBe("01AAA1111111111111111111");
|
||||
expect(r.value.start.prompt).toBe("hi");
|
||||
expect(r.value.start.maxRounds).toBe(5);
|
||||
expect(r.value.start.depth).toBe(0);
|
||||
expect(r.value.roleSteps.length).toBe(3);
|
||||
expect(r.value.roleSteps[0]?.role).toBe("planner");
|
||||
});
|
||||
@@ -83,6 +84,24 @@ describe("fork-thread", () => {
|
||||
expect(r.value.workflowName).toBe("demo");
|
||||
expect(r.value.historicalSteps.length).toBe(1);
|
||||
expect(r.value.historicalSteps[0]?.timestamp).toBe(101);
|
||||
expect(r.value.runOptions).toEqual({ maxRounds: 5 });
|
||||
expect(r.value.runOptions).toEqual({ maxRounds: 5, depth: 0 });
|
||||
});
|
||||
|
||||
test("parseThreadDataJsonl reads explicit depth from start record", () => {
|
||||
const text = `{"name":"demo","hash":"H","threadId":"01ZZZZZZZZZZZZZZZZZZZZZZ","parameters":{"prompt":"p","options":{"maxRounds":3,"depth":2}},"timestamp":1}
|
||||
{"role":"planner","content":"x","meta":{},"timestamp":2}
|
||||
`;
|
||||
const r = parseThreadDataJsonl(text);
|
||||
expect(r.ok).toBe(true);
|
||||
if (!r.ok) {
|
||||
return;
|
||||
}
|
||||
expect(r.value.start.depth).toBe(2);
|
||||
const plan = buildForkPlan(text, null);
|
||||
expect(plan.ok).toBe(true);
|
||||
if (!plan.ok) {
|
||||
return;
|
||||
}
|
||||
expect(plan.value.runOptions).toEqual({ maxRounds: 3, depth: 2 });
|
||||
});
|
||||
});
|
||||
|
||||
@@ -0,0 +1,192 @@
|
||||
import { afterEach, describe, expect, test } from "bun:test";
|
||||
import { mkdir, mkdtemp, readFile, rm } from "node:fs/promises";
|
||||
import { tmpdir } from "node:os";
|
||||
import { join } from "node:path";
|
||||
import * as z from "zod/v4";
|
||||
|
||||
import { createWorkflow } from "../src/create-workflow.js";
|
||||
import { executeThread } from "../src/engine.js";
|
||||
import { createExtract } from "../src/extract-fn.js";
|
||||
import { buildForkPlan, parseThreadDataJsonl } from "../src/fork-thread.js";
|
||||
import { createLogger } from "../src/logger.js";
|
||||
import { END } from "../src/types.js";
|
||||
|
||||
const phaseSchema = z.object({
|
||||
hash: z.string(),
|
||||
title: z.string(),
|
||||
});
|
||||
|
||||
const plannerMetaSchema = z.object({
|
||||
phases: z.array(phaseSchema),
|
||||
});
|
||||
|
||||
type RefsDemoMeta = {
|
||||
planner: z.infer<typeof plannerMetaSchema>;
|
||||
};
|
||||
|
||||
function installMockChatCompletions(sequence: ReadonlyArray<Record<string, unknown>>): () => void {
|
||||
const origFetch = globalThis.fetch;
|
||||
let i = 0;
|
||||
const mockFetch = async (
|
||||
input: Parameters<typeof fetch>[0],
|
||||
init?: RequestInit,
|
||||
): Promise<Response> => {
|
||||
const args = sequence[i] ?? sequence[sequence.length - 1];
|
||||
if (args === undefined) {
|
||||
throw new Error("installMockChatCompletions: empty sequence");
|
||||
}
|
||||
i += 1;
|
||||
void input;
|
||||
const body = init?.body ? (JSON.parse(String(init.body)) as Record<string, unknown>) : {};
|
||||
const tools = body.tools;
|
||||
const firstTool =
|
||||
Array.isArray(tools) && tools.length > 0 && tools[0] !== null && typeof tools[0] === "object"
|
||||
? (tools[0] as Record<string, unknown>)
|
||||
: null;
|
||||
const fn =
|
||||
firstTool !== null ? (firstTool.function as Record<string, unknown> | undefined) : undefined;
|
||||
const toolName = typeof fn?.name === "string" ? fn.name : "extract";
|
||||
return new Response(
|
||||
JSON.stringify({
|
||||
choices: [
|
||||
{
|
||||
message: {
|
||||
tool_calls: [
|
||||
{
|
||||
type: "function",
|
||||
function: {
|
||||
name: toolName,
|
||||
arguments: JSON.stringify(args),
|
||||
},
|
||||
},
|
||||
],
|
||||
},
|
||||
},
|
||||
],
|
||||
}),
|
||||
{ status: 200, headers: { "Content-Type": "application/json" } },
|
||||
);
|
||||
};
|
||||
globalThis.fetch = Object.assign(mockFetch, {
|
||||
preconnect: origFetch.preconnect.bind(origFetch),
|
||||
}) as typeof fetch;
|
||||
return () => {
|
||||
globalThis.fetch = origFetch;
|
||||
};
|
||||
}
|
||||
|
||||
const refsDemoExtract = createExtract({
|
||||
baseUrl: "http://127.0.0.1:9",
|
||||
apiKey: "test",
|
||||
model: "test",
|
||||
});
|
||||
|
||||
const refsDemoWorkflow = createWorkflow<RefsDemoMeta>(
|
||||
{
|
||||
roles: {
|
||||
planner: {
|
||||
description: "Planner with phase hashes",
|
||||
systemPrompt: "Plan.",
|
||||
extractPrompt: "Extract phases with CAS hashes.",
|
||||
schema: plannerMetaSchema,
|
||||
extractRefs: (meta) => meta.phases.map((p) => p.hash),
|
||||
},
|
||||
},
|
||||
moderator: (ctx) => (ctx.steps.length === 0 ? "planner" : END),
|
||||
},
|
||||
{
|
||||
agent: async () => "plan-output",
|
||||
},
|
||||
refsDemoExtract,
|
||||
);
|
||||
|
||||
describe("RoleStep refs tracking", () => {
|
||||
let restoreFetch: (() => void) | null = null;
|
||||
|
||||
afterEach(() => {
|
||||
restoreFetch?.();
|
||||
restoreFetch = null;
|
||||
});
|
||||
|
||||
test("parseThreadDataJsonl reads refs and defaults missing refs to []", () => {
|
||||
const text = `{"name":"demo","hash":"C9NMV6V2TQT81","threadId":"01AAA1111111111111111111","parameters":{"prompt":"hi","options":{"maxRounds":5}},"timestamp":100}
|
||||
{"role":"planner","content":"p","meta":{},"refs":["H111AAAAAAAAA","H222AAAAAAAAA"],"timestamp":101}
|
||||
{"role":"coder","content":"c","meta":{},"timestamp":102}
|
||||
`;
|
||||
const r = parseThreadDataJsonl(text);
|
||||
expect(r.ok).toBe(true);
|
||||
if (!r.ok) {
|
||||
return;
|
||||
}
|
||||
expect(r.value.roleSteps[0]?.refs).toEqual(["H111AAAAAAAAA", "H222AAAAAAAAA"]);
|
||||
expect(r.value.roleSteps[1]?.refs).toEqual([]);
|
||||
});
|
||||
|
||||
test("executeThread persists refs from extractRefs on role yields", async () => {
|
||||
restoreFetch = installMockChatCompletions([
|
||||
{
|
||||
phases: [
|
||||
{ hash: "C9NMV6V2TQT81", title: "phase-a" },
|
||||
{ hash: "C9NMV6V2TQT82", title: "phase-b" },
|
||||
],
|
||||
},
|
||||
]);
|
||||
|
||||
const root = await mkdtemp(join(tmpdir(), "wf-refs-"));
|
||||
try {
|
||||
const threadId = "01KQXKW18CT8G75T53R8F4G7YG";
|
||||
const hash = "C9NMV6V2TQT81";
|
||||
const dataPath = join(root, "logs", hash, `${threadId}.data.jsonl`);
|
||||
const infoPath = join(root, "logs", hash, `${threadId}.info.jsonl`);
|
||||
await mkdir(join(root, "logs", hash), { recursive: true });
|
||||
|
||||
const logger = createLogger({ sink: { kind: "file", path: infoPath } });
|
||||
const ac = new AbortController();
|
||||
|
||||
const result = await executeThread(
|
||||
refsDemoWorkflow,
|
||||
"refs-demo",
|
||||
{ prompt: "task", steps: [] },
|
||||
{
|
||||
maxRounds: 5,
|
||||
depth: 0,
|
||||
signal: ac.signal,
|
||||
awaitAfterEachYield: async () => {},
|
||||
forkSourceThreadId: null,
|
||||
prefilledDiskSteps: null,
|
||||
},
|
||||
{ threadId, hash, dataJsonlPath: dataPath, infoJsonlPath: infoPath },
|
||||
logger,
|
||||
);
|
||||
|
||||
expect(result.returnCode).toBe(0);
|
||||
|
||||
const dataText = await readFile(dataPath, "utf8");
|
||||
const lines = dataText
|
||||
.trim()
|
||||
.split("\n")
|
||||
.filter((l) => l !== "");
|
||||
expect(lines.length).toBe(2);
|
||||
|
||||
const role1 = JSON.parse(lines[1] ?? "{}") as Record<string, unknown>;
|
||||
expect(role1.role).toBe("planner");
|
||||
expect(role1.refs).toEqual(["C9NMV6V2TQT81", "C9NMV6V2TQT82"]);
|
||||
} finally {
|
||||
await rm(root, { recursive: true, force: true });
|
||||
}
|
||||
});
|
||||
|
||||
test("buildForkPlan carries refs on historical steps", () => {
|
||||
const text = `{"name":"demo","hash":"C9NMV6V2TQT81","threadId":"01AAA1111111111111111111","parameters":{"prompt":"hi","options":{"maxRounds":5}},"timestamp":100}
|
||||
{"role":"planner","content":"p","meta":{},"refs":["KEEPREFAAAAAA"],"timestamp":101}
|
||||
{"role":"coder","content":"c","meta":{},"refs":["CODERHASHAAAA"],"timestamp":102}
|
||||
`;
|
||||
const plan = buildForkPlan(text, null);
|
||||
expect(plan.ok).toBe(true);
|
||||
if (!plan.ok) {
|
||||
return;
|
||||
}
|
||||
expect(plan.value.historicalSteps.length).toBe(1);
|
||||
expect(plan.value.historicalSteps[0]?.refs).toEqual(["KEEPREFAAAAAA"]);
|
||||
});
|
||||
});
|
||||
@@ -10,6 +10,7 @@ describe("RFC-001 thread JSONL shapes", () => {
|
||||
prompt: "Fix the login redirect bug in #3",
|
||||
options: {
|
||||
maxRounds: 5,
|
||||
depth: 0,
|
||||
},
|
||||
},
|
||||
timestamp: 1714963200000,
|
||||
@@ -19,13 +20,16 @@ describe("RFC-001 thread JSONL shapes", () => {
|
||||
role: "planner",
|
||||
content: "Plan: modify auth middleware...",
|
||||
meta: { plan: "...", files: ["src/auth.ts"] },
|
||||
refs: [] as string[],
|
||||
timestamp: 1714963201000,
|
||||
};
|
||||
|
||||
expect(Object.keys(startRecord).sort()).toEqual(
|
||||
["hash", "name", "parameters", "threadId", "timestamp"].sort(),
|
||||
);
|
||||
expect(Object.keys(roleRecord).sort()).toEqual(["content", "meta", "role", "timestamp"].sort());
|
||||
expect(Object.keys(roleRecord).sort()).toEqual(
|
||||
["content", "meta", "refs", "role", "timestamp"].sort(),
|
||||
);
|
||||
});
|
||||
|
||||
test("documents the `.info.jsonl` debug record keys", () => {
|
||||
|
||||
@@ -0,0 +1,206 @@
|
||||
import { afterEach, describe, expect, test } from "bun:test";
|
||||
import { mkdir, mkdtemp, readdir, readFile, rm, writeFile } from "node:fs/promises";
|
||||
import { tmpdir } from "node:os";
|
||||
import { join } from "node:path";
|
||||
import * as z from "zod/v4";
|
||||
|
||||
import { createWorkflow } from "../src/create-workflow.js";
|
||||
import { executeThread } from "../src/engine.js";
|
||||
import { createExtract } from "../src/extract-fn.js";
|
||||
import { hashWorkflowBundleBytes } from "../src/hash.js";
|
||||
import { createLogger } from "../src/logger.js";
|
||||
import {
|
||||
readWorkflowRegistry,
|
||||
registerWorkflowVersion,
|
||||
writeWorkflowRegistry,
|
||||
} from "../src/registry.js";
|
||||
import { END } from "../src/types.js";
|
||||
import { workflowAsAgent } from "../src/workflow-as-agent.js";
|
||||
|
||||
const callerMetaSchema = z.object({ done: z.literal(true) });
|
||||
|
||||
type ParentMeta = {
|
||||
caller: z.infer<typeof callerMetaSchema>;
|
||||
};
|
||||
|
||||
function installMockChatCompletions(sequence: ReadonlyArray<Record<string, unknown>>): () => void {
|
||||
const origFetch = globalThis.fetch;
|
||||
let i = 0;
|
||||
const mockFetch = async (
|
||||
input: Parameters<typeof fetch>[0],
|
||||
init?: RequestInit,
|
||||
): Promise<Response> => {
|
||||
const args = sequence[i] ?? sequence[sequence.length - 1];
|
||||
if (args === undefined) {
|
||||
throw new Error("installMockChatCompletions: empty sequence");
|
||||
}
|
||||
i += 1;
|
||||
void input;
|
||||
const body = init?.body ? (JSON.parse(String(init.body)) as Record<string, unknown>) : {};
|
||||
const tools = body.tools;
|
||||
const firstTool =
|
||||
Array.isArray(tools) && tools.length > 0 && tools[0] !== null && typeof tools[0] === "object"
|
||||
? (tools[0] as Record<string, unknown>)
|
||||
: null;
|
||||
const fn =
|
||||
firstTool !== null ? (firstTool.function as Record<string, unknown> | undefined) : undefined;
|
||||
const toolName = typeof fn?.name === "string" ? fn.name : "extract";
|
||||
return new Response(
|
||||
JSON.stringify({
|
||||
choices: [
|
||||
{
|
||||
message: {
|
||||
tool_calls: [
|
||||
{
|
||||
type: "function",
|
||||
function: {
|
||||
name: toolName,
|
||||
arguments: JSON.stringify(args),
|
||||
},
|
||||
},
|
||||
],
|
||||
},
|
||||
},
|
||||
],
|
||||
}),
|
||||
{ status: 200, headers: { "Content-Type": "application/json" } },
|
||||
);
|
||||
};
|
||||
globalThis.fetch = Object.assign(mockFetch, {
|
||||
preconnect: origFetch.preconnect.bind(origFetch),
|
||||
}) as typeof fetch;
|
||||
return () => {
|
||||
globalThis.fetch = origFetch;
|
||||
};
|
||||
}
|
||||
|
||||
const parentExtract = createExtract({
|
||||
baseUrl: "http://127.0.0.1:9",
|
||||
apiKey: "test",
|
||||
model: "test",
|
||||
});
|
||||
|
||||
const childBundleSource = `export const descriptor = {
|
||||
description: "child-integration",
|
||||
roles: {
|
||||
agent: {
|
||||
description: "agent",
|
||||
schema: { type: "object", properties: {}, additionalProperties: true },
|
||||
},
|
||||
},
|
||||
};
|
||||
export async function* run(input) {
|
||||
yield { role: "agent", content: "child-body", meta: {}, refs: [] };
|
||||
return { returnCode: 0, summary: "child-done:" + input.prompt };
|
||||
}
|
||||
`;
|
||||
|
||||
async function installChildWorkflow(storageRoot: string): Promise<{ hash: string }> {
|
||||
const bytes = new TextEncoder().encode(childBundleSource);
|
||||
const hash = hashWorkflowBundleBytes(bytes);
|
||||
await mkdir(join(storageRoot, "bundles"), { recursive: true });
|
||||
await writeFile(join(storageRoot, "bundles", `${hash}.esm.js`), childBundleSource, "utf8");
|
||||
const reg = await readWorkflowRegistry(storageRoot);
|
||||
if (!reg.ok) {
|
||||
throw reg.error;
|
||||
}
|
||||
const next = registerWorkflowVersion(reg.value, "child-wf", hash, Date.now());
|
||||
const wr = await writeWorkflowRegistry(storageRoot, next);
|
||||
if (!wr.ok) {
|
||||
throw wr.error;
|
||||
}
|
||||
return { hash };
|
||||
}
|
||||
|
||||
describe("workflowAsAgent integration", () => {
|
||||
let restoreFetch: (() => void) | null = null;
|
||||
|
||||
afterEach(() => {
|
||||
restoreFetch?.();
|
||||
restoreFetch = null;
|
||||
});
|
||||
|
||||
test("createWorkflow parent invokes nested workflow via workflowAsAgent", async () => {
|
||||
restoreFetch = installMockChatCompletions([{ done: true }]);
|
||||
|
||||
const root = await mkdtemp(join(tmpdir(), "wf-waa-int-"));
|
||||
try {
|
||||
const { hash: childHash } = await installChildWorkflow(root);
|
||||
|
||||
const parentWorkflow = createWorkflow<ParentMeta>(
|
||||
{
|
||||
roles: {
|
||||
caller: {
|
||||
description: "delegates to child workflow",
|
||||
systemPrompt: "system",
|
||||
extractPrompt: "extract done flag",
|
||||
schema: callerMetaSchema,
|
||||
extractRefs: null,
|
||||
},
|
||||
},
|
||||
moderator: (ctx) => (ctx.steps.length === 0 ? "caller" : END),
|
||||
},
|
||||
{ agent: workflowAsAgent("child-wf", { storageRoot: root }) },
|
||||
parentExtract,
|
||||
);
|
||||
|
||||
const threadId = "01KQXKW18CT8G75T53R8F4G7YG";
|
||||
const parentHash = "C9NMV6V2TQT81";
|
||||
const dataPath = join(root, "logs", parentHash, `${threadId}.data.jsonl`);
|
||||
const infoPath = join(root, "logs", parentHash, `${threadId}.info.jsonl`);
|
||||
await mkdir(join(root, "logs", parentHash), { recursive: true });
|
||||
|
||||
const logger = createLogger({ sink: { kind: "file", path: infoPath } });
|
||||
const ac = new AbortController();
|
||||
|
||||
const result = await executeThread(
|
||||
parentWorkflow,
|
||||
"parent-wf",
|
||||
{ prompt: "from-parent", steps: [] },
|
||||
{
|
||||
maxRounds: 5,
|
||||
depth: 0,
|
||||
signal: ac.signal,
|
||||
awaitAfterEachYield: async () => {},
|
||||
forkSourceThreadId: null,
|
||||
prefilledDiskSteps: null,
|
||||
},
|
||||
{ threadId, hash: parentHash, dataJsonlPath: dataPath, infoJsonlPath: infoPath },
|
||||
logger,
|
||||
);
|
||||
|
||||
expect(result.returnCode).toBe(0);
|
||||
|
||||
const parentText = await readFile(dataPath, "utf8");
|
||||
const parentLines = parentText
|
||||
.trim()
|
||||
.split("\n")
|
||||
.filter((l) => l !== "");
|
||||
expect(parentLines.length).toBe(2);
|
||||
const callerLine = JSON.parse(parentLines[1] ?? "{}") as Record<string, unknown>;
|
||||
expect(callerLine.role).toBe("caller");
|
||||
expect(callerLine.content).toBe("child-done:from-parent");
|
||||
|
||||
const childDir = join(root, "logs", childHash);
|
||||
const childFiles = await readdir(childDir);
|
||||
const childDataName = childFiles.find((n) => n.endsWith(".data.jsonl"));
|
||||
expect(childDataName).toBeDefined();
|
||||
|
||||
const childText = await readFile(join(childDir, childDataName ?? ""), "utf8");
|
||||
const childStart = JSON.parse(
|
||||
childText
|
||||
.trim()
|
||||
.split("\n")
|
||||
.filter((l) => l !== "")[0] ?? "{}",
|
||||
) as Record<string, unknown>;
|
||||
expect(childStart.forkFrom).toEqual({ threadId });
|
||||
const childOpts = (childStart.parameters as Record<string, unknown>).options as Record<
|
||||
string,
|
||||
unknown
|
||||
>;
|
||||
expect(childOpts.depth).toBe(1);
|
||||
} finally {
|
||||
await rm(root, { recursive: true, force: true });
|
||||
}
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,101 @@
|
||||
import { describe, expect, test } from "bun:test";
|
||||
import { mkdir, mkdtemp, rm, writeFile } from "node:fs/promises";
|
||||
import { tmpdir } from "node:os";
|
||||
import { join } from "node:path";
|
||||
|
||||
import { hashWorkflowBundleBytes } from "../src/hash.js";
|
||||
import {
|
||||
readWorkflowRegistry,
|
||||
registerWorkflowVersion,
|
||||
writeWorkflowRegistry,
|
||||
} from "../src/registry.js";
|
||||
import { type AgentContext, START } from "../src/types.js";
|
||||
import { workflowAsAgent } from "../src/workflow-as-agent.js";
|
||||
|
||||
function makeAgentCtx(params: { depth: number; prompt: string; maxRounds: number }): AgentContext {
|
||||
const ts = Date.now();
|
||||
return {
|
||||
threadId: "01PARENT000000000000000001AA",
|
||||
depth: params.depth,
|
||||
start: {
|
||||
role: START,
|
||||
content: params.prompt,
|
||||
meta: { maxRounds: params.maxRounds },
|
||||
timestamp: ts,
|
||||
},
|
||||
steps: [],
|
||||
currentRole: {
|
||||
name: "caller",
|
||||
systemPrompt: "caller",
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
const childBundleSource = `export const descriptor = {
|
||||
description: "child-test",
|
||||
roles: {
|
||||
agent: {
|
||||
description: "agent",
|
||||
schema: { type: "object", properties: {}, additionalProperties: true },
|
||||
},
|
||||
},
|
||||
};
|
||||
export async function* run(input) {
|
||||
yield { role: "agent", content: "child-body", meta: {}, refs: [] };
|
||||
return { returnCode: 0, summary: "child-done:" + input.prompt };
|
||||
}
|
||||
`;
|
||||
|
||||
async function installChildWorkflow(storageRoot: string): Promise<{ hash: string }> {
|
||||
const bytes = new TextEncoder().encode(childBundleSource);
|
||||
const hash = hashWorkflowBundleBytes(bytes);
|
||||
await mkdir(join(storageRoot, "bundles"), { recursive: true });
|
||||
await writeFile(join(storageRoot, "bundles", `${hash}.esm.js`), childBundleSource, "utf8");
|
||||
const reg = await readWorkflowRegistry(storageRoot);
|
||||
if (!reg.ok) {
|
||||
throw reg.error;
|
||||
}
|
||||
const next = registerWorkflowVersion(reg.value, "child-wf", hash, Date.now());
|
||||
const wr = await writeWorkflowRegistry(storageRoot, next);
|
||||
if (!wr.ok) {
|
||||
throw wr.error;
|
||||
}
|
||||
return { hash };
|
||||
}
|
||||
|
||||
describe("workflowAsAgent", () => {
|
||||
test("returns error when workflow name is not registered", async () => {
|
||||
const root = await mkdtemp(join(tmpdir(), "wf-waa-missing-"));
|
||||
try {
|
||||
const agent = workflowAsAgent("missing-wf", { storageRoot: root });
|
||||
const out = await agent(makeAgentCtx({ depth: 0, prompt: "x", maxRounds: 5 }));
|
||||
expect(out).toContain("not found in registry");
|
||||
expect(out).toContain("missing-wf");
|
||||
} finally {
|
||||
await rm(root, { recursive: true, force: true });
|
||||
}
|
||||
});
|
||||
|
||||
test("runs registered workflow and returns child summary string", async () => {
|
||||
const root = await mkdtemp(join(tmpdir(), "wf-waa-ok-"));
|
||||
try {
|
||||
await installChildWorkflow(root);
|
||||
const agent = workflowAsAgent("child-wf", { storageRoot: root });
|
||||
const out = await agent(makeAgentCtx({ depth: 0, prompt: "hello-parent", maxRounds: 5 }));
|
||||
expect(out).toBe("child-done:hello-parent");
|
||||
} finally {
|
||||
await rm(root, { recursive: true, force: true });
|
||||
}
|
||||
});
|
||||
|
||||
test("enforces depth limit (returns error string, does not throw)", async () => {
|
||||
const root = await mkdtemp(join(tmpdir(), "wf-waa-depth-"));
|
||||
try {
|
||||
const agent = workflowAsAgent("child-wf", { storageRoot: root });
|
||||
const out = await agent(makeAgentCtx({ depth: 3, prompt: "x", maxRounds: 5 }));
|
||||
expect(out).toContain("depth limit");
|
||||
} finally {
|
||||
await rm(root, { recursive: true, force: true });
|
||||
}
|
||||
});
|
||||
});
|
||||
@@ -5,6 +5,7 @@ import {
|
||||
END,
|
||||
type ExtractContext,
|
||||
type ModeratorContext,
|
||||
type RoleDefinition,
|
||||
type RoleMeta,
|
||||
type RoleOutput,
|
||||
type RoleStep,
|
||||
@@ -22,6 +23,17 @@ function isRoleNext<M extends RoleMeta>(
|
||||
return next !== END;
|
||||
}
|
||||
|
||||
function resolveExtractedRefs(
|
||||
roleDef: RoleDefinition<Record<string, unknown>>,
|
||||
meta: unknown,
|
||||
): string[] {
|
||||
const extractRefsFn = roleDef.extractRefs;
|
||||
if (extractRefsFn === null || typeof extractRefsFn !== "function") {
|
||||
return [];
|
||||
}
|
||||
return extractRefsFn(meta as Record<string, unknown>);
|
||||
}
|
||||
|
||||
/**
|
||||
* Binds pure role definitions + moderator to runtime agents and structured extraction.
|
||||
* Assign with `export const run = createWorkflow(def, binding, extract)`.
|
||||
@@ -48,6 +60,7 @@ export function createWorkflow<M extends RoleMeta>(
|
||||
role: out.role,
|
||||
content: out.content,
|
||||
meta: out.meta,
|
||||
refs: out.refs,
|
||||
timestamp: baseTs + i,
|
||||
})) as RoleStep<M>[];
|
||||
|
||||
@@ -61,6 +74,7 @@ export function createWorkflow<M extends RoleMeta>(
|
||||
|
||||
const modCtx: ModeratorContext<M> = {
|
||||
threadId: options.threadId,
|
||||
depth: options.depth,
|
||||
start,
|
||||
steps,
|
||||
};
|
||||
@@ -96,15 +110,21 @@ export function createWorkflow<M extends RoleMeta>(
|
||||
extractCtx as unknown as ExtractContext,
|
||||
);
|
||||
|
||||
const refs = resolveExtractedRefs(
|
||||
roleDef as unknown as RoleDefinition<Record<string, unknown>>,
|
||||
meta,
|
||||
);
|
||||
|
||||
const ts = Date.now();
|
||||
const step = {
|
||||
role: next,
|
||||
content: raw,
|
||||
meta,
|
||||
refs,
|
||||
timestamp: ts,
|
||||
} as RoleStep<M>;
|
||||
|
||||
yield { role: step.role, content: step.content, meta: step.meta };
|
||||
yield { role: step.role, content: step.content, meta: step.meta, refs: step.refs };
|
||||
|
||||
steps = [...steps, step];
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ import { appendFile, mkdir } from "node:fs/promises";
|
||||
import { dirname } from "node:path";
|
||||
|
||||
import type { LogFn } from "./logger.js";
|
||||
import { normalizeRefsField } from "./refs-field.js";
|
||||
import type { ThreadInput, WorkflowFn, WorkflowFnOptions, WorkflowResult } from "./types.js";
|
||||
|
||||
export type ExecuteThreadIo = {
|
||||
@@ -16,11 +17,14 @@ export type PrefilledDiskStep = {
|
||||
role: string;
|
||||
content: string;
|
||||
meta: Record<string, unknown>;
|
||||
refs: string[];
|
||||
timestamp: number;
|
||||
};
|
||||
|
||||
export type ExecuteThreadOptions = {
|
||||
maxRounds: number;
|
||||
/** Passed to the bundle as `WorkflowFnOptions.depth`. */
|
||||
depth: number;
|
||||
signal: AbortSignal;
|
||||
/** Invoked after each successful yield (and outer-loop checks); used for pause/resume. */
|
||||
awaitAfterEachYield: () => Promise<void>;
|
||||
@@ -79,6 +83,7 @@ async function driveWorkflowGenerator(params: {
|
||||
role: step.role,
|
||||
content: step.content,
|
||||
meta: step.meta,
|
||||
refs: normalizeRefsField(step.refs),
|
||||
timestamp: ts,
|
||||
});
|
||||
|
||||
@@ -133,6 +138,7 @@ export async function executeThread(
|
||||
prompt: input.prompt,
|
||||
options: {
|
||||
maxRounds: options.maxRounds,
|
||||
depth: options.depth,
|
||||
},
|
||||
},
|
||||
timestamp: nowMs,
|
||||
@@ -151,6 +157,7 @@ export async function executeThread(
|
||||
role: row.role,
|
||||
content: row.content,
|
||||
meta: row.meta,
|
||||
refs: normalizeRefsField(row.refs),
|
||||
timestamp: row.timestamp,
|
||||
});
|
||||
}
|
||||
@@ -167,6 +174,7 @@ export async function executeThread(
|
||||
const bundleOptions: WorkflowFnOptions = {
|
||||
threadId: io.threadId,
|
||||
maxRounds: options.maxRounds,
|
||||
depth: options.depth,
|
||||
};
|
||||
|
||||
return await driveWorkflowGenerator({
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import { normalizeRefsField } from "./refs-field.js";
|
||||
import { err, ok, type Result } from "./result.js";
|
||||
import type { RoleOutput } from "./types.js";
|
||||
|
||||
@@ -10,6 +11,7 @@ export type ParsedThreadStartRecord = {
|
||||
threadId: string;
|
||||
prompt: string;
|
||||
maxRounds: number;
|
||||
depth: number;
|
||||
};
|
||||
|
||||
function parseRoleLine(
|
||||
@@ -36,6 +38,7 @@ function parseRoleLine(
|
||||
role,
|
||||
content,
|
||||
meta: meta as Record<string, unknown>,
|
||||
refs: normalizeRefsField(obj.refs),
|
||||
timestamp,
|
||||
});
|
||||
}
|
||||
@@ -76,12 +79,17 @@ function parseStartRecordLine(firstLine: string): Result<ParsedThreadStartRecord
|
||||
return err("start record missing parameters.options.maxRounds");
|
||||
}
|
||||
|
||||
const depthRaw = optRec.depth;
|
||||
const depth =
|
||||
typeof depthRaw === "number" && Number.isFinite(depthRaw) ? Math.trunc(depthRaw) : 0;
|
||||
|
||||
return ok({
|
||||
workflowName: name,
|
||||
hash,
|
||||
threadId,
|
||||
prompt,
|
||||
maxRounds,
|
||||
depth,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -194,7 +202,7 @@ export type ForkPlan = {
|
||||
hash: string;
|
||||
sourceThreadId: string;
|
||||
prompt: string;
|
||||
runOptions: { maxRounds: number };
|
||||
runOptions: { maxRounds: number; depth: number };
|
||||
historicalSteps: ForkHistoricalStep[];
|
||||
};
|
||||
|
||||
@@ -219,7 +227,7 @@ export function buildForkPlan(
|
||||
hash: start.hash,
|
||||
sourceThreadId: start.threadId,
|
||||
prompt: start.prompt,
|
||||
runOptions: { maxRounds: start.maxRounds },
|
||||
runOptions: { maxRounds: start.maxRounds, depth: start.depth },
|
||||
historicalSteps: selected.value,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -0,0 +1,131 @@
|
||||
import { readdir, readFile } from "node:fs/promises";
|
||||
import { join } from "node:path";
|
||||
|
||||
import { type CasStore, createCasStore } from "./cas.js";
|
||||
import { parseThreadDataJsonl } from "./fork-thread.js";
|
||||
import { err, ok, type Result } from "./result.js";
|
||||
import { getGlobalCasDir } from "./storage-root.js";
|
||||
|
||||
export type GcResult = {
|
||||
scannedThreads: number;
|
||||
activeRefs: number;
|
||||
deletedEntries: number;
|
||||
deletedHashes: string[];
|
||||
};
|
||||
|
||||
async function listThreadDataJsonlPaths(storageRoot: string): Promise<Result<string[], string>> {
|
||||
const logsRoot = join(storageRoot, "logs");
|
||||
const paths: string[] = [];
|
||||
let hashes: string[];
|
||||
try {
|
||||
hashes = await readdir(logsRoot);
|
||||
} catch (e) {
|
||||
const errObj = e as NodeJS.ErrnoException;
|
||||
if (errObj.code === "ENOENT") {
|
||||
return ok([]);
|
||||
}
|
||||
return err(`failed to read logs directory: ${String(e)}`);
|
||||
}
|
||||
|
||||
for (const hash of hashes) {
|
||||
const dir = join(logsRoot, hash);
|
||||
let entries: string[];
|
||||
try {
|
||||
entries = await readdir(dir);
|
||||
} catch {
|
||||
continue;
|
||||
}
|
||||
for (const fileName of entries) {
|
||||
if (fileName.endsWith(".data.jsonl")) {
|
||||
paths.push(join(dir, fileName));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
paths.sort();
|
||||
return ok(paths);
|
||||
}
|
||||
|
||||
async function collectActiveRefsFromDataPaths(
|
||||
dataPaths: string[],
|
||||
): Promise<Result<Set<string>, string>> {
|
||||
const activeRefs = new Set<string>();
|
||||
for (const dataPath of dataPaths) {
|
||||
let text: string;
|
||||
try {
|
||||
text = await readFile(dataPath, "utf8");
|
||||
} catch (e) {
|
||||
return err(`failed to read ${dataPath}: ${String(e)}`);
|
||||
}
|
||||
const parsed = parseThreadDataJsonl(text);
|
||||
if (!parsed.ok) {
|
||||
return err(`${dataPath}: ${parsed.error}`);
|
||||
}
|
||||
for (const step of parsed.value.roleSteps) {
|
||||
for (const ref of step.refs) {
|
||||
activeRefs.add(ref);
|
||||
}
|
||||
}
|
||||
}
|
||||
return ok(activeRefs);
|
||||
}
|
||||
|
||||
async function deleteCasNotInSet(
|
||||
cas: CasStore,
|
||||
activeRefs: Set<string>,
|
||||
): Promise<Result<string[], string>> {
|
||||
let listed: string[];
|
||||
try {
|
||||
listed = await cas.list();
|
||||
} catch (e) {
|
||||
return err(`failed to list cas entries: ${String(e)}`);
|
||||
}
|
||||
|
||||
const deletedHashes: string[] = [];
|
||||
for (const hash of listed) {
|
||||
if (activeRefs.has(hash)) {
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
await cas.delete(hash);
|
||||
} catch (e) {
|
||||
return err(`failed to delete cas ${hash}: ${String(e)}`);
|
||||
}
|
||||
deletedHashes.push(hash);
|
||||
}
|
||||
|
||||
deletedHashes.sort();
|
||||
return ok(deletedHashes);
|
||||
}
|
||||
|
||||
/**
|
||||
* Mark-and-sweep CAS GC: collect `refs` from all thread `.data.jsonl` files under `storageRoot`,
|
||||
* then delete CAS blobs not referenced by any surviving thread data.
|
||||
*/
|
||||
export async function garbageCollectCas(storageRoot: string): Promise<Result<GcResult, string>> {
|
||||
const pathsResult = await listThreadDataJsonlPaths(storageRoot);
|
||||
if (!pathsResult.ok) {
|
||||
return pathsResult;
|
||||
}
|
||||
const paths = pathsResult.value;
|
||||
|
||||
const refsResult = await collectActiveRefsFromDataPaths(paths);
|
||||
if (!refsResult.ok) {
|
||||
return refsResult;
|
||||
}
|
||||
const activeRefs = refsResult.value;
|
||||
|
||||
const cas = createCasStore(getGlobalCasDir(storageRoot));
|
||||
const deletedResult = await deleteCasNotInSet(cas, activeRefs);
|
||||
if (!deletedResult.ok) {
|
||||
return deletedResult;
|
||||
}
|
||||
const deletedHashes = deletedResult.value;
|
||||
|
||||
return ok({
|
||||
scannedThreads: paths.length,
|
||||
activeRefs: activeRefs.size,
|
||||
deletedEntries: deletedHashes.length,
|
||||
deletedHashes,
|
||||
});
|
||||
}
|
||||
@@ -25,6 +25,7 @@ export {
|
||||
parseThreadDataJsonl,
|
||||
selectForkHistoricalSteps,
|
||||
} from "./fork-thread.js";
|
||||
export { type GcResult, garbageCollectCas } from "./gc.js";
|
||||
export { stringifyWorkflowDescriptor } from "./generate-descriptor.js";
|
||||
export { hashString, hashWorkflowBundleBytes } from "./hash.js";
|
||||
export {
|
||||
@@ -81,6 +82,7 @@ export {
|
||||
} from "./types.js";
|
||||
export { generateUlid } from "./ulid.js";
|
||||
export { getWorkerHostScriptPath } from "./worker-entry-path.js";
|
||||
export { type WorkflowAsAgentOptions, workflowAsAgent } from "./workflow-as-agent.js";
|
||||
export {
|
||||
validateWorkflowDescriptor,
|
||||
type WorkflowDescriptor,
|
||||
|
||||
@@ -0,0 +1,13 @@
|
||||
/** Normalize `refs` from persisted JSONL or IPC payloads (missing or invalid → []). */
|
||||
export function normalizeRefsField(value: unknown): string[] {
|
||||
if (!Array.isArray(value)) {
|
||||
return [];
|
||||
}
|
||||
const out: string[] = [];
|
||||
for (const x of value) {
|
||||
if (typeof x === "string") {
|
||||
out.push(x);
|
||||
}
|
||||
}
|
||||
return out;
|
||||
}
|
||||
@@ -19,6 +19,8 @@ export type RoleOutput = {
|
||||
role: string;
|
||||
content: string;
|
||||
meta: Record<string, unknown>;
|
||||
/** CAS hashes produced or consumed by this step (for GC traceability). */
|
||||
refs: string[];
|
||||
};
|
||||
|
||||
/** What the workflow AsyncGenerator returns when done. */
|
||||
@@ -37,6 +39,8 @@ export type ThreadInput = {
|
||||
export type WorkflowFnOptions = {
|
||||
threadId: string;
|
||||
maxRounds: number;
|
||||
/** Nesting depth for workflow-as-agent chains; root threads use `0`. */
|
||||
depth: number;
|
||||
};
|
||||
|
||||
/** Bundle contract — named export `run` is a function returning an AsyncGenerator. */
|
||||
@@ -55,12 +59,20 @@ export type StartStep = {
|
||||
|
||||
/** A completed role step in the thread. */
|
||||
export type RoleStep<M extends RoleMeta> = {
|
||||
[K in keyof M & string]: { role: K; meta: M[K]; content: string; timestamp: number };
|
||||
[K in keyof M & string]: {
|
||||
role: K;
|
||||
meta: M[K];
|
||||
content: string;
|
||||
refs: string[];
|
||||
timestamp: number;
|
||||
};
|
||||
}[keyof M & string];
|
||||
|
||||
/** Phase 1: Moderator decides next role. */
|
||||
export type ModeratorContext<M extends RoleMeta = RoleMeta> = {
|
||||
threadId: string;
|
||||
/** Same as `WorkflowFnOptions.depth` for the active thread. */
|
||||
depth: number;
|
||||
start: StartStep;
|
||||
steps: RoleStep<M>[];
|
||||
};
|
||||
@@ -96,6 +108,8 @@ export type RoleDefinition<Meta extends Record<string, unknown>> = {
|
||||
systemPrompt: string;
|
||||
extractPrompt: string;
|
||||
schema: z.ZodType<Meta>;
|
||||
/** When non-null, produces CAS hashes to persist on this role's steps (see `RoleOutput.refs`). */
|
||||
extractRefs: ((meta: Meta) => string[]) | null;
|
||||
};
|
||||
|
||||
/**
|
||||
|
||||
@@ -5,6 +5,7 @@ import { pathToFileURL } from "node:url";
|
||||
import type { PrefilledDiskStep } from "./engine.js";
|
||||
import { type ExecuteThreadIo, executeThread } from "./engine.js";
|
||||
import { createLogger } from "./logger.js";
|
||||
import { normalizeRefsField } from "./refs-field.js";
|
||||
import { err, ok, type Result } from "./result.js";
|
||||
import { createThreadPauseGate, type ThreadPauseGate } from "./thread-pause-gate.js";
|
||||
import type { RoleOutput, WorkflowFn } from "./types.js";
|
||||
@@ -16,7 +17,7 @@ type RunCommand = {
|
||||
threadId: string;
|
||||
workflowName: string;
|
||||
prompt: string;
|
||||
options: { maxRounds: number };
|
||||
options: { maxRounds: number; depth: number };
|
||||
steps: RoleOutput[];
|
||||
/** Timestamps aligned with `steps` for `.data.jsonl` replay; length must match `steps` when non-null. */
|
||||
stepTimestamps: number[] | null;
|
||||
@@ -55,7 +56,12 @@ function parseRoleOutputRecord(obj: Record<string, unknown>): RoleOutput | null
|
||||
if (meta === null || typeof meta !== "object") {
|
||||
return null;
|
||||
}
|
||||
return { role, content, meta: meta as Record<string, unknown> };
|
||||
return {
|
||||
role,
|
||||
content,
|
||||
meta: meta as Record<string, unknown>,
|
||||
refs: normalizeRefsField(obj.refs),
|
||||
};
|
||||
}
|
||||
|
||||
function parseRunStepsPayload(rec: Record<string, unknown>): {
|
||||
@@ -118,6 +124,9 @@ function parseRunControlPayload(rec: Record<string, unknown>): RunCommand | null
|
||||
if (typeof maxRounds !== "number") {
|
||||
return null;
|
||||
}
|
||||
const depthRaw = optRec.depth;
|
||||
const depth =
|
||||
typeof depthRaw === "number" && Number.isFinite(depthRaw) ? Math.trunc(depthRaw) : 0;
|
||||
const parsedSteps = parseRunStepsPayload(rec);
|
||||
if (parsedSteps === null) {
|
||||
return null;
|
||||
@@ -135,7 +144,7 @@ function parseRunControlPayload(rec: Record<string, unknown>): RunCommand | null
|
||||
threadId,
|
||||
workflowName,
|
||||
prompt,
|
||||
options: { maxRounds },
|
||||
options: { maxRounds, depth },
|
||||
steps: parsedSteps.steps,
|
||||
stepTimestamps: parsedSteps.stepTimestamps,
|
||||
forkSourceThreadId,
|
||||
@@ -382,6 +391,7 @@ async function main(): Promise<void> {
|
||||
role: step.role,
|
||||
content: step.content,
|
||||
meta: step.meta,
|
||||
refs: normalizeRefsField(step.refs),
|
||||
timestamp: typeof ts === "number" && ts > 0 ? ts : baseTs + i,
|
||||
};
|
||||
});
|
||||
|
||||
@@ -0,0 +1,99 @@
|
||||
import { join } from "node:path";
|
||||
|
||||
import { type ExecuteThreadIo, executeThread } from "./engine.js";
|
||||
import { extractBundleExports } from "./extract-bundle-exports.js";
|
||||
import { createLogger } from "./logger.js";
|
||||
import { getRegisteredWorkflow, readWorkflowRegistry } from "./registry.js";
|
||||
import { getDefaultWorkflowStorageRoot } from "./storage-root.js";
|
||||
import type { AgentContext, AgentFn, ThreadInput } from "./types.js";
|
||||
import { generateUlid } from "./ulid.js";
|
||||
|
||||
/** Maximum `WorkflowFnOptions.depth` allowed for a child spawned via `workflowAsAgent`. */
|
||||
const WORKFLOW_AS_AGENT_MAX_DEPTH = 3;
|
||||
|
||||
export type WorkflowAsAgentOptions = {
|
||||
/** When `null`, uses `getDefaultWorkflowStorageRoot()`. */
|
||||
storageRoot: string | null;
|
||||
};
|
||||
|
||||
function resolveWorkflowAsAgentStorageRoot(options: WorkflowAsAgentOptions | null): string {
|
||||
if (options !== null && options.storageRoot !== null) {
|
||||
return options.storageRoot;
|
||||
}
|
||||
return getDefaultWorkflowStorageRoot();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an {@link AgentFn} that runs another registered workflow in a new thread,
|
||||
* using the parent thread's initial prompt (`ctx.start.content`) as the child {@link ThreadInput.prompt}.
|
||||
*/
|
||||
export function workflowAsAgent(
|
||||
workflowName: string,
|
||||
options: WorkflowAsAgentOptions | null = null,
|
||||
): AgentFn {
|
||||
return async (ctx: AgentContext): Promise<string> => {
|
||||
const nextDepth = ctx.depth + 1;
|
||||
if (nextDepth > WORKFLOW_AS_AGENT_MAX_DEPTH) {
|
||||
return `ERROR: workflow-as-agent depth limit exceeded (max ${WORKFLOW_AS_AGENT_MAX_DEPTH})`;
|
||||
}
|
||||
|
||||
const storageRoot = resolveWorkflowAsAgentStorageRoot(options);
|
||||
|
||||
const registryResult = await readWorkflowRegistry(storageRoot);
|
||||
if (!registryResult.ok) {
|
||||
return `ERROR: failed to read workflow registry: ${registryResult.error.message}`;
|
||||
}
|
||||
|
||||
const entry = getRegisteredWorkflow(registryResult.value, workflowName);
|
||||
if (entry === null) {
|
||||
return `ERROR: workflow "${workflowName}" not found in registry`;
|
||||
}
|
||||
|
||||
const bundlePath = join(storageRoot, "bundles", `${entry.hash}.esm.js`);
|
||||
const bundleExportsResult = await extractBundleExports(bundlePath);
|
||||
if (!bundleExportsResult.ok) {
|
||||
return `ERROR: ${bundleExportsResult.error}`;
|
||||
}
|
||||
|
||||
const input: ThreadInput = {
|
||||
prompt: ctx.start.content,
|
||||
steps: [],
|
||||
};
|
||||
|
||||
const childThreadId = generateUlid(Date.now());
|
||||
const dataJsonlPath = join(storageRoot, "logs", entry.hash, `${childThreadId}.data.jsonl`);
|
||||
const infoJsonlPath = join(storageRoot, "logs", entry.hash, `${childThreadId}.info.jsonl`);
|
||||
|
||||
const io: ExecuteThreadIo = {
|
||||
threadId: childThreadId,
|
||||
hash: entry.hash,
|
||||
dataJsonlPath,
|
||||
infoJsonlPath,
|
||||
};
|
||||
|
||||
const logger = createLogger({ sink: { kind: "file", path: infoJsonlPath } });
|
||||
const signalNever = new AbortController();
|
||||
|
||||
try {
|
||||
const result = await executeThread(
|
||||
bundleExportsResult.value.run,
|
||||
workflowName,
|
||||
input,
|
||||
{
|
||||
maxRounds: ctx.start.meta.maxRounds,
|
||||
depth: nextDepth,
|
||||
signal: signalNever.signal,
|
||||
awaitAfterEachYield: async () => {},
|
||||
forkSourceThreadId: ctx.threadId,
|
||||
prefilledDiskSteps: null,
|
||||
},
|
||||
io,
|
||||
logger,
|
||||
);
|
||||
return result.summary;
|
||||
} catch (e) {
|
||||
const message = e instanceof Error ? e.message : String(e);
|
||||
return `ERROR: ${message}`;
|
||||
}
|
||||
};
|
||||
}
|
||||
Reference in New Issue
Block a user