Merge pull request 'feat: Merkle node format + content → CAS' (#45) from feat/41-merkle-content-cas into main

This commit is contained in:
2026-05-07 13:14:16 +00:00
35 changed files with 628 additions and 193 deletions
@@ -16,6 +16,9 @@ import { addCliArgs } from "./bundle-fixture.js";
const fixtureDescriptor = `export const descriptor = { description: "fixture", roles: {} };
`;
const wfPutImport = `import { putContentMerkleNode } from "@uncaged/workflow";
`;
describe("cli workflow commands", () => {
let prevEnv: string | undefined;
let storageRoot: string;
@@ -41,11 +44,13 @@ describe("cli workflow commands", () => {
const bundlePath = join(bundleDir, "demo.esm.js");
await writeFile(
bundlePath,
`${fixtureDescriptor}import fs from "node:fs";
`${fixtureDescriptor}${wfPutImport}import fs from "node:fs";
export const run = async function* (input) {
export const run = async function* (input, options) {
fs.existsSync(".");
yield { role: "noop", content: input.prompt, meta: { done: true } };
const cas = options.cas;
const h = await putContentMerkleNode(cas, input.prompt);
yield { role: "noop", contentHash: h, meta: { done: true }, refs: [h] };
return { returnCode: 0, summary: "done" };
}
`,
@@ -112,8 +117,8 @@ export const run = async function* (input) { return { returnCode: 0, summary: in
const bundlePath = join(storageRoot, "solo.esm.js");
await writeFile(
bundlePath,
`export const run = async function* (input) {
yield { role: "x", content: input.prompt, meta: {} };
`export const run = async function* () {
yield { role: "x", contentHash: "STUBHASH00000000000000001", meta: {}, refs: [] };
return { returnCode: 0, summary: "ok" };
}
`,
@@ -141,8 +146,11 @@ export const run = async function* (input) { return { returnCode: 0, summary: in
},
},
};
export const run = async function* (input) {
yield { role: "greeter", content: input.prompt, meta: { greeting: "hi" } };
${wfPutImport}
export const run = async function* (input, options) {
const cas = options.cas;
const h = await putContentMerkleNode(cas, input.prompt);
yield { role: "greeter", contentHash: h, meta: { greeting: "hi" }, refs: [h] };
return { returnCode: 0, summary: "ok" };
};
`,
@@ -180,8 +188,10 @@ export const run = async function* (input) {
const bundlePath = join(bundleDir, "demo.esm.js");
await writeFile(
bundlePath,
`${fixtureDescriptor}export const run = async function* (input) {
yield { role: "a", content: "x", meta: {} };
`${fixtureDescriptor}${wfPutImport}export const run = async function* (_input, options) {
const cas = options.cas;
const h = await putContentMerkleNode(cas, "x");
yield { role: "a", contentHash: h, meta: {}, refs: [h] };
return { returnCode: 0, summary: "x" };
}
`,
@@ -209,8 +219,10 @@ export const run = async function* (input) {
const dtsPath = join(bundleDir, "types.d.ts");
await writeFile(
bundlePath,
`${fixtureDescriptor}export const run = async function* (input) {
yield { role: "a", content: "x", meta: {} };
`${fixtureDescriptor}${wfPutImport}export const run = async function* (_input, options) {
const cas = options.cas;
const h = await putContentMerkleNode(cas, "x");
yield { role: "a", contentHash: h, meta: {}, refs: [h] };
return { returnCode: 0, summary: "x" };
}
`,
@@ -240,8 +252,10 @@ export const run = async function* (input) {
const bundlePath = join(bundleDir, "demo.esm.js");
await writeFile(
bundlePath,
`${fixtureDescriptor}export const run = async function* (input) {
yield { role: "a", content: "x", meta: {} };
`${fixtureDescriptor}${wfPutImport}export const run = async function* (_input, options) {
const cas = options.cas;
const h = await putContentMerkleNode(cas, "x");
yield { role: "a", contentHash: h, meta: {}, refs: [h] };
return { returnCode: 0, summary: "x" };
}
`,
@@ -261,13 +275,17 @@ export const run = async function* (input) {
const bundleDir = join(storageRoot, "src");
await mkdir(bundleDir, { recursive: true });
const bundlePath = join(bundleDir, "demo.esm.js");
const v1 = `${fixtureDescriptor}export const run = async function* (input) {
yield { role: "a", content: "v1", meta: {} };
const v1 = `${fixtureDescriptor}${wfPutImport}export const run = async function* (_input, options) {
const cas = options.cas;
const h = await putContentMerkleNode(cas, "v1");
yield { role: "a", contentHash: h, meta: {}, refs: [h] };
return { returnCode: 0, summary: "v1" };
}
`;
const v2 = `${fixtureDescriptor}export const run = async function* (input) {
yield { role: "a", content: "v2", meta: {} };
const v2 = `${fixtureDescriptor}${wfPutImport}export const run = async function* (_input, options) {
const cas = options.cas;
const h = await putContentMerkleNode(cas, "v2");
yield { role: "a", contentHash: h, meta: {}, refs: [h] };
return { returnCode: 0, summary: "v2" };
}
`;
@@ -299,13 +317,17 @@ export const run = async function* (input) {
const bundleDir = join(storageRoot, "src");
await mkdir(bundleDir, { recursive: true });
const bundlePath = join(bundleDir, "demo.esm.js");
const v1 = `${fixtureDescriptor}export const run = async function* (input) {
yield { role: "a", content: "v1", meta: {} };
const v1 = `${fixtureDescriptor}${wfPutImport}export const run = async function* (_input, options) {
const cas = options.cas;
const h = await putContentMerkleNode(cas, "v1");
yield { role: "a", contentHash: h, meta: {}, refs: [h] };
return { returnCode: 0, summary: "v1" };
}
`;
const v2 = `${fixtureDescriptor}export const run = async function* (input) {
yield { role: "a", content: "v2", meta: {} };
const v2 = `${fixtureDescriptor}${wfPutImport}export const run = async function* (_input, options) {
const cas = options.cas;
const h = await putContentMerkleNode(cas, "v2");
yield { role: "a", contentHash: h, meta: {}, refs: [h] };
return { returnCode: 0, summary: "v2" };
}
`;
@@ -347,8 +369,10 @@ export const run = async function* (input) {
const bundlePath = join(bundleDir, "demo.esm.js");
await writeFile(
bundlePath,
`${fixtureDescriptor}export const run = async function* (input) {
yield { role: "a", content: "x", meta: {} };
`${fixtureDescriptor}${wfPutImport}export const run = async function* (_input, options) {
const cas = options.cas;
const h = await putContentMerkleNode(cas, "x");
yield { role: "a", contentHash: h, meta: {}, refs: [h] };
return { returnCode: 0, summary: "x" };
}
`,
@@ -358,8 +382,10 @@ export const run = async function* (input) {
expect(add1.ok).toBe(true);
await writeFile(
bundlePath,
`${fixtureDescriptor}export const run = async function* (input) {
yield { role: "a", content: "y", meta: {} };
`${fixtureDescriptor}${wfPutImport}export const run = async function* (_input, options) {
const cas = options.cas;
const h = await putContentMerkleNode(cas, "y");
yield { role: "a", contentHash: h, meta: {}, refs: [h] };
return { returnCode: 0, summary: "y" };
}
`,
@@ -409,8 +435,10 @@ export const run = async function* (input) {
const bundlePath = join(bundleDir, "demo.esm.js");
await writeFile(
bundlePath,
`${fixtureDescriptor}export const run = async function* (input) {
yield { role: "a", content: "x", meta: {} };
`${fixtureDescriptor}${wfPutImport}export const run = async function* (_input, options) {
const cas = options.cas;
const h = await putContentMerkleNode(cas, "x");
yield { role: "a", contentHash: h, meta: {}, refs: [h] };
return { returnCode: 0, summary: "x" };
}
`,
@@ -424,8 +452,10 @@ export const run = async function* (input) {
const hash1 = add1.value.hash;
await writeFile(
bundlePath,
`${fixtureDescriptor}export const run = async function* (input) {
yield { role: "a", content: "y", meta: {} };
`${fixtureDescriptor}${wfPutImport}export const run = async function* (_input, options) {
const cas = options.cas;
const h = await putContentMerkleNode(cas, "y");
yield { role: "a", contentHash: h, meta: {}, refs: [h] };
return { returnCode: 0, summary: "y" };
}
`,
@@ -2,6 +2,7 @@ import { afterEach, beforeEach, describe, expect, test } from "bun:test";
import { mkdir, mkdtemp, readFile, rm, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { createCasStore, getContentMerklePayload, getGlobalCasDir } from "@uncaged/workflow";
import { cmdAdd } from "../src/cmd-add.js";
import { cmdFork } from "../src/cmd-fork.js";
import { cmdRun } from "../src/cmd-run.js";
@@ -9,7 +10,9 @@ import { pathExists } from "../src/fs-utils.js";
import { addCliArgs } from "./bundle-fixture.js";
/** Three-role workflow that respects `input.steps` for fork/resume. */
const threeRoleBundleSource = `export const descriptor = {
const threeRoleBundleSource = `import { putContentMerkleNode } from "@uncaged/workflow";
export const descriptor = {
description: "fork-cli",
roles: {
planner: { description: "planner", schema: {} },
@@ -17,20 +20,21 @@ const threeRoleBundleSource = `export const descriptor = {
reviewer: { description: "reviewer", schema: {} },
},
};
export const run = async function* (input) {
export const run = async function* (input, options) {
const cas = options.cas;
const has = (r) => input.steps.some((s) => s.role === r);
if (!has("planner")) {
yield { role: "planner", content: "p1", meta: { k: "planner" } };
const h = await putContentMerkleNode(cas, "p1");
yield { role: "planner", contentHash: h, meta: { k: "planner" }, refs: [h] };
}
if (!has("coder")) {
yield { role: "coder", content: "c1", meta: { k: "coder" } };
const h = await putContentMerkleNode(cas, "c1");
yield { role: "coder", contentHash: h, meta: { k: "coder" }, refs: [h] };
}
if (!has("reviewer")) {
yield {
role: "reviewer",
content: "rev-" + String(input.steps.length),
meta: { k: "reviewer" },
};
const body = "rev-" + String(input.steps.length);
const h = await putContentMerkleNode(cas, body);
yield { role: "reviewer", contentHash: h, meta: { k: "reviewer" }, refs: [h] };
}
return { returnCode: 0, summary: "done" };
};
@@ -132,7 +136,8 @@ describe("cli fork", () => {
const last = JSON.parse(lines[lines.length - 1] ?? "{}") as Record<string, unknown>;
expect(last.role).toBe("reviewer");
expect(last.content).toBe("rev-1");
const cas = createCasStore(getGlobalCasDir(storageRoot));
expect(await getContentMerklePayload(cas, String(last.contentHash))).toBe("rev-1");
});
test("fork without --from-role retries last role", async () => {
@@ -179,11 +184,12 @@ describe("cli fork", () => {
const replayCoder = JSON.parse(lines[2] ?? "{}") as Record<string, unknown>;
expect(replayCoder.role).toBe("coder");
expect(replayCoder.content).toBe("c1");
const cas = createCasStore(getGlobalCasDir(storageRoot));
expect(await getContentMerklePayload(cas, String(replayCoder.contentHash))).toBe("c1");
const last = JSON.parse(lines[lines.length - 1] ?? "{}") as Record<string, unknown>;
expect(last.role).toBe("reviewer");
expect(last.content).toBe("rev-2");
expect(await getContentMerklePayload(cas, String(last.contentHash))).toBe("rev-2");
});
test("fork rejects unknown role with available names", async () => {
+43 -25
View File
@@ -4,31 +4,43 @@ import { mkdir, mkdtemp, rm, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { fileURLToPath } from "node:url";
import { createCasStore, garbageCollectCas, getGlobalCasDir } from "@uncaged/workflow";
import {
createCasStore,
garbageCollectCas,
getGlobalCasDir,
putContentMerkleNode,
} from "@uncaged/workflow";
import { cmdThreadRemove } from "../src/cmd-thread.js";
import { pathExists } from "../src/fs-utils.js";
const cliEntryPath = fileURLToPath(new URL("../src/cli.ts", import.meta.url));
/** Minimal valid `.data.jsonl` with one role step referencing `activeHash` in `refs`. */
function makeDataJsonl(threadId: string, bundleHash: string, activeHash: string): string {
return [
async function writeDemoDataJsonl(params: {
path: string;
threadId: string;
bundleHash: string;
cas: ReturnType<typeof createCasStore>;
activeHash: string;
}): Promise<void> {
const bodyHash = await putContentMerkleNode(params.cas, "p");
const text = [
JSON.stringify({
name: "demo",
hash: bundleHash,
threadId,
hash: params.bundleHash,
threadId: params.threadId,
parameters: { prompt: "hi", options: { maxRounds: 5 } },
timestamp: 100,
}),
JSON.stringify({
role: "planner",
content: "p",
contentHash: bodyHash,
meta: {},
refs: [activeHash],
refs: [params.activeHash, bodyHash],
timestamp: 101,
}),
"",
].join("\n");
await writeFile(params.path, text, "utf8");
}
describe("gc cli and garbageCollectCas", () => {
@@ -60,11 +72,13 @@ describe("gc cli and garbageCollectCas", () => {
const activeHash = await cas.put("active-blob");
const orphanHash = await cas.put("orphan-blob");
await writeFile(
join(logsDir, `${threadId}.data.jsonl`),
makeDataJsonl(threadId, bundleHash, activeHash),
"utf8",
);
await writeDemoDataJsonl({
path: join(logsDir, `${threadId}.data.jsonl`),
threadId,
bundleHash,
cas,
activeHash,
});
const gc = await garbageCollectCas(storageRoot);
expect(gc.ok).toBe(true);
@@ -72,7 +86,7 @@ describe("gc cli and garbageCollectCas", () => {
return;
}
expect(gc.value.scannedThreads).toBe(1);
expect(gc.value.activeRefs).toBe(1);
expect(gc.value.activeRefs).toBe(2);
expect(gc.value.deletedEntries).toBe(1);
expect(gc.value.deletedHashes).toEqual([orphanHash]);
@@ -106,16 +120,18 @@ describe("gc cli and garbageCollectCas", () => {
const activeHash = await cas.put("keep-me");
await cas.put("drop-me");
await writeFile(
join(logsDir, `${threadId}.data.jsonl`),
makeDataJsonl(threadId, bundleHash, activeHash),
"utf8",
);
await writeDemoDataJsonl({
path: join(logsDir, `${threadId}.data.jsonl`),
threadId,
bundleHash,
cas,
activeHash,
});
const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot };
const proc = spawnSync(process.execPath, [cliEntryPath, "gc"], { env, encoding: "utf8" });
expect(proc.status).toBe(0);
expect(String(proc.stdout).trim()).toBe("scanned 1 threads, 1 active refs, deleted 1 entries");
expect(String(proc.stdout).trim()).toBe("scanned 1 threads, 2 active refs, deleted 1 entries");
});
test("thread rm triggers gc so unreferenced CAS is removed", async () => {
@@ -126,11 +142,13 @@ describe("gc cli and garbageCollectCas", () => {
const cas = createCasStore(getGlobalCasDir(storageRoot));
const activeHash = await cas.put("pinned-by-ref");
await writeFile(
join(logsDir, `${threadId}.data.jsonl`),
makeDataJsonl(threadId, bundleHash, activeHash),
"utf8",
);
await writeDemoDataJsonl({
path: join(logsDir, `${threadId}.data.jsonl`),
threadId,
bundleHash,
cas,
activeHash,
});
const orphanHash = await cas.put("orphan-after-rm");
const orphanPath = join(getGlobalCasDir(storageRoot), `${orphanHash}.txt`);
@@ -17,6 +17,9 @@ import { cmdThreads } from "../src/cmd-threads.js";
import { pathExists, readTextFileIfExists } from "../src/fs-utils.js";
import { addCliArgs } from "./bundle-fixture.js";
const wfPutImport = `import { putContentMerkleNode } from "@uncaged/workflow";
`;
const threadFixtureDescriptor = `export const descriptor = {
description: "thread-cli",
roles: {
@@ -31,18 +34,26 @@ const threadFixtureDescriptor = `export const descriptor = {
`;
const fastBundleSource = `${threadFixtureDescriptor}
export const run = async function* (input) {
yield { role: "planner", content: "plan", meta: { plan: input.prompt } };
yield { role: "coder", content: "code", meta: { diff: "y" } };
${wfPutImport}
export const run = async function* (input, options) {
const cas = options.cas;
let h = await putContentMerkleNode(cas, "plan");
yield { role: "planner", contentHash: h, meta: { plan: input.prompt }, refs: [h] };
h = await putContentMerkleNode(cas, "code");
yield { role: "coder", contentHash: h, meta: { diff: "y" }, refs: [h] };
return { returnCode: 0, summary: "done" };
};
`;
const slowPlannerBundleSource = `${threadFixtureDescriptor}
export const run = async function* (input) {
${wfPutImport}
export const run = async function* (input, options) {
await new Promise((r) => setTimeout(r, 400));
yield { role: "planner", content: "plan", meta: { plan: input.prompt } };
yield { role: "coder", content: "code", meta: { diff: "y" } };
const cas = options.cas;
let h = await putContentMerkleNode(cas, "plan");
yield { role: "planner", contentHash: h, meta: { plan: input.prompt }, refs: [h] };
h = await putContentMerkleNode(cas, "code");
yield { role: "coder", contentHash: h, meta: { diff: "y" }, refs: [h] };
return { returnCode: 0, summary: "done" };
};
`;
@@ -50,27 +61,38 @@ export const run = async function* (input) {
const cliEntryPath = fileURLToPath(new URL("../src/cli.ts", import.meta.url));
const abortablePlannerBundleSource = `${threadFixtureDescriptor}
export const run = async function* (input) {
${wfPutImport}
export const run = async function* (input, options) {
await new Promise((r) => setTimeout(r, 600));
yield { role: "planner", content: "plan", meta: { plan: input.prompt } };
yield { role: "coder", content: "code", meta: { diff: "y" } };
const cas = options.cas;
let h = await putContentMerkleNode(cas, "plan");
yield { role: "planner", contentHash: h, meta: { plan: input.prompt }, refs: [h] };
h = await putContentMerkleNode(cas, "code");
yield { role: "coder", contentHash: h, meta: { diff: "y" }, refs: [h] };
return { returnCode: 0, summary: "done" };
};
`;
const pauseResumeBundleSource = `${threadFixtureDescriptor}
export const run = async function* (input) {
yield { role: "first", content: "f", meta: {} };
${wfPutImport}
export const run = async function* (_input, options) {
const cas = options.cas;
let h = await putContentMerkleNode(cas, "f");
yield { role: "first", contentHash: h, meta: {}, refs: [h] };
await new Promise((r) => setTimeout(r, 1500));
yield { role: "second", content: "s", meta: {} };
h = await putContentMerkleNode(cas, "s");
yield { role: "second", contentHash: h, meta: {}, refs: [h] };
return { returnCode: 0, summary: "done" };
};
`;
const delayedFirstYieldBundleSource = `${threadFixtureDescriptor}
export const run = async function* (input) {
${wfPutImport}
export const run = async function* (_input, options) {
await new Promise((r) => setTimeout(r, 900));
yield { role: "only", content: "x", meta: {} };
const cas = options.cas;
const h = await putContentMerkleNode(cas, "x");
yield { role: "only", contentHash: h, meta: {}, refs: [h] };
return { returnCode: 0, summary: "done" };
};
`;
+1 -1
View File
@@ -192,7 +192,7 @@ export async function cmdAdd(
return validated;
}
const extracted = await extractBundleExports(resolvedPath);
const extracted = await extractBundleExports(resolvedPath, { storageRoot });
if (!extracted.ok) {
return extracted;
}
+2 -1
View File
@@ -65,8 +65,9 @@ export async function cmdFork(
const newThreadId = generateUlid(Date.now());
const stepsOnWire = plan.value.historicalSteps.map((s) => ({
role: s.role,
content: s.content,
contentHash: s.contentHash,
meta: s.meta,
refs: s.refs,
timestamp: s.timestamp,
}));
+1 -1
View File
@@ -54,7 +54,7 @@ export function createCursorAgent(config: CursorAgentConfig): AgentFn {
"From the thread context, determine the absolute filesystem path where the project/repository is located.",
extractCtx,
);
const fullPrompt = buildAgentPrompt(ctx);
const fullPrompt = await buildAgentPrompt(ctx);
const args = [
"-p",
fullPrompt,
+1 -1
View File
@@ -35,7 +35,7 @@ export function createHermesAgent(config: HermesAgentConfig): AgentFn {
const timeoutMs = config.timeout;
return async (ctx) => {
const fullPrompt = buildAgentPrompt(ctx);
const fullPrompt = await buildAgentPrompt(ctx);
const args = [
"chat",
"-q",
@@ -1,8 +1,14 @@
import { describe, expect, test } from "bun:test";
import { START, type ThreadContext } from "@uncaged/workflow";
import { mkdtempSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { createCasStore, START, type ThreadContext } from "@uncaged/workflow";
import { createLlmAdapter } from "../src/create-llm-adapter.js";
const casDir = mkdtempSync(join(tmpdir(), "wf-llm-adapter-cas-"));
const testCas = createCasStore(casDir);
function makeCtx(userContent: string): ThreadContext {
return {
start: {
@@ -15,6 +21,7 @@ function makeCtx(userContent: string): ThreadContext {
steps: [],
threadId: "01TEST000000000000000000TR",
currentRole: { name: "planner", systemPrompt: "system instructions" },
cas: testCas,
};
}
@@ -1,5 +1,9 @@
import { afterEach, describe, expect, test } from "bun:test";
import { mkdtemp, rm } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import {
createCasStore,
createExtract,
END,
type ModeratorContext,
@@ -113,7 +117,7 @@ function makeCtx(
function preparerStep(): RoleStep<SolveIssueMeta> {
return {
role: "preparer",
content: "prepared",
contentHash: "STUBHASHPREPARER01",
meta: {
repoPath: "/home/user/repos/test",
defaultBranch: "main",
@@ -133,7 +137,7 @@ function preparerStep(): RoleStep<SolveIssueMeta> {
function plannerStep(phases: PlannerMeta["phases"] = DEFAULT_PHASES): RoleStep<SolveIssueMeta> {
return {
role: "planner",
content: "plan",
contentHash: "STUBHASHPLANNER001",
meta: { phases },
refs: phases.map((p) => p.hash),
timestamp: 1,
@@ -143,7 +147,7 @@ function plannerStep(phases: PlannerMeta["phases"] = DEFAULT_PHASES): RoleStep<S
function coderStep(completedPhase = "4KNMR2PX"): RoleStep<SolveIssueMeta> {
return {
role: "coder",
content: "code",
contentHash: "STUBHASHCODER00001",
meta: { completedPhase, filesChanged: ["a.ts"], summary: "fixed" },
refs: [completedPhase],
timestamp: 2,
@@ -153,7 +157,7 @@ function coderStep(completedPhase = "4KNMR2PX"): RoleStep<SolveIssueMeta> {
function reviewerStep(approved: boolean): RoleStep<SolveIssueMeta> {
return {
role: "reviewer",
content: "rev",
contentHash: "STUBHASHREVIEWER01",
meta: approved
? { status: "approved" as const }
: { status: "rejected" as const, issues: ["needs fix"] },
@@ -165,7 +169,7 @@ function reviewerStep(approved: boolean): RoleStep<SolveIssueMeta> {
function committerStep(): RoleStep<SolveIssueMeta> {
return {
role: "committer",
content: "commit",
contentHash: "STUBHASHCOMMITTER1",
meta: { status: "committed", branch: "feat/issue-1", commitSha: "abc1234" },
refs: [],
timestamp: 4,
@@ -281,10 +285,15 @@ describe("solveIssueModerator", () => {
describe("createSolveIssueRun", () => {
let restoreFetch: (() => void) | null = null;
let casDir: string | undefined;
afterEach(() => {
afterEach(async () => {
restoreFetch?.();
restoreFetch = null;
if (casDir !== undefined) {
await rm(casDir, { recursive: true, force: true }).catch(() => {});
casDir = undefined;
}
});
test("structured extraction yields preparer then planner meta from mocked chat completions", async () => {
@@ -301,10 +310,13 @@ describe("createSolveIssueRun", () => {
};
restoreFetch = installMockChatCompletions([EXPECT_PREPARER_META, EXPECT_PLANNER_META]);
casDir = await mkdtemp(join(tmpdir(), "solve-issue-cas-"));
const cas = createCasStore(casDir);
const run = createSolveIssueRun({ agent: async () => "" }, stubExtract);
const gen = run(
{ prompt: "task", steps: [] },
{ threadId: "01TEST000000000000000000TR", maxRounds: 20, depth: 0 },
{ threadId: "01TEST000000000000000000TR", maxRounds: 20, depth: 0, cas },
);
const first = await gen.next();
expect(first.done).toBe(false);
@@ -336,6 +348,9 @@ describe("createSolveIssueRun", () => {
EXPECT_CODER_META,
]);
casDir = await mkdtemp(join(tmpdir(), "solve-issue-cas-"));
const cas = createCasStore(casDir);
const calls: string[] = [];
const run = createSolveIssueRun(
{
@@ -362,7 +377,7 @@ describe("createSolveIssueRun", () => {
);
const gen = run(
{ prompt: "task", steps: [] },
{ threadId: "01TEST000000000000000000TR", maxRounds: 20, depth: 0 },
{ threadId: "01TEST000000000000000000TR", maxRounds: 20, depth: 0, cas },
);
await gen.next();
expect(calls).toEqual(["preparer"]);
@@ -1,5 +1,8 @@
import { describe, expect, test } from "bun:test";
import { START, type ThreadContext } from "@uncaged/workflow";
import { afterEach, beforeEach, describe, expect, test } from "bun:test";
import { mkdtemp, rm } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { createCasStore, putContentMerkleNode, START, type ThreadContext } from "@uncaged/workflow";
import { buildAgentPrompt } from "../src/index.js";
@@ -13,38 +16,53 @@ function startTask(content: string): ThreadContext["start"] {
}
describe("buildAgentPrompt", () => {
test("includes system prompt and full task; omits tools when there are no steps", () => {
let casRoot: string;
beforeEach(async () => {
casRoot = await mkdtemp(join(tmpdir(), "wf-build-prompt-cas-"));
});
afterEach(async () => {
await rm(casRoot, { recursive: true, force: true });
});
test("includes system prompt and full task; omits tools when there are no steps", async () => {
const cas = createCasStore(casRoot);
const ctx: ThreadContext = {
start: startTask("fix the bug"),
depth: 0,
steps: [],
threadId: "01TEST000000000000000000TR",
currentRole: { name: START, systemPrompt: "You are an agent." },
cas,
};
const text = buildAgentPrompt(ctx);
const text = await buildAgentPrompt(ctx);
expect(text).toContain("You are an agent.");
expect(text).toContain("## Task");
expect(text).toContain("fix the bug");
expect(text).not.toContain("## Tools");
});
test("single step shows full content and meta, and includes tools", () => {
test("single step shows full content and meta, and includes tools", async () => {
const cas = createCasStore(casRoot);
const onlyHash = await putContentMerkleNode(cas, "only step full body");
const ctx: ThreadContext = {
start: startTask("user task"),
depth: 0,
threadId: "01TEST000000000000000000TR",
currentRole: { name: "coder", systemPrompt: "Be helpful." },
cas,
steps: [
{
role: "coder",
content: "only step full body",
contentHash: onlyHash,
meta: { files: ["a.ts"] },
refs: [],
refs: [onlyHash],
timestamp: 2,
},
],
};
const text = buildAgentPrompt(ctx);
const text = await buildAgentPrompt(ctx);
expect(text).toContain("## Task");
expect(text).toContain("user task");
expect(text).toContain("## Step: coder");
@@ -54,30 +72,34 @@ describe("buildAgentPrompt", () => {
expect(text).toContain("uncaged-workflow thread 01TEST000000000000000000TR");
});
test("two or more steps: previous steps are meta-only; latest step is full", () => {
test("two or more steps: previous steps are meta-only; latest step is full", async () => {
const cas = createCasStore(casRoot);
const plannerHash = await putContentMerkleNode(cas, "PLANNER_SECRET_FULL_TEXT");
const coderHash = await putContentMerkleNode(cas, "last step full content");
const ctx: ThreadContext = {
start: startTask("first message full: task content here"),
depth: 0,
threadId: "01TEST000000000000000000TR",
currentRole: { name: "coder", systemPrompt: "System." },
cas,
steps: [
{
role: "planner",
content: "PLANNER_SECRET_FULL_TEXT",
contentHash: plannerHash,
meta: { plan: "short" },
refs: [],
refs: [plannerHash],
timestamp: 2,
},
{
role: "coder",
content: "last step full content",
contentHash: coderHash,
meta: { done: true },
refs: [],
refs: [coderHash],
timestamp: 3,
},
],
};
const text = buildAgentPrompt(ctx);
const text = await buildAgentPrompt(ctx);
expect(text).toContain("first message full: task content here");
expect(text).toContain("## Previous Steps");
expect(text).toContain("### Step 1: planner");
@@ -90,37 +112,42 @@ describe("buildAgentPrompt", () => {
expect(text).toContain("uncaged-workflow thread 01TEST000000000000000000TR");
});
test("middle steps show meta summary only, not full content", () => {
test("middle steps show meta summary only, not full content", async () => {
const cas = createCasStore(casRoot);
const ha = await putContentMerkleNode(cas, "HIDDEN_A");
const hb = await putContentMerkleNode(cas, "HIDDEN_B_MIDDLE");
const hc = await putContentMerkleNode(cas, "VISIBLE_LAST");
const ctx: ThreadContext = {
start: startTask("start"),
depth: 0,
threadId: "01TEST000000000000000000TR",
currentRole: { name: "c", systemPrompt: "S" },
cas,
steps: [
{
role: "a",
content: "HIDDEN_A",
contentHash: ha,
meta: { n: 1 },
refs: [],
refs: [ha],
timestamp: 2,
},
{
role: "b",
content: "HIDDEN_B_MIDDLE",
contentHash: hb,
meta: { n: 2 },
refs: [],
refs: [hb],
timestamp: 3,
},
{
role: "c",
content: "VISIBLE_LAST",
contentHash: hc,
meta: { n: 3 },
refs: [],
refs: [hc],
timestamp: 4,
},
],
};
const text = buildAgentPrompt(ctx);
const text = await buildAgentPrompt(ctx);
expect(text).not.toContain("HIDDEN_A");
expect(text).not.toContain("HIDDEN_B_MIDDLE");
expect(text).toContain('Summary: {"n":1}');
@@ -1,7 +1,16 @@
import type { AgentContext } from "@uncaged/workflow";
import { getContentMerklePayload } from "@uncaged/workflow";
async function resolveStepText(ctx: AgentContext, contentHash: string): Promise<string> {
const text = await getContentMerklePayload(ctx.cas, contentHash);
if (text === null) {
throw new Error(`buildAgentPrompt: missing CAS blob for ${contentHash}`);
}
return text;
}
/** Builds the full agent prompt: system instructions plus summarized thread history. */
export function buildAgentPrompt(ctx: AgentContext): string {
export async function buildAgentPrompt(ctx: AgentContext): Promise<string> {
const lines: string[] = [];
lines.push(ctx.currentRole.systemPrompt);
lines.push("");
@@ -15,10 +24,11 @@ export function buildAgentPrompt(ctx: AgentContext): string {
if (steps.length === 1) {
const s = steps[0];
const body = await resolveStepText(ctx, s.contentHash);
lines.push("");
lines.push(`## Step: ${s.role}`);
lines.push("");
lines.push(s.content);
lines.push(body);
lines.push("");
lines.push(`Meta: ${JSON.stringify(s.meta)}`);
} else {
@@ -31,10 +41,11 @@ export function buildAgentPrompt(ctx: AgentContext): string {
lines.push(`Summary: ${JSON.stringify(s.meta)}`);
}
const last = steps[steps.length - 1];
const lastBody = await resolveStepText(ctx, last.contentHash);
lines.push("");
lines.push(`## Latest Step: ${last.role}`);
lines.push("");
lines.push(last.content);
lines.push(lastBody);
lines.push("");
lines.push(`Meta: ${JSON.stringify(last.meta)}`);
}
@@ -26,6 +26,19 @@ export const run = async function* (input) {
expect(r.ok).toBe(true);
});
test("allows static import of @uncaged/workflow", () => {
const source = `${minimalDescriptor}import { putContentMerkleNode } from "@uncaged/workflow";
export const run = async function* (_input, options) {
const cas = options.cas;
const h = await putContentMerkleNode(cas, "x");
return { returnCode: 0, summary: h };
};
`;
const r = validateWorkflowBundle({ filePath: "/tmp/w.esm.js", source });
expect(r.ok).toBe(true);
});
test("rejects wrong filename suffix", () => {
const r = validateWorkflowBundle({
filePath: "/tmp/w.js",
+24 -12
View File
@@ -4,10 +4,16 @@ import { tmpdir } from "node:os";
import { join } from "node:path";
import * as z from "zod/v4";
import { createCasStore } from "../src/cas.js";
import { createWorkflow } from "../src/create-workflow.js";
import { executeThread } from "../src/engine.js";
import { createExtract } from "../src/extract-fn.js";
import { createLogger } from "../src/logger.js";
import {
createContentMerkleNode,
getContentMerklePayload,
serializeMerkleNode,
} from "../src/merkle.js";
import { END } from "../src/types.js";
const plannerMetaSchema = z.object({
@@ -140,6 +146,7 @@ describe("executeThread", () => {
const dataPath = join(root, "logs", hash, `${threadId}.data.jsonl`);
const infoPath = join(root, "logs", hash, `${threadId}.info.jsonl`);
await mkdir(join(root, "logs", hash), { recursive: true });
const cas = createCasStore(join(root, "cas"));
const logger = createLogger({ sink: { kind: "file", path: infoPath } });
const ac = new AbortController();
@@ -156,7 +163,7 @@ describe("executeThread", () => {
forkSourceThreadId: null,
prefilledDiskSteps: null,
},
{ threadId, hash, dataJsonlPath: dataPath, infoJsonlPath: infoPath },
{ threadId, hash, dataJsonlPath: dataPath, infoJsonlPath: infoPath, cas },
logger,
);
@@ -184,14 +191,15 @@ describe("executeThread", () => {
const role1 = JSON.parse(lines[1] ?? "{}") as Record<string, unknown>;
expect(role1.role).toBe("planner");
expect(role1.content).toBe("plan-body");
expect(typeof role1.contentHash).toBe("string");
expect(await getContentMerklePayload(cas, String(role1.contentHash))).toBe("plan-body");
expect(role1.meta).toEqual({ plan: "do-it", files: ["a.ts"] });
expect(role1.refs).toEqual([]);
expect(role1.refs).toEqual([role1.contentHash]);
expect(typeof role1.timestamp).toBe("number");
const role2 = JSON.parse(lines[2] ?? "{}") as Record<string, unknown>;
expect(role2.role).toBe("coder");
expect(role2.refs).toEqual([]);
expect(role2.refs).toEqual([role2.contentHash]);
const infoText = await readFile(infoPath, "utf8");
const infoLines = infoText
@@ -219,11 +227,14 @@ describe("executeThread", () => {
const dataPath = join(root, "logs", hash, `${threadId}.data.jsonl`);
const infoPath = join(root, "logs", hash, `${threadId}.info.jsonl`);
await mkdir(join(root, "logs", hash), { recursive: true });
const cas = createCasStore(join(root, "cas"));
const plannerHash = await cas.put(serializeMerkleNode(createContentMerkleNode("plan-body")));
const logger = createLogger({ sink: { kind: "file", path: infoPath } });
const ac = new AbortController();
const histTs = 9_000_000;
const mergedPlannerRefs = ["CAS111AAAAAAA", plannerHash];
const result = await executeThread(
demoWorkflow,
"demo-flow",
@@ -232,9 +243,9 @@ describe("executeThread", () => {
steps: [
{
role: "planner",
content: "plan-body",
contentHash: plannerHash,
meta: { plan: "do-it", files: ["a.ts"] },
refs: ["CAS111AAAAAAA"],
refs: mergedPlannerRefs,
},
],
},
@@ -247,14 +258,14 @@ describe("executeThread", () => {
prefilledDiskSteps: [
{
role: "planner",
content: "plan-body",
contentHash: plannerHash,
meta: { plan: "do-it", files: ["a.ts"] },
refs: ["CAS111AAAAAAA"],
refs: mergedPlannerRefs,
timestamp: histTs,
},
],
},
{ threadId, hash, dataJsonlPath: dataPath, infoJsonlPath: infoPath },
{ threadId, hash, dataJsonlPath: dataPath, infoJsonlPath: infoPath, cas },
logger,
);
@@ -273,11 +284,11 @@ describe("executeThread", () => {
const role0 = JSON.parse(lines[1] ?? "{}") as Record<string, unknown>;
expect(role0.role).toBe("planner");
expect(role0.timestamp).toBe(histTs);
expect(role0.refs).toEqual(["CAS111AAAAAAA"]);
expect(role0.refs).toEqual(mergedPlannerRefs);
const role1 = JSON.parse(lines[2] ?? "{}") as Record<string, unknown>;
expect(role1.role).toBe("coder");
expect(role1.content).toBe("code-body");
expect(await getContentMerklePayload(cas, String(role1.contentHash))).toBe("code-body");
} finally {
await rm(root, { recursive: true, force: true });
}
@@ -291,6 +302,7 @@ describe("executeThread", () => {
const dataPath = join(root, "logs", hash, `${threadId}.data.jsonl`);
const infoPath = join(root, "logs", hash, `${threadId}.info.jsonl`);
await mkdir(join(root, "logs", hash), { recursive: true });
const cas = createCasStore(join(root, "cas"));
const logger = createLogger({ sink: { kind: "file", path: infoPath } });
const ac = new AbortController();
@@ -307,7 +319,7 @@ describe("executeThread", () => {
forkSourceThreadId: null,
prefilledDiskSteps: null,
},
{ threadId, hash, dataJsonlPath: dataPath, infoJsonlPath: infoPath },
{ threadId, hash, dataJsonlPath: dataPath, infoJsonlPath: infoPath, cas },
logger,
);
@@ -7,9 +7,9 @@ import {
} from "../src/fork-thread.js";
const sampleDataJsonl = `{"name":"demo","hash":"C9NMV6V2TQT81","threadId":"01AAA1111111111111111111","parameters":{"prompt":"hi","options":{"maxRounds":5}},"timestamp":100}
{"role":"planner","content":"p","meta":{},"timestamp":101}
{"role":"coder","content":"c","meta":{},"timestamp":102}
{"role":"reviewer","content":"r","meta":{},"timestamp":103}
{"role":"planner","contentHash":"HP0000000000000000000001","meta":{},"refs":[],"timestamp":101}
{"role":"coder","contentHash":"HP0000000000000000000002","meta":{},"refs":[],"timestamp":102}
{"role":"reviewer","contentHash":"HP0000000000000000000003","meta":{},"refs":[],"timestamp":103}
`;
describe("fork-thread", () => {
@@ -89,7 +89,7 @@ describe("fork-thread", () => {
test("parseThreadDataJsonl reads explicit depth from start record", () => {
const text = `{"name":"demo","hash":"H","threadId":"01ZZZZZZZZZZZZZZZZZZZZZZ","parameters":{"prompt":"p","options":{"maxRounds":3,"depth":2}},"timestamp":1}
{"role":"planner","content":"x","meta":{},"timestamp":2}
{"role":"planner","contentHash":"HP0000000000000000000099","meta":{},"refs":[],"timestamp":2}
`;
const r = parseThreadDataJsonl(text);
expect(r.ok).toBe(true);
@@ -0,0 +1,29 @@
import { describe, expect, test } from "bun:test";
import { createContentMerkleNode, parseMerkleNode, serializeMerkleNode } from "../src/merkle.js";
describe("merkle", () => {
test("content node roundtrips through YAML", () => {
const node = createContentMerkleNode("hello\nworld");
const yaml = serializeMerkleNode(node);
const back = parseMerkleNode(yaml);
expect(back).toEqual(node);
});
test("step node with object payload roundtrips", () => {
const node = {
type: "step" as const,
payload: { role: "planner", foo: 1 },
children: ["ABC123", "DEF456"],
};
const yaml = serializeMerkleNode(node);
const back = parseMerkleNode(yaml);
expect(back.type).toBe("step");
expect(back.payload).toEqual({ role: "planner", foo: 1 });
expect(back.children).toEqual(["ABC123", "DEF456"]);
});
test("parse rejects invalid YAML root", () => {
expect(() => parseMerkleNode("[]")).toThrow();
});
});
@@ -4,6 +4,7 @@ import { tmpdir } from "node:os";
import { join } from "node:path";
import * as z from "zod/v4";
import { createCasStore } from "../src/cas.js";
import { createWorkflow } from "../src/create-workflow.js";
import { executeThread } from "../src/engine.js";
import { createExtract } from "../src/extract-fn.js";
@@ -110,8 +111,8 @@ describe("RoleStep refs tracking", () => {
test("parseThreadDataJsonl reads refs and defaults missing refs to []", () => {
const text = `{"name":"demo","hash":"C9NMV6V2TQT81","threadId":"01AAA1111111111111111111","parameters":{"prompt":"hi","options":{"maxRounds":5}},"timestamp":100}
{"role":"planner","content":"p","meta":{},"refs":["H111AAAAAAAAA","H222AAAAAAAAA"],"timestamp":101}
{"role":"coder","content":"c","meta":{},"timestamp":102}
{"role":"planner","contentHash":"HPAYLOAD111111","meta":{},"refs":["H111AAAAAAAAA","H222AAAAAAAAA"],"timestamp":101}
{"role":"coder","contentHash":"HPAYLOAD222222","meta":{},"timestamp":102}
`;
const r = parseThreadDataJsonl(text);
expect(r.ok).toBe(true);
@@ -139,6 +140,7 @@ describe("RoleStep refs tracking", () => {
const dataPath = join(root, "logs", hash, `${threadId}.data.jsonl`);
const infoPath = join(root, "logs", hash, `${threadId}.info.jsonl`);
await mkdir(join(root, "logs", hash), { recursive: true });
const cas = createCasStore(join(root, "cas"));
const logger = createLogger({ sink: { kind: "file", path: infoPath } });
const ac = new AbortController();
@@ -155,7 +157,7 @@ describe("RoleStep refs tracking", () => {
forkSourceThreadId: null,
prefilledDiskSteps: null,
},
{ threadId, hash, dataJsonlPath: dataPath, infoJsonlPath: infoPath },
{ threadId, hash, dataJsonlPath: dataPath, infoJsonlPath: infoPath, cas },
logger,
);
@@ -170,7 +172,12 @@ describe("RoleStep refs tracking", () => {
const role1 = JSON.parse(lines[1] ?? "{}") as Record<string, unknown>;
expect(role1.role).toBe("planner");
expect(role1.refs).toEqual(["C9NMV6V2TQT81", "C9NMV6V2TQT82"]);
const refs = role1.refs as string[];
expect(refs).toContain("C9NMV6V2TQT81");
expect(refs).toContain("C9NMV6V2TQT82");
expect(typeof role1.contentHash).toBe("string");
expect(refs).toContain(String(role1.contentHash));
expect(refs.length).toBe(3);
} finally {
await rm(root, { recursive: true, force: true });
}
@@ -178,8 +185,8 @@ describe("RoleStep refs tracking", () => {
test("buildForkPlan carries refs on historical steps", () => {
const text = `{"name":"demo","hash":"C9NMV6V2TQT81","threadId":"01AAA1111111111111111111","parameters":{"prompt":"hi","options":{"maxRounds":5}},"timestamp":100}
{"role":"planner","content":"p","meta":{},"refs":["KEEPREFAAAAAA"],"timestamp":101}
{"role":"coder","content":"c","meta":{},"refs":["CODERHASHAAAA"],"timestamp":102}
{"role":"planner","contentHash":"HP111111111111","meta":{},"refs":["KEEPREFAAAAAA"],"timestamp":101}
{"role":"coder","contentHash":"HP222222222222","meta":{},"refs":["CODERHASHAAAA"],"timestamp":102}
`;
const plan = buildForkPlan(text, null);
expect(plan.ok).toBe(true);
@@ -18,7 +18,7 @@ describe("RFC-001 thread JSONL shapes", () => {
const roleRecord = {
role: "planner",
content: "Plan: modify auth middleware...",
contentHash: "CPHASH000000000000000001",
meta: { plan: "...", files: ["src/auth.ts"] },
refs: [] as string[],
timestamp: 1714963201000,
@@ -28,7 +28,7 @@ describe("RFC-001 thread JSONL shapes", () => {
["hash", "name", "parameters", "threadId", "timestamp"].sort(),
);
expect(Object.keys(roleRecord).sort()).toEqual(
["content", "meta", "refs", "role", "timestamp"].sort(),
["contentHash", "meta", "refs", "role", "timestamp"].sort(),
);
});
+20 -7
View File
@@ -5,22 +5,29 @@ import { createConnection } from "node:net";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { createCasStore } from "../src/cas.js";
import { createContentMerkleNode, serializeMerkleNode } from "../src/merkle.js";
import { getWorkerHostScriptPath } from "../src/worker-entry-path.js";
const bundleSource = `export const descriptor = {
const bundleSource = `import { putContentMerkleNode } from "@uncaged/workflow";
export const descriptor = {
description: "worker-test",
roles: {
planner: { description: "planner", schema: {} },
coder: { description: "coder", schema: {} },
},
};
export const run = async function* (input) {
export const run = async function* (input, options) {
const cas = options.cas;
const has = (r) => input.steps.some((s) => s.role === r);
if (!has("planner")) {
yield { role: "planner", content: "p", meta: { plan: input.prompt } };
const h = await putContentMerkleNode(cas, "p");
yield { role: "planner", contentHash: h, meta: { plan: input.prompt }, refs: [h] };
}
if (!has("coder")) {
yield { role: "coder", content: "c", meta: { diff: "y" } };
const h = await putContentMerkleNode(cas, "c");
yield { role: "coder", contentHash: h, meta: { diff: "y" }, refs: [h] };
}
return { returnCode: 0, summary: "completed: moderator returned END" };
};
@@ -102,7 +109,7 @@ describe("worker process", () => {
threadId,
workflowName: "demo-flow",
prompt: "hello",
options: { maxRounds: 5 },
options: { maxRounds: 5, depth: 0 },
});
const exitCode: number = await new Promise((resolve) => {
@@ -143,6 +150,11 @@ describe("worker process", () => {
const port = await readReadyPort(child);
const cas = createCasStore(join(root, "cas"));
const plannerReplayHash = await cas.put(
serializeMerkleNode(createContentMerkleNode("p-old")),
);
const threadId = "01KQXKW18CT8G75T53R8F4G7YG";
const srcId = "01SRCMMMMMMMMMMMMMMMMMMMM";
await sendJson(port, {
@@ -150,12 +162,13 @@ describe("worker process", () => {
threadId,
workflowName: "demo-flow",
prompt: "hello",
options: { maxRounds: 5 },
options: { maxRounds: 5, depth: 0 },
steps: [
{
role: "planner",
content: "p-old",
contentHash: plannerReplayHash,
meta: { plan: "z" },
refs: [plannerReplayHash],
timestamp: 555,
},
],
@@ -4,11 +4,13 @@ import { tmpdir } from "node:os";
import { join } from "node:path";
import * as z from "zod/v4";
import { createCasStore } from "../src/cas.js";
import { createWorkflow } from "../src/create-workflow.js";
import { executeThread } from "../src/engine.js";
import { createExtract } from "../src/extract-fn.js";
import { hashWorkflowBundleBytes } from "../src/hash.js";
import { createLogger } from "../src/logger.js";
import { getContentMerklePayload } from "../src/merkle.js";
import {
readWorkflowRegistry,
registerWorkflowVersion,
@@ -80,7 +82,9 @@ const parentExtract = createExtract({
model: "test",
});
const childBundleSource = `export const descriptor = {
const childBundleSource = `import { putContentMerkleNode } from "@uncaged/workflow";
export const descriptor = {
description: "child-integration",
roles: {
agent: {
@@ -89,8 +93,10 @@ const childBundleSource = `export const descriptor = {
},
},
};
export async function* run(input) {
yield { role: "agent", content: "child-body", meta: {}, refs: [] };
export async function* run(input, options) {
const cas = options.cas;
const h = await putContentMerkleNode(cas, "child-body");
yield { role: "agent", contentHash: h, meta: {}, refs: [h] };
return { returnCode: 0, summary: "child-done:" + input.prompt };
}
`;
@@ -149,6 +155,7 @@ describe("workflowAsAgent integration", () => {
const dataPath = join(root, "logs", parentHash, `${threadId}.data.jsonl`);
const infoPath = join(root, "logs", parentHash, `${threadId}.info.jsonl`);
await mkdir(join(root, "logs", parentHash), { recursive: true });
const cas = createCasStore(join(root, "cas"));
const logger = createLogger({ sink: { kind: "file", path: infoPath } });
const ac = new AbortController();
@@ -165,7 +172,7 @@ describe("workflowAsAgent integration", () => {
forkSourceThreadId: null,
prefilledDiskSteps: null,
},
{ threadId, hash: parentHash, dataJsonlPath: dataPath, infoJsonlPath: infoPath },
{ threadId, hash: parentHash, dataJsonlPath: dataPath, infoJsonlPath: infoPath, cas },
logger,
);
@@ -179,7 +186,9 @@ describe("workflowAsAgent integration", () => {
expect(parentLines.length).toBe(2);
const callerLine = JSON.parse(parentLines[1] ?? "{}") as Record<string, unknown>;
expect(callerLine.role).toBe("caller");
expect(callerLine.content).toBe("child-done:from-parent");
expect(await getContentMerklePayload(cas, String(callerLine.contentHash))).toBe(
"child-done:from-parent",
);
const childDir = join(root, "logs", childHash);
const childFiles = await readdir(childDir);
@@ -3,6 +3,7 @@ import { mkdir, mkdtemp, rm, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { createCasStore } from "../src/cas.js";
import { hashWorkflowBundleBytes } from "../src/hash.js";
import {
readWorkflowRegistry,
@@ -12,7 +13,12 @@ import {
import { type AgentContext, START } from "../src/types.js";
import { workflowAsAgent } from "../src/workflow-as-agent.js";
function makeAgentCtx(params: { depth: number; prompt: string; maxRounds: number }): AgentContext {
function makeAgentCtx(params: {
storageRoot: string;
depth: number;
prompt: string;
maxRounds: number;
}): AgentContext {
const ts = Date.now();
return {
threadId: "01PARENT000000000000000001AA",
@@ -28,10 +34,13 @@ function makeAgentCtx(params: { depth: number; prompt: string; maxRounds: number
name: "caller",
systemPrompt: "caller",
},
cas: createCasStore(join(params.storageRoot, "agent-ctx-cas")),
};
}
const childBundleSource = `export const descriptor = {
const childBundleSource = `import { putContentMerkleNode } from "@uncaged/workflow";
export const descriptor = {
description: "child-test",
roles: {
agent: {
@@ -40,8 +49,10 @@ const childBundleSource = `export const descriptor = {
},
},
};
export async function* run(input) {
yield { role: "agent", content: "child-body", meta: {}, refs: [] };
export async function* run(input, options) {
const cas = options.cas;
const h = await putContentMerkleNode(cas, "child-body");
yield { role: "agent", contentHash: h, meta: {}, refs: [h] };
return { returnCode: 0, summary: "child-done:" + input.prompt };
}
`;
@@ -68,7 +79,9 @@ describe("workflowAsAgent", () => {
const root = await mkdtemp(join(tmpdir(), "wf-waa-missing-"));
try {
const agent = workflowAsAgent("missing-wf", { storageRoot: root });
const out = await agent(makeAgentCtx({ depth: 0, prompt: "x", maxRounds: 5 }));
const out = await agent(
makeAgentCtx({ storageRoot: root, depth: 0, prompt: "x", maxRounds: 5 }),
);
expect(out).toContain("not found in registry");
expect(out).toContain("missing-wf");
} finally {
@@ -81,7 +94,9 @@ describe("workflowAsAgent", () => {
try {
await installChildWorkflow(root);
const agent = workflowAsAgent("child-wf", { storageRoot: root });
const out = await agent(makeAgentCtx({ depth: 0, prompt: "hello-parent", maxRounds: 5 }));
const out = await agent(
makeAgentCtx({ storageRoot: root, depth: 0, prompt: "hello-parent", maxRounds: 5 }),
);
expect(out).toBe("child-done:hello-parent");
} finally {
await rm(root, { recursive: true, force: true });
@@ -92,7 +107,9 @@ describe("workflowAsAgent", () => {
const root = await mkdtemp(join(tmpdir(), "wf-waa-depth-"));
try {
const agent = workflowAsAgent("child-wf", { storageRoot: root });
const out = await agent(makeAgentCtx({ depth: 3, prompt: "x", maxRounds: 5 }));
const out = await agent(
makeAgentCtx({ storageRoot: root, depth: 3, prompt: "x", maxRounds: 5 }),
);
expect(out).toContain("depth limit");
} finally {
await rm(root, { recursive: true, force: true });
@@ -0,0 +1,8 @@
import { pathToFileURL } from "node:url";
/**
* Dynamic-import a workflow bundle path (see {@link extractBundleExports} — symlink must exist first).
*/
export async function importWorkflowBundleModule(bundlePath: string): Promise<unknown> {
return import(pathToFileURL(bundlePath).href);
}
+7 -4
View File
@@ -41,9 +41,12 @@ function isAllowedImportSpecifier(spec: string): boolean {
if (spec.length === 0) {
return false;
}
if (spec.startsWith(".") || spec.startsWith("/")) {
if (spec.startsWith(".") || spec.startsWith("/") || spec.startsWith("file:")) {
return false;
}
if (spec === "@uncaged/workflow") {
return true;
}
return isBuiltin(spec);
}
@@ -297,7 +300,7 @@ function validateImportDeclaration(node: ImportDeclaration): string | null {
return "only static string import specifiers are allowed";
}
if (!isAllowedImportSpecifier(spec)) {
return `disallowed import specifier "${spec}" (only Node built-ins are allowed)`;
return `disallowed import specifier "${spec}" (only Node built-ins and "@uncaged/workflow" are allowed)`;
}
return null;
}
@@ -312,7 +315,7 @@ function validateExportSource(
return staticMessage;
}
if (!isAllowedImportSpecifier(spec)) {
return `${disallowedPrefix} "${spec}" (only Node built-ins are allowed)`;
return `${disallowedPrefix} "${spec}" (only Node built-ins and "@uncaged/workflow" are allowed)`;
}
return null;
}
@@ -365,7 +368,7 @@ function bundleConstraintViolationForNode(node: Node): string | null {
/**
* Validate RFC-001 bundle rules: single-file ESM shape, named exports `run` + `descriptor`,
* no default export, no dynamic `import()`, static imports restricted to Node builtins.
* no default export, no dynamic `import()`, static imports restricted to Node builtins plus `@uncaged/workflow`.
*/
export function validateWorkflowBundle(input: WorkflowBundleValidationInput): Result<void, string> {
if (!endsWithEsmJs(input.filePath)) {
+16 -6
View File
@@ -1,4 +1,6 @@
import type { ExtractFn } from "./extract-fn.js";
import { putContentMerkleNode } from "./merkle.js";
import { mergeRefsWithContentHash } from "./refs-field.js";
import {
type AgentBinding,
type AgentContext,
@@ -58,7 +60,7 @@ export function createWorkflow<M extends RoleMeta>(
const baseTs = Date.now();
let steps: RoleStep<M>[] = input.steps.map((out, i) => ({
role: out.role,
content: out.content,
contentHash: out.contentHash,
meta: out.meta,
refs: out.refs,
timestamp: baseTs + i,
@@ -93,6 +95,7 @@ export function createWorkflow<M extends RoleMeta>(
const agentCtx: AgentContext<M> = {
...modCtx,
currentRole: { name: next, systemPrompt: roleDef.systemPrompt },
cas: options.cas,
};
const agent = binding.overrides?.[next] ?? binding.agent;
@@ -110,21 +113,28 @@ export function createWorkflow<M extends RoleMeta>(
extractCtx as unknown as ExtractContext,
);
const refs = resolveExtractedRefs(
roleDef as unknown as RoleDefinition<Record<string, unknown>>,
meta,
const contentHash = await putContentMerkleNode(options.cas, raw);
const refs = mergeRefsWithContentHash(
resolveExtractedRefs(roleDef as unknown as RoleDefinition<Record<string, unknown>>, meta),
contentHash,
);
const ts = Date.now();
const step = {
role: next,
content: raw,
contentHash,
meta,
refs,
timestamp: ts,
} as RoleStep<M>;
yield { role: step.role, content: step.content, meta: step.meta, refs: step.refs };
yield {
role: step.role,
contentHash: step.contentHash,
meta: step.meta,
refs: step.refs,
};
steps = [...steps, step];
}
+22 -4
View File
@@ -1,7 +1,9 @@
import { appendFile, mkdir } from "node:fs/promises";
import { dirname } from "node:path";
import type { CasStore } from "./cas.js";
import type { LogFn } from "./logger.js";
import { getContentMerklePayload } from "./merkle.js";
import { normalizeRefsField } from "./refs-field.js";
import type { ThreadInput, WorkflowFn, WorkflowFnOptions, WorkflowResult } from "./types.js";
@@ -10,12 +12,13 @@ export type ExecuteThreadIo = {
hash: string;
dataJsonlPath: string;
infoJsonlPath: string;
cas: CasStore;
};
/** One persisted role line in `.data.jsonl` (engine adds these for fork replay before running the generator). */
export type PrefilledDiskStep = {
role: string;
content: string;
contentHash: string;
meta: Record<string, unknown>;
refs: string[];
timestamp: number;
@@ -50,8 +53,9 @@ async function driveWorkflowGenerator(params: {
dataJsonlPath: string;
threadId: string;
logger: LogFn;
cas: CasStore;
}): Promise<WorkflowResult> {
const { fn, input, bundleOptions, executeOptions, dataJsonlPath, threadId, logger } = params;
const { fn, input, bundleOptions, executeOptions, dataJsonlPath, threadId, logger, cas } = params;
const gen = fn(input, bundleOptions);
let written = 0;
@@ -78,10 +82,16 @@ async function driveWorkflowGenerator(params: {
written++;
const step = iterResult.value;
const resolved = await getContentMerklePayload(cas, step.contentHash);
if (resolved === null) {
throw new Error(
`role step ${step.role}: CAS blob missing for contentHash ${step.contentHash}`,
);
}
const ts = Date.now();
await appendDataLine(dataJsonlPath, {
role: step.role,
content: step.content,
contentHash: step.contentHash,
meta: step.meta,
refs: normalizeRefsField(step.refs),
timestamp: ts,
@@ -153,9 +163,15 @@ export async function executeThread(
if (prefilled !== null) {
for (const row of prefilled) {
const prefilledPayload = await getContentMerklePayload(io.cas, row.contentHash);
if (prefilledPayload === null) {
throw new Error(
`prefilled step ${row.role}: CAS blob missing for contentHash ${row.contentHash}`,
);
}
await appendDataLine(io.dataJsonlPath, {
role: row.role,
content: row.content,
contentHash: row.contentHash,
meta: row.meta,
refs: normalizeRefsField(row.refs),
timestamp: row.timestamp,
@@ -175,6 +191,7 @@ export async function executeThread(
threadId: io.threadId,
maxRounds: options.maxRounds,
depth: options.depth,
cas: io.cas,
};
return await driveWorkflowGenerator({
@@ -185,5 +202,6 @@ export async function executeThread(
dataJsonlPath: io.dataJsonlPath,
threadId: io.threadId,
logger,
cas: io.cas,
});
}
@@ -0,0 +1,36 @@
import { mkdir, readlink, symlink, unlink } from "node:fs/promises";
import path from "node:path";
import { fileURLToPath } from "node:url";
/** This module lives in `@uncaged/workflow/src`; parent dir is the package root. */
function installedWorkflowPackageDir(): string {
return fileURLToPath(new URL("..", import.meta.url));
}
/**
* Ensures `<storageRoot>/node_modules/@uncaged/workflow` points at the installed `@uncaged/workflow`
* package so workflow bundles loaded from `<storageRoot>/bundles/*.esm.js` can resolve `import "@uncaged/workflow"`.
*/
export async function ensureUncagedWorkflowSymlink(storageRoot: string): Promise<void> {
const target = installedWorkflowPackageDir();
const linkDir = path.join(storageRoot, "node_modules", "@uncaged");
const linkPath = path.join(linkDir, "workflow");
await mkdir(linkDir, { recursive: true });
try {
const existing = await readlink(linkPath);
const normalizedExisting = path.resolve(linkDir, existing);
if (normalizedExisting === target) {
return;
}
await unlink(linkPath);
} catch (e) {
const errObj = e as NodeJS.ErrnoException;
if (errObj.code !== "ENOENT" && errObj.code !== "EINVAL") {
throw e;
}
}
const linkType = process.platform === "win32" ? "junction" : "dir";
await symlink(target, linkPath, linkType);
}
@@ -1,5 +1,5 @@
import { pathToFileURL } from "node:url";
import { importWorkflowBundleModule } from "./bundle-import-env.js";
import { ensureUncagedWorkflowSymlink } from "./ensure-uncaged-workflow-symlink.js";
import { err, ok, type Result } from "./result.js";
import type { WorkflowFn } from "./types.js";
import type { WorkflowDescriptor } from "./workflow-descriptor.js";
@@ -10,14 +10,23 @@ export type ExtractedBundleExports = {
descriptor: WorkflowDescriptor;
};
export type ExtractBundleExportsOptions = {
/** When set, ensures `node_modules/@uncaged/workflow` exists under this root before import. */
storageRoot: string | null;
};
/** Load a workflow `.esm.js` bundle and read its named exports (`run`, `descriptor`). */
export async function extractBundleExports(
bundlePath: string,
options: ExtractBundleExportsOptions = { storageRoot: null },
): Promise<Result<ExtractedBundleExports, string>> {
let modUnknown: unknown;
try {
if (options.storageRoot !== null) {
await ensureUncagedWorkflowSymlink(options.storageRoot);
}
// Dynamic import required: user bundle path resolved at runtime
modUnknown = await import(pathToFileURL(bundlePath).href);
modUnknown = await importWorkflowBundleModule(bundlePath);
} catch (e) {
const message = e instanceof Error ? e.message : String(e);
return err(`failed to import bundle: ${message}`);
+6 -1
View File
@@ -1,6 +1,7 @@
import type * as z from "zod/v4";
import { llmExtractWithRetry } from "./llm-extract.js";
import { getContentMerklePayload } from "./merkle.js";
import type { ExtractContext, LlmProvider } from "./types.js";
export type ExtractFn = <T extends Record<string, unknown>>(
@@ -29,8 +30,12 @@ export function createExtract(provider: LlmProvider): ExtractFn {
if (ctx.steps.length > 0) {
lines.push("## Thread History");
for (const step of ctx.steps) {
const body = await getContentMerklePayload(ctx.cas, step.contentHash);
if (body === null) {
throw new Error(`extract: missing CAS blob for step ${step.role}: ${step.contentHash}`);
}
lines.push(`### ${step.role}`);
lines.push(step.content);
lines.push(body);
lines.push(`Meta: ${JSON.stringify(step.meta)}`);
lines.push("");
}
+4 -4
View File
@@ -19,14 +19,14 @@ function parseRoleLine(
lineIndex: number,
): Result<ForkHistoricalStep, string> {
const role = obj.role;
const content = obj.content;
const contentHash = obj.contentHash;
const meta = obj.meta;
const timestamp = obj.timestamp;
if (typeof role !== "string") {
return err(`invalid role record at line ${lineIndex}: missing role`);
}
if (typeof content !== "string") {
return err(`invalid role record at line ${lineIndex}: missing content`);
if (typeof contentHash !== "string") {
return err(`invalid role record at line ${lineIndex}: missing contentHash`);
}
if (meta === null || typeof meta !== "object") {
return err(`invalid role record at line ${lineIndex}: missing meta`);
@@ -36,7 +36,7 @@ function parseRoleLine(
}
return ok({
role,
content,
contentHash,
meta: meta as Record<string, unknown>,
refs: normalizeRefsField(obj.refs),
timestamp,
+9
View File
@@ -40,6 +40,15 @@ export {
type LogFn,
type LoggerSink,
} from "./logger.js";
export {
createContentMerkleNode,
getContentMerklePayload,
type MerkleNode,
type MerkleNodeType,
parseMerkleNode,
putContentMerkleNode,
serializeMerkleNode,
} from "./merkle.js";
export {
getRegisteredWorkflow,
listRegisteredWorkflowNames,
+76
View File
@@ -0,0 +1,76 @@
import { parse, stringify } from "yaml";
import type { CasStore } from "./cas.js";
export type MerkleNodeType = "content" | "step" | "thread";
export type MerkleNode = {
type: MerkleNodeType;
payload: string | Record<string, unknown>;
children: string[];
};
export function serializeMerkleNode(node: MerkleNode): string {
return stringify(
{ type: node.type, payload: node.payload, children: node.children },
{ indent: 2 },
);
}
export function parseMerkleNode(yamlText: string): MerkleNode {
const raw = parse(yamlText) as unknown;
if (raw === null || typeof raw !== "object") {
throw new Error("merkle: YAML root must be an object");
}
const rec = raw as Record<string, unknown>;
const type = rec.type;
const payload = rec.payload;
const children = rec.children;
if (type !== "content" && type !== "step" && type !== "thread") {
throw new Error("merkle: invalid or missing type");
}
if (typeof payload !== "string" && (payload === null || typeof payload !== "object")) {
throw new Error("merkle: payload must be a string or object");
}
if (!Array.isArray(children)) {
throw new Error("merkle: children must be an array");
}
const childHashes: string[] = [];
for (const c of children) {
if (typeof c !== "string") {
throw new Error("merkle: child hash must be a string");
}
childHashes.push(c);
}
return {
type,
payload: typeof payload === "string" ? payload : (payload as Record<string, unknown>),
children: childHashes,
};
}
export function createContentMerkleNode(payload: string): MerkleNode {
return { type: "content", payload, children: [] };
}
/** Serializes a content Merkle node and stores it in CAS; returns its hash. */
export async function putContentMerkleNode(store: CasStore, content: string): Promise<string> {
const yamlText = serializeMerkleNode(createContentMerkleNode(content));
return store.put(yamlText);
}
/** Loads a CAS blob and returns the payload string for a `content` Merkle node. */
export async function getContentMerklePayload(
store: CasStore,
hash: string,
): Promise<string | null> {
const yamlText = await store.get(hash);
if (yamlText === null) {
return null;
}
const node = parseMerkleNode(yamlText);
if (node.type !== "content" || typeof node.payload !== "string") {
return null;
}
return node.payload;
}
+9
View File
@@ -1,3 +1,12 @@
/** Append `contentHash` to `refs` when not already present (dedupe by first occurrence order). */
export function mergeRefsWithContentHash(refs: string[], contentHash: string): string[] {
const out = [...refs];
if (!out.includes(contentHash)) {
out.push(contentHash);
}
return out;
}
/** Normalize `refs` from persisted JSONL or IPC payloads (missing or invalid → []). */
export function normalizeRefsField(value: unknown): string[] {
if (!Array.isArray(value)) {
+8 -2
View File
@@ -1,5 +1,7 @@
import type * as z from "zod/v4";
import type { CasStore } from "./cas.js";
/** Sentinel values for automaton control flow. */
export const START = "__start__" as const;
export const END = "__end__" as const;
@@ -17,7 +19,8 @@ export type LlmProvider = {
/** What each generator yield produces — one role's output (engine adds `timestamp` when persisting). */
export type RoleOutput = {
role: string;
content: string;
/** CAS hash of the serialized Merkle content node for this step's body text. */
contentHash: string;
meta: Record<string, unknown>;
/** CAS hashes produced or consumed by this step (for GC traceability). */
refs: string[];
@@ -41,6 +44,8 @@ export type WorkflowFnOptions = {
maxRounds: number;
/** Nesting depth for workflow-as-agent chains; root threads use `0`. */
depth: number;
/** Global CAS store for Merkle content blobs (role step bodies). */
cas: CasStore;
};
/** Bundle contract — named export `run` is a function returning an AsyncGenerator. */
@@ -62,7 +67,7 @@ export type RoleStep<M extends RoleMeta> = {
[K in keyof M & string]: {
role: K;
meta: M[K];
content: string;
contentHash: string;
refs: string[];
timestamp: number;
};
@@ -83,6 +88,7 @@ export type AgentContext<M extends RoleMeta = RoleMeta> = ModeratorContext<M> &
name: string;
systemPrompt: string;
};
cas: CasStore;
};
/** Phase 3: Extractor runs — has agent output; the extraction instruction is a separate argument to the extract function. */
+13 -6
View File
@@ -1,12 +1,15 @@
import { mkdir, unlink, writeFile } from "node:fs/promises";
import { createServer, type Socket } from "node:net";
import { dirname, join } from "node:path";
import { pathToFileURL } from "node:url";
import { importWorkflowBundleModule } from "./bundle-import-env.js";
import { createCasStore } from "./cas.js";
import type { PrefilledDiskStep } from "./engine.js";
import { type ExecuteThreadIo, executeThread } from "./engine.js";
import { ensureUncagedWorkflowSymlink } from "./ensure-uncaged-workflow-symlink.js";
import { createLogger } from "./logger.js";
import { normalizeRefsField } from "./refs-field.js";
import { err, ok, type Result } from "./result.js";
import { getGlobalCasDir } from "./storage-root.js";
import { createThreadPauseGate, type ThreadPauseGate } from "./thread-pause-gate.js";
import type { RoleOutput, WorkflowFn } from "./types.js";
@@ -48,9 +51,9 @@ type ThreadHandle = {
function parseRoleOutputRecord(obj: Record<string, unknown>): RoleOutput | null {
const role = obj.role;
const content = obj.content;
const contentHash = obj.contentHash;
const meta = obj.meta;
if (typeof role !== "string" || typeof content !== "string") {
if (typeof role !== "string" || typeof contentHash !== "string") {
return null;
}
if (meta === null || typeof meta !== "object") {
@@ -58,7 +61,7 @@ function parseRoleOutputRecord(obj: Record<string, unknown>): RoleOutput | null
}
return {
role,
content,
contentHash,
meta: meta as Record<string, unknown>,
refs: normalizeRefsField(obj.refs),
};
@@ -300,8 +303,9 @@ async function main(): Promise<void> {
return;
}
await ensureUncagedWorkflowSymlink(storageRoot);
// Dynamic import required: user bundle path resolved at runtime
const modUnknown: unknown = await import(pathToFileURL(bundlePath).href);
const modUnknown: unknown = await importWorkflowBundleModule(bundlePath);
const modRec = modUnknown as Record<string, unknown>;
const runExport = modRec.run;
if (!isWorkflowFnLike(runExport)) {
@@ -315,6 +319,8 @@ async function main(): Promise<void> {
let activeThreads = 0;
let shutdownTimer: ReturnType<typeof setTimeout> | null = null;
const cas = createCasStore(getGlobalCasDir(storageRoot));
const workerCtlPath = join(storageRoot, "workers", `${hash}.json`);
function cancelShutdownTimer(): void {
@@ -363,6 +369,7 @@ async function main(): Promise<void> {
hash,
dataJsonlPath,
infoJsonlPath,
cas,
};
const existing = threads.get(threadId);
@@ -389,7 +396,7 @@ async function main(): Promise<void> {
const ts = cmd.stepTimestamps?.[i];
return {
role: step.role,
content: step.content,
contentHash: step.contentHash,
meta: step.meta,
refs: normalizeRefsField(step.refs),
timestamp: typeof ts === "number" && ts > 0 ? ts : baseTs + i,
+4 -2
View File
@@ -1,10 +1,11 @@
import { join } from "node:path";
import { createCasStore } from "./cas.js";
import { type ExecuteThreadIo, executeThread } from "./engine.js";
import { extractBundleExports } from "./extract-bundle-exports.js";
import { createLogger } from "./logger.js";
import { getRegisteredWorkflow, readWorkflowRegistry } from "./registry.js";
import { getDefaultWorkflowStorageRoot } from "./storage-root.js";
import { getDefaultWorkflowStorageRoot, getGlobalCasDir } from "./storage-root.js";
import type { AgentContext, AgentFn, ThreadInput } from "./types.js";
import { generateUlid } from "./ulid.js";
@@ -50,7 +51,7 @@ export function workflowAsAgent(
}
const bundlePath = join(storageRoot, "bundles", `${entry.hash}.esm.js`);
const bundleExportsResult = await extractBundleExports(bundlePath);
const bundleExportsResult = await extractBundleExports(bundlePath, { storageRoot });
if (!bundleExportsResult.ok) {
return `ERROR: ${bundleExportsResult.error}`;
}
@@ -69,6 +70,7 @@ export function workflowAsAgent(
hash: entry.hash,
dataJsonlPath,
infoJsonlPath,
cas: createCasStore(getGlobalCasDir(storageRoot)),
};
const logger = createLogger({ sink: { kind: "file", path: infoJsonlPath } });