diff --git a/packages/cli-workflow/__tests__/commands.test.ts b/packages/cli-workflow/__tests__/commands.test.ts index 9020604..cc4d8e2 100644 --- a/packages/cli-workflow/__tests__/commands.test.ts +++ b/packages/cli-workflow/__tests__/commands.test.ts @@ -16,6 +16,9 @@ import { addCliArgs } from "./bundle-fixture.js"; const fixtureDescriptor = `export const descriptor = { description: "fixture", roles: {} }; `; +const wfPutImport = `import { putContentMerkleNode } from "@uncaged/workflow"; +`; + describe("cli workflow commands", () => { let prevEnv: string | undefined; let storageRoot: string; @@ -41,11 +44,13 @@ describe("cli workflow commands", () => { const bundlePath = join(bundleDir, "demo.esm.js"); await writeFile( bundlePath, - `${fixtureDescriptor}import fs from "node:fs"; + `${fixtureDescriptor}${wfPutImport}import fs from "node:fs"; -export const run = async function* (input) { +export const run = async function* (input, options) { fs.existsSync("."); - yield { role: "noop", content: input.prompt, meta: { done: true } }; + const cas = options.cas; + const h = await putContentMerkleNode(cas, input.prompt); + yield { role: "noop", contentHash: h, meta: { done: true }, refs: [h] }; return { returnCode: 0, summary: "done" }; } `, @@ -112,8 +117,8 @@ export const run = async function* (input) { return { returnCode: 0, summary: in const bundlePath = join(storageRoot, "solo.esm.js"); await writeFile( bundlePath, - `export const run = async function* (input) { - yield { role: "x", content: input.prompt, meta: {} }; + `export const run = async function* () { + yield { role: "x", contentHash: "STUBHASH00000000000000001", meta: {}, refs: [] }; return { returnCode: 0, summary: "ok" }; } `, @@ -141,8 +146,11 @@ export const run = async function* (input) { return { returnCode: 0, summary: in }, }, }; -export const run = async function* (input) { - yield { role: "greeter", content: input.prompt, meta: { greeting: "hi" } }; +${wfPutImport} +export const run = async function* (input, options) { + const cas = options.cas; + const h = await putContentMerkleNode(cas, input.prompt); + yield { role: "greeter", contentHash: h, meta: { greeting: "hi" }, refs: [h] }; return { returnCode: 0, summary: "ok" }; }; `, @@ -180,8 +188,10 @@ export const run = async function* (input) { const bundlePath = join(bundleDir, "demo.esm.js"); await writeFile( bundlePath, - `${fixtureDescriptor}export const run = async function* (input) { - yield { role: "a", content: "x", meta: {} }; + `${fixtureDescriptor}${wfPutImport}export const run = async function* (_input, options) { + const cas = options.cas; + const h = await putContentMerkleNode(cas, "x"); + yield { role: "a", contentHash: h, meta: {}, refs: [h] }; return { returnCode: 0, summary: "x" }; } `, @@ -209,8 +219,10 @@ export const run = async function* (input) { const dtsPath = join(bundleDir, "types.d.ts"); await writeFile( bundlePath, - `${fixtureDescriptor}export const run = async function* (input) { - yield { role: "a", content: "x", meta: {} }; + `${fixtureDescriptor}${wfPutImport}export const run = async function* (_input, options) { + const cas = options.cas; + const h = await putContentMerkleNode(cas, "x"); + yield { role: "a", contentHash: h, meta: {}, refs: [h] }; return { returnCode: 0, summary: "x" }; } `, @@ -240,8 +252,10 @@ export const run = async function* (input) { const bundlePath = join(bundleDir, "demo.esm.js"); await writeFile( bundlePath, - `${fixtureDescriptor}export const run = async function* (input) { - yield { role: "a", content: "x", meta: {} }; + `${fixtureDescriptor}${wfPutImport}export const run = async function* (_input, options) { + const cas = options.cas; + const h = await putContentMerkleNode(cas, "x"); + yield { role: "a", contentHash: h, meta: {}, refs: [h] }; return { returnCode: 0, summary: "x" }; } `, @@ -261,13 +275,17 @@ export const run = async function* (input) { const bundleDir = join(storageRoot, "src"); await mkdir(bundleDir, { recursive: true }); const bundlePath = join(bundleDir, "demo.esm.js"); - const v1 = `${fixtureDescriptor}export const run = async function* (input) { - yield { role: "a", content: "v1", meta: {} }; + const v1 = `${fixtureDescriptor}${wfPutImport}export const run = async function* (_input, options) { + const cas = options.cas; + const h = await putContentMerkleNode(cas, "v1"); + yield { role: "a", contentHash: h, meta: {}, refs: [h] }; return { returnCode: 0, summary: "v1" }; } `; - const v2 = `${fixtureDescriptor}export const run = async function* (input) { - yield { role: "a", content: "v2", meta: {} }; + const v2 = `${fixtureDescriptor}${wfPutImport}export const run = async function* (_input, options) { + const cas = options.cas; + const h = await putContentMerkleNode(cas, "v2"); + yield { role: "a", contentHash: h, meta: {}, refs: [h] }; return { returnCode: 0, summary: "v2" }; } `; @@ -299,13 +317,17 @@ export const run = async function* (input) { const bundleDir = join(storageRoot, "src"); await mkdir(bundleDir, { recursive: true }); const bundlePath = join(bundleDir, "demo.esm.js"); - const v1 = `${fixtureDescriptor}export const run = async function* (input) { - yield { role: "a", content: "v1", meta: {} }; + const v1 = `${fixtureDescriptor}${wfPutImport}export const run = async function* (_input, options) { + const cas = options.cas; + const h = await putContentMerkleNode(cas, "v1"); + yield { role: "a", contentHash: h, meta: {}, refs: [h] }; return { returnCode: 0, summary: "v1" }; } `; - const v2 = `${fixtureDescriptor}export const run = async function* (input) { - yield { role: "a", content: "v2", meta: {} }; + const v2 = `${fixtureDescriptor}${wfPutImport}export const run = async function* (_input, options) { + const cas = options.cas; + const h = await putContentMerkleNode(cas, "v2"); + yield { role: "a", contentHash: h, meta: {}, refs: [h] }; return { returnCode: 0, summary: "v2" }; } `; @@ -347,8 +369,10 @@ export const run = async function* (input) { const bundlePath = join(bundleDir, "demo.esm.js"); await writeFile( bundlePath, - `${fixtureDescriptor}export const run = async function* (input) { - yield { role: "a", content: "x", meta: {} }; + `${fixtureDescriptor}${wfPutImport}export const run = async function* (_input, options) { + const cas = options.cas; + const h = await putContentMerkleNode(cas, "x"); + yield { role: "a", contentHash: h, meta: {}, refs: [h] }; return { returnCode: 0, summary: "x" }; } `, @@ -358,8 +382,10 @@ export const run = async function* (input) { expect(add1.ok).toBe(true); await writeFile( bundlePath, - `${fixtureDescriptor}export const run = async function* (input) { - yield { role: "a", content: "y", meta: {} }; + `${fixtureDescriptor}${wfPutImport}export const run = async function* (_input, options) { + const cas = options.cas; + const h = await putContentMerkleNode(cas, "y"); + yield { role: "a", contentHash: h, meta: {}, refs: [h] }; return { returnCode: 0, summary: "y" }; } `, @@ -409,8 +435,10 @@ export const run = async function* (input) { const bundlePath = join(bundleDir, "demo.esm.js"); await writeFile( bundlePath, - `${fixtureDescriptor}export const run = async function* (input) { - yield { role: "a", content: "x", meta: {} }; + `${fixtureDescriptor}${wfPutImport}export const run = async function* (_input, options) { + const cas = options.cas; + const h = await putContentMerkleNode(cas, "x"); + yield { role: "a", contentHash: h, meta: {}, refs: [h] }; return { returnCode: 0, summary: "x" }; } `, @@ -424,8 +452,10 @@ export const run = async function* (input) { const hash1 = add1.value.hash; await writeFile( bundlePath, - `${fixtureDescriptor}export const run = async function* (input) { - yield { role: "a", content: "y", meta: {} }; + `${fixtureDescriptor}${wfPutImport}export const run = async function* (_input, options) { + const cas = options.cas; + const h = await putContentMerkleNode(cas, "y"); + yield { role: "a", contentHash: h, meta: {}, refs: [h] }; return { returnCode: 0, summary: "y" }; } `, diff --git a/packages/cli-workflow/__tests__/fork-cli.test.ts b/packages/cli-workflow/__tests__/fork-cli.test.ts index 59c3285..9dbc64e 100644 --- a/packages/cli-workflow/__tests__/fork-cli.test.ts +++ b/packages/cli-workflow/__tests__/fork-cli.test.ts @@ -2,6 +2,7 @@ import { afterEach, beforeEach, describe, expect, test } from "bun:test"; import { mkdir, mkdtemp, readFile, rm, writeFile } from "node:fs/promises"; import { tmpdir } from "node:os"; import { join } from "node:path"; +import { createCasStore, getContentMerklePayload, getGlobalCasDir } from "@uncaged/workflow"; import { cmdAdd } from "../src/cmd-add.js"; import { cmdFork } from "../src/cmd-fork.js"; import { cmdRun } from "../src/cmd-run.js"; @@ -9,7 +10,9 @@ import { pathExists } from "../src/fs-utils.js"; import { addCliArgs } from "./bundle-fixture.js"; /** Three-role workflow that respects `input.steps` for fork/resume. */ -const threeRoleBundleSource = `export const descriptor = { +const threeRoleBundleSource = `import { putContentMerkleNode } from "@uncaged/workflow"; + +export const descriptor = { description: "fork-cli", roles: { planner: { description: "planner", schema: {} }, @@ -17,20 +20,21 @@ const threeRoleBundleSource = `export const descriptor = { reviewer: { description: "reviewer", schema: {} }, }, }; -export const run = async function* (input) { +export const run = async function* (input, options) { + const cas = options.cas; const has = (r) => input.steps.some((s) => s.role === r); if (!has("planner")) { - yield { role: "planner", content: "p1", meta: { k: "planner" } }; + const h = await putContentMerkleNode(cas, "p1"); + yield { role: "planner", contentHash: h, meta: { k: "planner" }, refs: [h] }; } if (!has("coder")) { - yield { role: "coder", content: "c1", meta: { k: "coder" } }; + const h = await putContentMerkleNode(cas, "c1"); + yield { role: "coder", contentHash: h, meta: { k: "coder" }, refs: [h] }; } if (!has("reviewer")) { - yield { - role: "reviewer", - content: "rev-" + String(input.steps.length), - meta: { k: "reviewer" }, - }; + const body = "rev-" + String(input.steps.length); + const h = await putContentMerkleNode(cas, body); + yield { role: "reviewer", contentHash: h, meta: { k: "reviewer" }, refs: [h] }; } return { returnCode: 0, summary: "done" }; }; @@ -132,7 +136,8 @@ describe("cli fork", () => { const last = JSON.parse(lines[lines.length - 1] ?? "{}") as Record; expect(last.role).toBe("reviewer"); - expect(last.content).toBe("rev-1"); + const cas = createCasStore(getGlobalCasDir(storageRoot)); + expect(await getContentMerklePayload(cas, String(last.contentHash))).toBe("rev-1"); }); test("fork without --from-role retries last role", async () => { @@ -179,11 +184,12 @@ describe("cli fork", () => { const replayCoder = JSON.parse(lines[2] ?? "{}") as Record; expect(replayCoder.role).toBe("coder"); - expect(replayCoder.content).toBe("c1"); + const cas = createCasStore(getGlobalCasDir(storageRoot)); + expect(await getContentMerklePayload(cas, String(replayCoder.contentHash))).toBe("c1"); const last = JSON.parse(lines[lines.length - 1] ?? "{}") as Record; expect(last.role).toBe("reviewer"); - expect(last.content).toBe("rev-2"); + expect(await getContentMerklePayload(cas, String(last.contentHash))).toBe("rev-2"); }); test("fork rejects unknown role with available names", async () => { diff --git a/packages/cli-workflow/__tests__/gc-cli.test.ts b/packages/cli-workflow/__tests__/gc-cli.test.ts index 2a7362e..1d7a19b 100644 --- a/packages/cli-workflow/__tests__/gc-cli.test.ts +++ b/packages/cli-workflow/__tests__/gc-cli.test.ts @@ -4,31 +4,43 @@ import { mkdir, mkdtemp, rm, writeFile } from "node:fs/promises"; import { tmpdir } from "node:os"; import { join } from "node:path"; import { fileURLToPath } from "node:url"; -import { createCasStore, garbageCollectCas, getGlobalCasDir } from "@uncaged/workflow"; +import { + createCasStore, + garbageCollectCas, + getGlobalCasDir, + putContentMerkleNode, +} from "@uncaged/workflow"; import { cmdThreadRemove } from "../src/cmd-thread.js"; import { pathExists } from "../src/fs-utils.js"; const cliEntryPath = fileURLToPath(new URL("../src/cli.ts", import.meta.url)); -/** Minimal valid `.data.jsonl` with one role step referencing `activeHash` in `refs`. */ -function makeDataJsonl(threadId: string, bundleHash: string, activeHash: string): string { - return [ +async function writeDemoDataJsonl(params: { + path: string; + threadId: string; + bundleHash: string; + cas: ReturnType; + activeHash: string; +}): Promise { + const bodyHash = await putContentMerkleNode(params.cas, "p"); + const text = [ JSON.stringify({ name: "demo", - hash: bundleHash, - threadId, + hash: params.bundleHash, + threadId: params.threadId, parameters: { prompt: "hi", options: { maxRounds: 5 } }, timestamp: 100, }), JSON.stringify({ role: "planner", - content: "p", + contentHash: bodyHash, meta: {}, - refs: [activeHash], + refs: [params.activeHash, bodyHash], timestamp: 101, }), "", ].join("\n"); + await writeFile(params.path, text, "utf8"); } describe("gc cli and garbageCollectCas", () => { @@ -60,11 +72,13 @@ describe("gc cli and garbageCollectCas", () => { const activeHash = await cas.put("active-blob"); const orphanHash = await cas.put("orphan-blob"); - await writeFile( - join(logsDir, `${threadId}.data.jsonl`), - makeDataJsonl(threadId, bundleHash, activeHash), - "utf8", - ); + await writeDemoDataJsonl({ + path: join(logsDir, `${threadId}.data.jsonl`), + threadId, + bundleHash, + cas, + activeHash, + }); const gc = await garbageCollectCas(storageRoot); expect(gc.ok).toBe(true); @@ -72,7 +86,7 @@ describe("gc cli and garbageCollectCas", () => { return; } expect(gc.value.scannedThreads).toBe(1); - expect(gc.value.activeRefs).toBe(1); + expect(gc.value.activeRefs).toBe(2); expect(gc.value.deletedEntries).toBe(1); expect(gc.value.deletedHashes).toEqual([orphanHash]); @@ -106,16 +120,18 @@ describe("gc cli and garbageCollectCas", () => { const activeHash = await cas.put("keep-me"); await cas.put("drop-me"); - await writeFile( - join(logsDir, `${threadId}.data.jsonl`), - makeDataJsonl(threadId, bundleHash, activeHash), - "utf8", - ); + await writeDemoDataJsonl({ + path: join(logsDir, `${threadId}.data.jsonl`), + threadId, + bundleHash, + cas, + activeHash, + }); const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot }; const proc = spawnSync(process.execPath, [cliEntryPath, "gc"], { env, encoding: "utf8" }); expect(proc.status).toBe(0); - expect(String(proc.stdout).trim()).toBe("scanned 1 threads, 1 active refs, deleted 1 entries"); + expect(String(proc.stdout).trim()).toBe("scanned 1 threads, 2 active refs, deleted 1 entries"); }); test("thread rm triggers gc so unreferenced CAS is removed", async () => { @@ -126,11 +142,13 @@ describe("gc cli and garbageCollectCas", () => { const cas = createCasStore(getGlobalCasDir(storageRoot)); const activeHash = await cas.put("pinned-by-ref"); - await writeFile( - join(logsDir, `${threadId}.data.jsonl`), - makeDataJsonl(threadId, bundleHash, activeHash), - "utf8", - ); + await writeDemoDataJsonl({ + path: join(logsDir, `${threadId}.data.jsonl`), + threadId, + bundleHash, + cas, + activeHash, + }); const orphanHash = await cas.put("orphan-after-rm"); const orphanPath = join(getGlobalCasDir(storageRoot), `${orphanHash}.txt`); diff --git a/packages/cli-workflow/__tests__/thread-cli.test.ts b/packages/cli-workflow/__tests__/thread-cli.test.ts index f77a924..8e76b04 100644 --- a/packages/cli-workflow/__tests__/thread-cli.test.ts +++ b/packages/cli-workflow/__tests__/thread-cli.test.ts @@ -17,6 +17,9 @@ import { cmdThreads } from "../src/cmd-threads.js"; import { pathExists, readTextFileIfExists } from "../src/fs-utils.js"; import { addCliArgs } from "./bundle-fixture.js"; +const wfPutImport = `import { putContentMerkleNode } from "@uncaged/workflow"; +`; + const threadFixtureDescriptor = `export const descriptor = { description: "thread-cli", roles: { @@ -31,18 +34,26 @@ const threadFixtureDescriptor = `export const descriptor = { `; const fastBundleSource = `${threadFixtureDescriptor} -export const run = async function* (input) { - yield { role: "planner", content: "plan", meta: { plan: input.prompt } }; - yield { role: "coder", content: "code", meta: { diff: "y" } }; +${wfPutImport} +export const run = async function* (input, options) { + const cas = options.cas; + let h = await putContentMerkleNode(cas, "plan"); + yield { role: "planner", contentHash: h, meta: { plan: input.prompt }, refs: [h] }; + h = await putContentMerkleNode(cas, "code"); + yield { role: "coder", contentHash: h, meta: { diff: "y" }, refs: [h] }; return { returnCode: 0, summary: "done" }; }; `; const slowPlannerBundleSource = `${threadFixtureDescriptor} -export const run = async function* (input) { +${wfPutImport} +export const run = async function* (input, options) { await new Promise((r) => setTimeout(r, 400)); - yield { role: "planner", content: "plan", meta: { plan: input.prompt } }; - yield { role: "coder", content: "code", meta: { diff: "y" } }; + const cas = options.cas; + let h = await putContentMerkleNode(cas, "plan"); + yield { role: "planner", contentHash: h, meta: { plan: input.prompt }, refs: [h] }; + h = await putContentMerkleNode(cas, "code"); + yield { role: "coder", contentHash: h, meta: { diff: "y" }, refs: [h] }; return { returnCode: 0, summary: "done" }; }; `; @@ -50,27 +61,38 @@ export const run = async function* (input) { const cliEntryPath = fileURLToPath(new URL("../src/cli.ts", import.meta.url)); const abortablePlannerBundleSource = `${threadFixtureDescriptor} -export const run = async function* (input) { +${wfPutImport} +export const run = async function* (input, options) { await new Promise((r) => setTimeout(r, 600)); - yield { role: "planner", content: "plan", meta: { plan: input.prompt } }; - yield { role: "coder", content: "code", meta: { diff: "y" } }; + const cas = options.cas; + let h = await putContentMerkleNode(cas, "plan"); + yield { role: "planner", contentHash: h, meta: { plan: input.prompt }, refs: [h] }; + h = await putContentMerkleNode(cas, "code"); + yield { role: "coder", contentHash: h, meta: { diff: "y" }, refs: [h] }; return { returnCode: 0, summary: "done" }; }; `; const pauseResumeBundleSource = `${threadFixtureDescriptor} -export const run = async function* (input) { - yield { role: "first", content: "f", meta: {} }; +${wfPutImport} +export const run = async function* (_input, options) { + const cas = options.cas; + let h = await putContentMerkleNode(cas, "f"); + yield { role: "first", contentHash: h, meta: {}, refs: [h] }; await new Promise((r) => setTimeout(r, 1500)); - yield { role: "second", content: "s", meta: {} }; + h = await putContentMerkleNode(cas, "s"); + yield { role: "second", contentHash: h, meta: {}, refs: [h] }; return { returnCode: 0, summary: "done" }; }; `; const delayedFirstYieldBundleSource = `${threadFixtureDescriptor} -export const run = async function* (input) { +${wfPutImport} +export const run = async function* (_input, options) { await new Promise((r) => setTimeout(r, 900)); - yield { role: "only", content: "x", meta: {} }; + const cas = options.cas; + const h = await putContentMerkleNode(cas, "x"); + yield { role: "only", contentHash: h, meta: {}, refs: [h] }; return { returnCode: 0, summary: "done" }; }; `; diff --git a/packages/cli-workflow/src/cmd-add.ts b/packages/cli-workflow/src/cmd-add.ts index 0833bbf..a865255 100644 --- a/packages/cli-workflow/src/cmd-add.ts +++ b/packages/cli-workflow/src/cmd-add.ts @@ -192,7 +192,7 @@ export async function cmdAdd( return validated; } - const extracted = await extractBundleExports(resolvedPath); + const extracted = await extractBundleExports(resolvedPath, { storageRoot }); if (!extracted.ok) { return extracted; } diff --git a/packages/cli-workflow/src/cmd-fork.ts b/packages/cli-workflow/src/cmd-fork.ts index b16db17..acdb0a2 100644 --- a/packages/cli-workflow/src/cmd-fork.ts +++ b/packages/cli-workflow/src/cmd-fork.ts @@ -65,8 +65,9 @@ export async function cmdFork( const newThreadId = generateUlid(Date.now()); const stepsOnWire = plan.value.historicalSteps.map((s) => ({ role: s.role, - content: s.content, + contentHash: s.contentHash, meta: s.meta, + refs: s.refs, timestamp: s.timestamp, })); diff --git a/packages/workflow-agent-cursor/src/index.ts b/packages/workflow-agent-cursor/src/index.ts index a2262af..5e03087 100644 --- a/packages/workflow-agent-cursor/src/index.ts +++ b/packages/workflow-agent-cursor/src/index.ts @@ -54,7 +54,7 @@ export function createCursorAgent(config: CursorAgentConfig): AgentFn { "From the thread context, determine the absolute filesystem path where the project/repository is located.", extractCtx, ); - const fullPrompt = buildAgentPrompt(ctx); + const fullPrompt = await buildAgentPrompt(ctx); const args = [ "-p", fullPrompt, diff --git a/packages/workflow-agent-hermes/src/index.ts b/packages/workflow-agent-hermes/src/index.ts index 59c5838..6953a70 100644 --- a/packages/workflow-agent-hermes/src/index.ts +++ b/packages/workflow-agent-hermes/src/index.ts @@ -35,7 +35,7 @@ export function createHermesAgent(config: HermesAgentConfig): AgentFn { const timeoutMs = config.timeout; return async (ctx) => { - const fullPrompt = buildAgentPrompt(ctx); + const fullPrompt = await buildAgentPrompt(ctx); const args = [ "chat", "-q", diff --git a/packages/workflow-agent-llm/__tests__/create-llm-adapter.test.ts b/packages/workflow-agent-llm/__tests__/create-llm-adapter.test.ts index fadce70..39f50b8 100644 --- a/packages/workflow-agent-llm/__tests__/create-llm-adapter.test.ts +++ b/packages/workflow-agent-llm/__tests__/create-llm-adapter.test.ts @@ -1,8 +1,14 @@ import { describe, expect, test } from "bun:test"; -import { START, type ThreadContext } from "@uncaged/workflow"; +import { mkdtempSync } from "node:fs"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import { createCasStore, START, type ThreadContext } from "@uncaged/workflow"; import { createLlmAdapter } from "../src/create-llm-adapter.js"; +const casDir = mkdtempSync(join(tmpdir(), "wf-llm-adapter-cas-")); +const testCas = createCasStore(casDir); + function makeCtx(userContent: string): ThreadContext { return { start: { @@ -15,6 +21,7 @@ function makeCtx(userContent: string): ThreadContext { steps: [], threadId: "01TEST000000000000000000TR", currentRole: { name: "planner", systemPrompt: "system instructions" }, + cas: testCas, }; } diff --git a/packages/workflow-template-solve-issue/__tests__/solve-issue-template.test.ts b/packages/workflow-template-solve-issue/__tests__/solve-issue-template.test.ts index 4e63275..492771e 100644 --- a/packages/workflow-template-solve-issue/__tests__/solve-issue-template.test.ts +++ b/packages/workflow-template-solve-issue/__tests__/solve-issue-template.test.ts @@ -1,5 +1,9 @@ import { afterEach, describe, expect, test } from "bun:test"; +import { mkdtemp, rm } from "node:fs/promises"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; import { + createCasStore, createExtract, END, type ModeratorContext, @@ -113,7 +117,7 @@ function makeCtx( function preparerStep(): RoleStep { return { role: "preparer", - content: "prepared", + contentHash: "STUBHASHPREPARER01", meta: { repoPath: "/home/user/repos/test", defaultBranch: "main", @@ -133,7 +137,7 @@ function preparerStep(): RoleStep { function plannerStep(phases: PlannerMeta["phases"] = DEFAULT_PHASES): RoleStep { return { role: "planner", - content: "plan", + contentHash: "STUBHASHPLANNER001", meta: { phases }, refs: phases.map((p) => p.hash), timestamp: 1, @@ -143,7 +147,7 @@ function plannerStep(phases: PlannerMeta["phases"] = DEFAULT_PHASES): RoleStep { return { role: "coder", - content: "code", + contentHash: "STUBHASHCODER00001", meta: { completedPhase, filesChanged: ["a.ts"], summary: "fixed" }, refs: [completedPhase], timestamp: 2, @@ -153,7 +157,7 @@ function coderStep(completedPhase = "4KNMR2PX"): RoleStep { function reviewerStep(approved: boolean): RoleStep { return { role: "reviewer", - content: "rev", + contentHash: "STUBHASHREVIEWER01", meta: approved ? { status: "approved" as const } : { status: "rejected" as const, issues: ["needs fix"] }, @@ -165,7 +169,7 @@ function reviewerStep(approved: boolean): RoleStep { function committerStep(): RoleStep { return { role: "committer", - content: "commit", + contentHash: "STUBHASHCOMMITTER1", meta: { status: "committed", branch: "feat/issue-1", commitSha: "abc1234" }, refs: [], timestamp: 4, @@ -281,10 +285,15 @@ describe("solveIssueModerator", () => { describe("createSolveIssueRun", () => { let restoreFetch: (() => void) | null = null; + let casDir: string | undefined; - afterEach(() => { + afterEach(async () => { restoreFetch?.(); restoreFetch = null; + if (casDir !== undefined) { + await rm(casDir, { recursive: true, force: true }).catch(() => {}); + casDir = undefined; + } }); test("structured extraction yields preparer then planner meta from mocked chat completions", async () => { @@ -301,10 +310,13 @@ describe("createSolveIssueRun", () => { }; restoreFetch = installMockChatCompletions([EXPECT_PREPARER_META, EXPECT_PLANNER_META]); + casDir = await mkdtemp(join(tmpdir(), "solve-issue-cas-")); + const cas = createCasStore(casDir); + const run = createSolveIssueRun({ agent: async () => "" }, stubExtract); const gen = run( { prompt: "task", steps: [] }, - { threadId: "01TEST000000000000000000TR", maxRounds: 20, depth: 0 }, + { threadId: "01TEST000000000000000000TR", maxRounds: 20, depth: 0, cas }, ); const first = await gen.next(); expect(first.done).toBe(false); @@ -336,6 +348,9 @@ describe("createSolveIssueRun", () => { EXPECT_CODER_META, ]); + casDir = await mkdtemp(join(tmpdir(), "solve-issue-cas-")); + const cas = createCasStore(casDir); + const calls: string[] = []; const run = createSolveIssueRun( { @@ -362,7 +377,7 @@ describe("createSolveIssueRun", () => { ); const gen = run( { prompt: "task", steps: [] }, - { threadId: "01TEST000000000000000000TR", maxRounds: 20, depth: 0 }, + { threadId: "01TEST000000000000000000TR", maxRounds: 20, depth: 0, cas }, ); await gen.next(); expect(calls).toEqual(["preparer"]); diff --git a/packages/workflow-util-agent/__tests__/build-agent-prompt.test.ts b/packages/workflow-util-agent/__tests__/build-agent-prompt.test.ts index 349ec81..e7db229 100644 --- a/packages/workflow-util-agent/__tests__/build-agent-prompt.test.ts +++ b/packages/workflow-util-agent/__tests__/build-agent-prompt.test.ts @@ -1,5 +1,8 @@ -import { describe, expect, test } from "bun:test"; -import { START, type ThreadContext } from "@uncaged/workflow"; +import { afterEach, beforeEach, describe, expect, test } from "bun:test"; +import { mkdtemp, rm } from "node:fs/promises"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import { createCasStore, putContentMerkleNode, START, type ThreadContext } from "@uncaged/workflow"; import { buildAgentPrompt } from "../src/index.js"; @@ -13,38 +16,53 @@ function startTask(content: string): ThreadContext["start"] { } describe("buildAgentPrompt", () => { - test("includes system prompt and full task; omits tools when there are no steps", () => { + let casRoot: string; + + beforeEach(async () => { + casRoot = await mkdtemp(join(tmpdir(), "wf-build-prompt-cas-")); + }); + + afterEach(async () => { + await rm(casRoot, { recursive: true, force: true }); + }); + + test("includes system prompt and full task; omits tools when there are no steps", async () => { + const cas = createCasStore(casRoot); const ctx: ThreadContext = { start: startTask("fix the bug"), depth: 0, steps: [], threadId: "01TEST000000000000000000TR", currentRole: { name: START, systemPrompt: "You are an agent." }, + cas, }; - const text = buildAgentPrompt(ctx); + const text = await buildAgentPrompt(ctx); expect(text).toContain("You are an agent."); expect(text).toContain("## Task"); expect(text).toContain("fix the bug"); expect(text).not.toContain("## Tools"); }); - test("single step shows full content and meta, and includes tools", () => { + test("single step shows full content and meta, and includes tools", async () => { + const cas = createCasStore(casRoot); + const onlyHash = await putContentMerkleNode(cas, "only step full body"); const ctx: ThreadContext = { start: startTask("user task"), depth: 0, threadId: "01TEST000000000000000000TR", currentRole: { name: "coder", systemPrompt: "Be helpful." }, + cas, steps: [ { role: "coder", - content: "only step full body", + contentHash: onlyHash, meta: { files: ["a.ts"] }, - refs: [], + refs: [onlyHash], timestamp: 2, }, ], }; - const text = buildAgentPrompt(ctx); + const text = await buildAgentPrompt(ctx); expect(text).toContain("## Task"); expect(text).toContain("user task"); expect(text).toContain("## Step: coder"); @@ -54,30 +72,34 @@ describe("buildAgentPrompt", () => { expect(text).toContain("uncaged-workflow thread 01TEST000000000000000000TR"); }); - test("two or more steps: previous steps are meta-only; latest step is full", () => { + test("two or more steps: previous steps are meta-only; latest step is full", async () => { + const cas = createCasStore(casRoot); + const plannerHash = await putContentMerkleNode(cas, "PLANNER_SECRET_FULL_TEXT"); + const coderHash = await putContentMerkleNode(cas, "last step full content"); const ctx: ThreadContext = { start: startTask("first message full: task content here"), depth: 0, threadId: "01TEST000000000000000000TR", currentRole: { name: "coder", systemPrompt: "System." }, + cas, steps: [ { role: "planner", - content: "PLANNER_SECRET_FULL_TEXT", + contentHash: plannerHash, meta: { plan: "short" }, - refs: [], + refs: [plannerHash], timestamp: 2, }, { role: "coder", - content: "last step full content", + contentHash: coderHash, meta: { done: true }, - refs: [], + refs: [coderHash], timestamp: 3, }, ], }; - const text = buildAgentPrompt(ctx); + const text = await buildAgentPrompt(ctx); expect(text).toContain("first message full: task content here"); expect(text).toContain("## Previous Steps"); expect(text).toContain("### Step 1: planner"); @@ -90,37 +112,42 @@ describe("buildAgentPrompt", () => { expect(text).toContain("uncaged-workflow thread 01TEST000000000000000000TR"); }); - test("middle steps show meta summary only, not full content", () => { + test("middle steps show meta summary only, not full content", async () => { + const cas = createCasStore(casRoot); + const ha = await putContentMerkleNode(cas, "HIDDEN_A"); + const hb = await putContentMerkleNode(cas, "HIDDEN_B_MIDDLE"); + const hc = await putContentMerkleNode(cas, "VISIBLE_LAST"); const ctx: ThreadContext = { start: startTask("start"), depth: 0, threadId: "01TEST000000000000000000TR", currentRole: { name: "c", systemPrompt: "S" }, + cas, steps: [ { role: "a", - content: "HIDDEN_A", + contentHash: ha, meta: { n: 1 }, - refs: [], + refs: [ha], timestamp: 2, }, { role: "b", - content: "HIDDEN_B_MIDDLE", + contentHash: hb, meta: { n: 2 }, - refs: [], + refs: [hb], timestamp: 3, }, { role: "c", - content: "VISIBLE_LAST", + contentHash: hc, meta: { n: 3 }, - refs: [], + refs: [hc], timestamp: 4, }, ], }; - const text = buildAgentPrompt(ctx); + const text = await buildAgentPrompt(ctx); expect(text).not.toContain("HIDDEN_A"); expect(text).not.toContain("HIDDEN_B_MIDDLE"); expect(text).toContain('Summary: {"n":1}'); diff --git a/packages/workflow-util-agent/src/build-agent-prompt.ts b/packages/workflow-util-agent/src/build-agent-prompt.ts index 9564840..559101d 100644 --- a/packages/workflow-util-agent/src/build-agent-prompt.ts +++ b/packages/workflow-util-agent/src/build-agent-prompt.ts @@ -1,7 +1,16 @@ import type { AgentContext } from "@uncaged/workflow"; +import { getContentMerklePayload } from "@uncaged/workflow"; + +async function resolveStepText(ctx: AgentContext, contentHash: string): Promise { + const text = await getContentMerklePayload(ctx.cas, contentHash); + if (text === null) { + throw new Error(`buildAgentPrompt: missing CAS blob for ${contentHash}`); + } + return text; +} /** Builds the full agent prompt: system instructions plus summarized thread history. */ -export function buildAgentPrompt(ctx: AgentContext): string { +export async function buildAgentPrompt(ctx: AgentContext): Promise { const lines: string[] = []; lines.push(ctx.currentRole.systemPrompt); lines.push(""); @@ -15,10 +24,11 @@ export function buildAgentPrompt(ctx: AgentContext): string { if (steps.length === 1) { const s = steps[0]; + const body = await resolveStepText(ctx, s.contentHash); lines.push(""); lines.push(`## Step: ${s.role}`); lines.push(""); - lines.push(s.content); + lines.push(body); lines.push(""); lines.push(`Meta: ${JSON.stringify(s.meta)}`); } else { @@ -31,10 +41,11 @@ export function buildAgentPrompt(ctx: AgentContext): string { lines.push(`Summary: ${JSON.stringify(s.meta)}`); } const last = steps[steps.length - 1]; + const lastBody = await resolveStepText(ctx, last.contentHash); lines.push(""); lines.push(`## Latest Step: ${last.role}`); lines.push(""); - lines.push(last.content); + lines.push(lastBody); lines.push(""); lines.push(`Meta: ${JSON.stringify(last.meta)}`); } diff --git a/packages/workflow/__tests__/bundle-validator.test.ts b/packages/workflow/__tests__/bundle-validator.test.ts index 65f66af..370f567 100644 --- a/packages/workflow/__tests__/bundle-validator.test.ts +++ b/packages/workflow/__tests__/bundle-validator.test.ts @@ -26,6 +26,19 @@ export const run = async function* (input) { expect(r.ok).toBe(true); }); + test("allows static import of @uncaged/workflow", () => { + const source = `${minimalDescriptor}import { putContentMerkleNode } from "@uncaged/workflow"; + +export const run = async function* (_input, options) { + const cas = options.cas; + const h = await putContentMerkleNode(cas, "x"); + return { returnCode: 0, summary: h }; +}; +`; + const r = validateWorkflowBundle({ filePath: "/tmp/w.esm.js", source }); + expect(r.ok).toBe(true); + }); + test("rejects wrong filename suffix", () => { const r = validateWorkflowBundle({ filePath: "/tmp/w.js", diff --git a/packages/workflow/__tests__/engine.test.ts b/packages/workflow/__tests__/engine.test.ts index 4e13c38..ca230c9 100644 --- a/packages/workflow/__tests__/engine.test.ts +++ b/packages/workflow/__tests__/engine.test.ts @@ -4,10 +4,16 @@ import { tmpdir } from "node:os"; import { join } from "node:path"; import * as z from "zod/v4"; +import { createCasStore } from "../src/cas.js"; import { createWorkflow } from "../src/create-workflow.js"; import { executeThread } from "../src/engine.js"; import { createExtract } from "../src/extract-fn.js"; import { createLogger } from "../src/logger.js"; +import { + createContentMerkleNode, + getContentMerklePayload, + serializeMerkleNode, +} from "../src/merkle.js"; import { END } from "../src/types.js"; const plannerMetaSchema = z.object({ @@ -140,6 +146,7 @@ describe("executeThread", () => { const dataPath = join(root, "logs", hash, `${threadId}.data.jsonl`); const infoPath = join(root, "logs", hash, `${threadId}.info.jsonl`); await mkdir(join(root, "logs", hash), { recursive: true }); + const cas = createCasStore(join(root, "cas")); const logger = createLogger({ sink: { kind: "file", path: infoPath } }); const ac = new AbortController(); @@ -156,7 +163,7 @@ describe("executeThread", () => { forkSourceThreadId: null, prefilledDiskSteps: null, }, - { threadId, hash, dataJsonlPath: dataPath, infoJsonlPath: infoPath }, + { threadId, hash, dataJsonlPath: dataPath, infoJsonlPath: infoPath, cas }, logger, ); @@ -184,14 +191,15 @@ describe("executeThread", () => { const role1 = JSON.parse(lines[1] ?? "{}") as Record; expect(role1.role).toBe("planner"); - expect(role1.content).toBe("plan-body"); + expect(typeof role1.contentHash).toBe("string"); + expect(await getContentMerklePayload(cas, String(role1.contentHash))).toBe("plan-body"); expect(role1.meta).toEqual({ plan: "do-it", files: ["a.ts"] }); - expect(role1.refs).toEqual([]); + expect(role1.refs).toEqual([role1.contentHash]); expect(typeof role1.timestamp).toBe("number"); const role2 = JSON.parse(lines[2] ?? "{}") as Record; expect(role2.role).toBe("coder"); - expect(role2.refs).toEqual([]); + expect(role2.refs).toEqual([role2.contentHash]); const infoText = await readFile(infoPath, "utf8"); const infoLines = infoText @@ -219,11 +227,14 @@ describe("executeThread", () => { const dataPath = join(root, "logs", hash, `${threadId}.data.jsonl`); const infoPath = join(root, "logs", hash, `${threadId}.info.jsonl`); await mkdir(join(root, "logs", hash), { recursive: true }); + const cas = createCasStore(join(root, "cas")); + const plannerHash = await cas.put(serializeMerkleNode(createContentMerkleNode("plan-body"))); const logger = createLogger({ sink: { kind: "file", path: infoPath } }); const ac = new AbortController(); const histTs = 9_000_000; + const mergedPlannerRefs = ["CAS111AAAAAAA", plannerHash]; const result = await executeThread( demoWorkflow, "demo-flow", @@ -232,9 +243,9 @@ describe("executeThread", () => { steps: [ { role: "planner", - content: "plan-body", + contentHash: plannerHash, meta: { plan: "do-it", files: ["a.ts"] }, - refs: ["CAS111AAAAAAA"], + refs: mergedPlannerRefs, }, ], }, @@ -247,14 +258,14 @@ describe("executeThread", () => { prefilledDiskSteps: [ { role: "planner", - content: "plan-body", + contentHash: plannerHash, meta: { plan: "do-it", files: ["a.ts"] }, - refs: ["CAS111AAAAAAA"], + refs: mergedPlannerRefs, timestamp: histTs, }, ], }, - { threadId, hash, dataJsonlPath: dataPath, infoJsonlPath: infoPath }, + { threadId, hash, dataJsonlPath: dataPath, infoJsonlPath: infoPath, cas }, logger, ); @@ -273,11 +284,11 @@ describe("executeThread", () => { const role0 = JSON.parse(lines[1] ?? "{}") as Record; expect(role0.role).toBe("planner"); expect(role0.timestamp).toBe(histTs); - expect(role0.refs).toEqual(["CAS111AAAAAAA"]); + expect(role0.refs).toEqual(mergedPlannerRefs); const role1 = JSON.parse(lines[2] ?? "{}") as Record; expect(role1.role).toBe("coder"); - expect(role1.content).toBe("code-body"); + expect(await getContentMerklePayload(cas, String(role1.contentHash))).toBe("code-body"); } finally { await rm(root, { recursive: true, force: true }); } @@ -291,6 +302,7 @@ describe("executeThread", () => { const dataPath = join(root, "logs", hash, `${threadId}.data.jsonl`); const infoPath = join(root, "logs", hash, `${threadId}.info.jsonl`); await mkdir(join(root, "logs", hash), { recursive: true }); + const cas = createCasStore(join(root, "cas")); const logger = createLogger({ sink: { kind: "file", path: infoPath } }); const ac = new AbortController(); @@ -307,7 +319,7 @@ describe("executeThread", () => { forkSourceThreadId: null, prefilledDiskSteps: null, }, - { threadId, hash, dataJsonlPath: dataPath, infoJsonlPath: infoPath }, + { threadId, hash, dataJsonlPath: dataPath, infoJsonlPath: infoPath, cas }, logger, ); diff --git a/packages/workflow/__tests__/fork-thread.test.ts b/packages/workflow/__tests__/fork-thread.test.ts index d1dd70b..244fd87 100644 --- a/packages/workflow/__tests__/fork-thread.test.ts +++ b/packages/workflow/__tests__/fork-thread.test.ts @@ -7,9 +7,9 @@ import { } from "../src/fork-thread.js"; const sampleDataJsonl = `{"name":"demo","hash":"C9NMV6V2TQT81","threadId":"01AAA1111111111111111111","parameters":{"prompt":"hi","options":{"maxRounds":5}},"timestamp":100} -{"role":"planner","content":"p","meta":{},"timestamp":101} -{"role":"coder","content":"c","meta":{},"timestamp":102} -{"role":"reviewer","content":"r","meta":{},"timestamp":103} +{"role":"planner","contentHash":"HP0000000000000000000001","meta":{},"refs":[],"timestamp":101} +{"role":"coder","contentHash":"HP0000000000000000000002","meta":{},"refs":[],"timestamp":102} +{"role":"reviewer","contentHash":"HP0000000000000000000003","meta":{},"refs":[],"timestamp":103} `; describe("fork-thread", () => { @@ -89,7 +89,7 @@ describe("fork-thread", () => { test("parseThreadDataJsonl reads explicit depth from start record", () => { const text = `{"name":"demo","hash":"H","threadId":"01ZZZZZZZZZZZZZZZZZZZZZZ","parameters":{"prompt":"p","options":{"maxRounds":3,"depth":2}},"timestamp":1} -{"role":"planner","content":"x","meta":{},"timestamp":2} +{"role":"planner","contentHash":"HP0000000000000000000099","meta":{},"refs":[],"timestamp":2} `; const r = parseThreadDataJsonl(text); expect(r.ok).toBe(true); diff --git a/packages/workflow/__tests__/merkle.test.ts b/packages/workflow/__tests__/merkle.test.ts new file mode 100644 index 0000000..6aa4194 --- /dev/null +++ b/packages/workflow/__tests__/merkle.test.ts @@ -0,0 +1,29 @@ +import { describe, expect, test } from "bun:test"; + +import { createContentMerkleNode, parseMerkleNode, serializeMerkleNode } from "../src/merkle.js"; + +describe("merkle", () => { + test("content node roundtrips through YAML", () => { + const node = createContentMerkleNode("hello\nworld"); + const yaml = serializeMerkleNode(node); + const back = parseMerkleNode(yaml); + expect(back).toEqual(node); + }); + + test("step node with object payload roundtrips", () => { + const node = { + type: "step" as const, + payload: { role: "planner", foo: 1 }, + children: ["ABC123", "DEF456"], + }; + const yaml = serializeMerkleNode(node); + const back = parseMerkleNode(yaml); + expect(back.type).toBe("step"); + expect(back.payload).toEqual({ role: "planner", foo: 1 }); + expect(back.children).toEqual(["ABC123", "DEF456"]); + }); + + test("parse rejects invalid YAML root", () => { + expect(() => parseMerkleNode("[]")).toThrow(); + }); +}); diff --git a/packages/workflow/__tests__/refs-tracking.test.ts b/packages/workflow/__tests__/refs-tracking.test.ts index 6b6ad6a..909a3b3 100644 --- a/packages/workflow/__tests__/refs-tracking.test.ts +++ b/packages/workflow/__tests__/refs-tracking.test.ts @@ -4,6 +4,7 @@ import { tmpdir } from "node:os"; import { join } from "node:path"; import * as z from "zod/v4"; +import { createCasStore } from "../src/cas.js"; import { createWorkflow } from "../src/create-workflow.js"; import { executeThread } from "../src/engine.js"; import { createExtract } from "../src/extract-fn.js"; @@ -110,8 +111,8 @@ describe("RoleStep refs tracking", () => { test("parseThreadDataJsonl reads refs and defaults missing refs to []", () => { const text = `{"name":"demo","hash":"C9NMV6V2TQT81","threadId":"01AAA1111111111111111111","parameters":{"prompt":"hi","options":{"maxRounds":5}},"timestamp":100} -{"role":"planner","content":"p","meta":{},"refs":["H111AAAAAAAAA","H222AAAAAAAAA"],"timestamp":101} -{"role":"coder","content":"c","meta":{},"timestamp":102} +{"role":"planner","contentHash":"HPAYLOAD111111","meta":{},"refs":["H111AAAAAAAAA","H222AAAAAAAAA"],"timestamp":101} +{"role":"coder","contentHash":"HPAYLOAD222222","meta":{},"timestamp":102} `; const r = parseThreadDataJsonl(text); expect(r.ok).toBe(true); @@ -139,6 +140,7 @@ describe("RoleStep refs tracking", () => { const dataPath = join(root, "logs", hash, `${threadId}.data.jsonl`); const infoPath = join(root, "logs", hash, `${threadId}.info.jsonl`); await mkdir(join(root, "logs", hash), { recursive: true }); + const cas = createCasStore(join(root, "cas")); const logger = createLogger({ sink: { kind: "file", path: infoPath } }); const ac = new AbortController(); @@ -155,7 +157,7 @@ describe("RoleStep refs tracking", () => { forkSourceThreadId: null, prefilledDiskSteps: null, }, - { threadId, hash, dataJsonlPath: dataPath, infoJsonlPath: infoPath }, + { threadId, hash, dataJsonlPath: dataPath, infoJsonlPath: infoPath, cas }, logger, ); @@ -170,7 +172,12 @@ describe("RoleStep refs tracking", () => { const role1 = JSON.parse(lines[1] ?? "{}") as Record; expect(role1.role).toBe("planner"); - expect(role1.refs).toEqual(["C9NMV6V2TQT81", "C9NMV6V2TQT82"]); + const refs = role1.refs as string[]; + expect(refs).toContain("C9NMV6V2TQT81"); + expect(refs).toContain("C9NMV6V2TQT82"); + expect(typeof role1.contentHash).toBe("string"); + expect(refs).toContain(String(role1.contentHash)); + expect(refs.length).toBe(3); } finally { await rm(root, { recursive: true, force: true }); } @@ -178,8 +185,8 @@ describe("RoleStep refs tracking", () => { test("buildForkPlan carries refs on historical steps", () => { const text = `{"name":"demo","hash":"C9NMV6V2TQT81","threadId":"01AAA1111111111111111111","parameters":{"prompt":"hi","options":{"maxRounds":5}},"timestamp":100} -{"role":"planner","content":"p","meta":{},"refs":["KEEPREFAAAAAA"],"timestamp":101} -{"role":"coder","content":"c","meta":{},"refs":["CODERHASHAAAA"],"timestamp":102} +{"role":"planner","contentHash":"HP111111111111","meta":{},"refs":["KEEPREFAAAAAA"],"timestamp":101} +{"role":"coder","contentHash":"HP222222222222","meta":{},"refs":["CODERHASHAAAA"],"timestamp":102} `; const plan = buildForkPlan(text, null); expect(plan.ok).toBe(true); diff --git a/packages/workflow/__tests__/thread-jsonl-format.test.ts b/packages/workflow/__tests__/thread-jsonl-format.test.ts index 4602926..92fea30 100644 --- a/packages/workflow/__tests__/thread-jsonl-format.test.ts +++ b/packages/workflow/__tests__/thread-jsonl-format.test.ts @@ -18,7 +18,7 @@ describe("RFC-001 thread JSONL shapes", () => { const roleRecord = { role: "planner", - content: "Plan: modify auth middleware...", + contentHash: "CPHASH000000000000000001", meta: { plan: "...", files: ["src/auth.ts"] }, refs: [] as string[], timestamp: 1714963201000, @@ -28,7 +28,7 @@ describe("RFC-001 thread JSONL shapes", () => { ["hash", "name", "parameters", "threadId", "timestamp"].sort(), ); expect(Object.keys(roleRecord).sort()).toEqual( - ["content", "meta", "refs", "role", "timestamp"].sort(), + ["contentHash", "meta", "refs", "role", "timestamp"].sort(), ); }); diff --git a/packages/workflow/__tests__/worker.test.ts b/packages/workflow/__tests__/worker.test.ts index 6fdc1f5..7067260 100644 --- a/packages/workflow/__tests__/worker.test.ts +++ b/packages/workflow/__tests__/worker.test.ts @@ -5,22 +5,29 @@ import { createConnection } from "node:net"; import { tmpdir } from "node:os"; import { join } from "node:path"; +import { createCasStore } from "../src/cas.js"; +import { createContentMerkleNode, serializeMerkleNode } from "../src/merkle.js"; import { getWorkerHostScriptPath } from "../src/worker-entry-path.js"; -const bundleSource = `export const descriptor = { +const bundleSource = `import { putContentMerkleNode } from "@uncaged/workflow"; + +export const descriptor = { description: "worker-test", roles: { planner: { description: "planner", schema: {} }, coder: { description: "coder", schema: {} }, }, }; -export const run = async function* (input) { +export const run = async function* (input, options) { + const cas = options.cas; const has = (r) => input.steps.some((s) => s.role === r); if (!has("planner")) { - yield { role: "planner", content: "p", meta: { plan: input.prompt } }; + const h = await putContentMerkleNode(cas, "p"); + yield { role: "planner", contentHash: h, meta: { plan: input.prompt }, refs: [h] }; } if (!has("coder")) { - yield { role: "coder", content: "c", meta: { diff: "y" } }; + const h = await putContentMerkleNode(cas, "c"); + yield { role: "coder", contentHash: h, meta: { diff: "y" }, refs: [h] }; } return { returnCode: 0, summary: "completed: moderator returned END" }; }; @@ -102,7 +109,7 @@ describe("worker process", () => { threadId, workflowName: "demo-flow", prompt: "hello", - options: { maxRounds: 5 }, + options: { maxRounds: 5, depth: 0 }, }); const exitCode: number = await new Promise((resolve) => { @@ -143,6 +150,11 @@ describe("worker process", () => { const port = await readReadyPort(child); + const cas = createCasStore(join(root, "cas")); + const plannerReplayHash = await cas.put( + serializeMerkleNode(createContentMerkleNode("p-old")), + ); + const threadId = "01KQXKW18CT8G75T53R8F4G7YG"; const srcId = "01SRCMMMMMMMMMMMMMMMMMMMM"; await sendJson(port, { @@ -150,12 +162,13 @@ describe("worker process", () => { threadId, workflowName: "demo-flow", prompt: "hello", - options: { maxRounds: 5 }, + options: { maxRounds: 5, depth: 0 }, steps: [ { role: "planner", - content: "p-old", + contentHash: plannerReplayHash, meta: { plan: "z" }, + refs: [plannerReplayHash], timestamp: 555, }, ], diff --git a/packages/workflow/__tests__/workflow-as-agent-integration.test.ts b/packages/workflow/__tests__/workflow-as-agent-integration.test.ts index b4386f4..4dd34b9 100644 --- a/packages/workflow/__tests__/workflow-as-agent-integration.test.ts +++ b/packages/workflow/__tests__/workflow-as-agent-integration.test.ts @@ -4,11 +4,13 @@ import { tmpdir } from "node:os"; import { join } from "node:path"; import * as z from "zod/v4"; +import { createCasStore } from "../src/cas.js"; import { createWorkflow } from "../src/create-workflow.js"; import { executeThread } from "../src/engine.js"; import { createExtract } from "../src/extract-fn.js"; import { hashWorkflowBundleBytes } from "../src/hash.js"; import { createLogger } from "../src/logger.js"; +import { getContentMerklePayload } from "../src/merkle.js"; import { readWorkflowRegistry, registerWorkflowVersion, @@ -80,7 +82,9 @@ const parentExtract = createExtract({ model: "test", }); -const childBundleSource = `export const descriptor = { +const childBundleSource = `import { putContentMerkleNode } from "@uncaged/workflow"; + +export const descriptor = { description: "child-integration", roles: { agent: { @@ -89,8 +93,10 @@ const childBundleSource = `export const descriptor = { }, }, }; -export async function* run(input) { - yield { role: "agent", content: "child-body", meta: {}, refs: [] }; +export async function* run(input, options) { + const cas = options.cas; + const h = await putContentMerkleNode(cas, "child-body"); + yield { role: "agent", contentHash: h, meta: {}, refs: [h] }; return { returnCode: 0, summary: "child-done:" + input.prompt }; } `; @@ -149,6 +155,7 @@ describe("workflowAsAgent integration", () => { const dataPath = join(root, "logs", parentHash, `${threadId}.data.jsonl`); const infoPath = join(root, "logs", parentHash, `${threadId}.info.jsonl`); await mkdir(join(root, "logs", parentHash), { recursive: true }); + const cas = createCasStore(join(root, "cas")); const logger = createLogger({ sink: { kind: "file", path: infoPath } }); const ac = new AbortController(); @@ -165,7 +172,7 @@ describe("workflowAsAgent integration", () => { forkSourceThreadId: null, prefilledDiskSteps: null, }, - { threadId, hash: parentHash, dataJsonlPath: dataPath, infoJsonlPath: infoPath }, + { threadId, hash: parentHash, dataJsonlPath: dataPath, infoJsonlPath: infoPath, cas }, logger, ); @@ -179,7 +186,9 @@ describe("workflowAsAgent integration", () => { expect(parentLines.length).toBe(2); const callerLine = JSON.parse(parentLines[1] ?? "{}") as Record; expect(callerLine.role).toBe("caller"); - expect(callerLine.content).toBe("child-done:from-parent"); + expect(await getContentMerklePayload(cas, String(callerLine.contentHash))).toBe( + "child-done:from-parent", + ); const childDir = join(root, "logs", childHash); const childFiles = await readdir(childDir); diff --git a/packages/workflow/__tests__/workflow-as-agent.test.ts b/packages/workflow/__tests__/workflow-as-agent.test.ts index d232f6e..59f8bd4 100644 --- a/packages/workflow/__tests__/workflow-as-agent.test.ts +++ b/packages/workflow/__tests__/workflow-as-agent.test.ts @@ -3,6 +3,7 @@ import { mkdir, mkdtemp, rm, writeFile } from "node:fs/promises"; import { tmpdir } from "node:os"; import { join } from "node:path"; +import { createCasStore } from "../src/cas.js"; import { hashWorkflowBundleBytes } from "../src/hash.js"; import { readWorkflowRegistry, @@ -12,7 +13,12 @@ import { import { type AgentContext, START } from "../src/types.js"; import { workflowAsAgent } from "../src/workflow-as-agent.js"; -function makeAgentCtx(params: { depth: number; prompt: string; maxRounds: number }): AgentContext { +function makeAgentCtx(params: { + storageRoot: string; + depth: number; + prompt: string; + maxRounds: number; +}): AgentContext { const ts = Date.now(); return { threadId: "01PARENT000000000000000001AA", @@ -28,10 +34,13 @@ function makeAgentCtx(params: { depth: number; prompt: string; maxRounds: number name: "caller", systemPrompt: "caller", }, + cas: createCasStore(join(params.storageRoot, "agent-ctx-cas")), }; } -const childBundleSource = `export const descriptor = { +const childBundleSource = `import { putContentMerkleNode } from "@uncaged/workflow"; + +export const descriptor = { description: "child-test", roles: { agent: { @@ -40,8 +49,10 @@ const childBundleSource = `export const descriptor = { }, }, }; -export async function* run(input) { - yield { role: "agent", content: "child-body", meta: {}, refs: [] }; +export async function* run(input, options) { + const cas = options.cas; + const h = await putContentMerkleNode(cas, "child-body"); + yield { role: "agent", contentHash: h, meta: {}, refs: [h] }; return { returnCode: 0, summary: "child-done:" + input.prompt }; } `; @@ -68,7 +79,9 @@ describe("workflowAsAgent", () => { const root = await mkdtemp(join(tmpdir(), "wf-waa-missing-")); try { const agent = workflowAsAgent("missing-wf", { storageRoot: root }); - const out = await agent(makeAgentCtx({ depth: 0, prompt: "x", maxRounds: 5 })); + const out = await agent( + makeAgentCtx({ storageRoot: root, depth: 0, prompt: "x", maxRounds: 5 }), + ); expect(out).toContain("not found in registry"); expect(out).toContain("missing-wf"); } finally { @@ -81,7 +94,9 @@ describe("workflowAsAgent", () => { try { await installChildWorkflow(root); const agent = workflowAsAgent("child-wf", { storageRoot: root }); - const out = await agent(makeAgentCtx({ depth: 0, prompt: "hello-parent", maxRounds: 5 })); + const out = await agent( + makeAgentCtx({ storageRoot: root, depth: 0, prompt: "hello-parent", maxRounds: 5 }), + ); expect(out).toBe("child-done:hello-parent"); } finally { await rm(root, { recursive: true, force: true }); @@ -92,7 +107,9 @@ describe("workflowAsAgent", () => { const root = await mkdtemp(join(tmpdir(), "wf-waa-depth-")); try { const agent = workflowAsAgent("child-wf", { storageRoot: root }); - const out = await agent(makeAgentCtx({ depth: 3, prompt: "x", maxRounds: 5 })); + const out = await agent( + makeAgentCtx({ storageRoot: root, depth: 3, prompt: "x", maxRounds: 5 }), + ); expect(out).toContain("depth limit"); } finally { await rm(root, { recursive: true, force: true }); diff --git a/packages/workflow/src/bundle-import-env.ts b/packages/workflow/src/bundle-import-env.ts new file mode 100644 index 0000000..ef1bf1c --- /dev/null +++ b/packages/workflow/src/bundle-import-env.ts @@ -0,0 +1,8 @@ +import { pathToFileURL } from "node:url"; + +/** + * Dynamic-import a workflow bundle path (see {@link extractBundleExports} — symlink must exist first). + */ +export async function importWorkflowBundleModule(bundlePath: string): Promise { + return import(pathToFileURL(bundlePath).href); +} diff --git a/packages/workflow/src/bundle-validator.ts b/packages/workflow/src/bundle-validator.ts index 4312a23..9c7f982 100644 --- a/packages/workflow/src/bundle-validator.ts +++ b/packages/workflow/src/bundle-validator.ts @@ -41,9 +41,12 @@ function isAllowedImportSpecifier(spec: string): boolean { if (spec.length === 0) { return false; } - if (spec.startsWith(".") || spec.startsWith("/")) { + if (spec.startsWith(".") || spec.startsWith("/") || spec.startsWith("file:")) { return false; } + if (spec === "@uncaged/workflow") { + return true; + } return isBuiltin(spec); } @@ -297,7 +300,7 @@ function validateImportDeclaration(node: ImportDeclaration): string | null { return "only static string import specifiers are allowed"; } if (!isAllowedImportSpecifier(spec)) { - return `disallowed import specifier "${spec}" (only Node built-ins are allowed)`; + return `disallowed import specifier "${spec}" (only Node built-ins and "@uncaged/workflow" are allowed)`; } return null; } @@ -312,7 +315,7 @@ function validateExportSource( return staticMessage; } if (!isAllowedImportSpecifier(spec)) { - return `${disallowedPrefix} "${spec}" (only Node built-ins are allowed)`; + return `${disallowedPrefix} "${spec}" (only Node built-ins and "@uncaged/workflow" are allowed)`; } return null; } @@ -365,7 +368,7 @@ function bundleConstraintViolationForNode(node: Node): string | null { /** * Validate RFC-001 bundle rules: single-file ESM shape, named exports `run` + `descriptor`, - * no default export, no dynamic `import()`, static imports restricted to Node builtins. + * no default export, no dynamic `import()`, static imports restricted to Node builtins plus `@uncaged/workflow`. */ export function validateWorkflowBundle(input: WorkflowBundleValidationInput): Result { if (!endsWithEsmJs(input.filePath)) { diff --git a/packages/workflow/src/create-workflow.ts b/packages/workflow/src/create-workflow.ts index 5cfd008..ea81cd3 100644 --- a/packages/workflow/src/create-workflow.ts +++ b/packages/workflow/src/create-workflow.ts @@ -1,4 +1,6 @@ import type { ExtractFn } from "./extract-fn.js"; +import { putContentMerkleNode } from "./merkle.js"; +import { mergeRefsWithContentHash } from "./refs-field.js"; import { type AgentBinding, type AgentContext, @@ -58,7 +60,7 @@ export function createWorkflow( const baseTs = Date.now(); let steps: RoleStep[] = input.steps.map((out, i) => ({ role: out.role, - content: out.content, + contentHash: out.contentHash, meta: out.meta, refs: out.refs, timestamp: baseTs + i, @@ -93,6 +95,7 @@ export function createWorkflow( const agentCtx: AgentContext = { ...modCtx, currentRole: { name: next, systemPrompt: roleDef.systemPrompt }, + cas: options.cas, }; const agent = binding.overrides?.[next] ?? binding.agent; @@ -110,21 +113,28 @@ export function createWorkflow( extractCtx as unknown as ExtractContext, ); - const refs = resolveExtractedRefs( - roleDef as unknown as RoleDefinition>, - meta, + const contentHash = await putContentMerkleNode(options.cas, raw); + + const refs = mergeRefsWithContentHash( + resolveExtractedRefs(roleDef as unknown as RoleDefinition>, meta), + contentHash, ); const ts = Date.now(); const step = { role: next, - content: raw, + contentHash, meta, refs, timestamp: ts, } as RoleStep; - yield { role: step.role, content: step.content, meta: step.meta, refs: step.refs }; + yield { + role: step.role, + contentHash: step.contentHash, + meta: step.meta, + refs: step.refs, + }; steps = [...steps, step]; } diff --git a/packages/workflow/src/engine.ts b/packages/workflow/src/engine.ts index 2e3abdb..8808637 100644 --- a/packages/workflow/src/engine.ts +++ b/packages/workflow/src/engine.ts @@ -1,7 +1,9 @@ import { appendFile, mkdir } from "node:fs/promises"; import { dirname } from "node:path"; +import type { CasStore } from "./cas.js"; import type { LogFn } from "./logger.js"; +import { getContentMerklePayload } from "./merkle.js"; import { normalizeRefsField } from "./refs-field.js"; import type { ThreadInput, WorkflowFn, WorkflowFnOptions, WorkflowResult } from "./types.js"; @@ -10,12 +12,13 @@ export type ExecuteThreadIo = { hash: string; dataJsonlPath: string; infoJsonlPath: string; + cas: CasStore; }; /** One persisted role line in `.data.jsonl` (engine adds these for fork replay before running the generator). */ export type PrefilledDiskStep = { role: string; - content: string; + contentHash: string; meta: Record; refs: string[]; timestamp: number; @@ -50,8 +53,9 @@ async function driveWorkflowGenerator(params: { dataJsonlPath: string; threadId: string; logger: LogFn; + cas: CasStore; }): Promise { - const { fn, input, bundleOptions, executeOptions, dataJsonlPath, threadId, logger } = params; + const { fn, input, bundleOptions, executeOptions, dataJsonlPath, threadId, logger, cas } = params; const gen = fn(input, bundleOptions); let written = 0; @@ -78,10 +82,16 @@ async function driveWorkflowGenerator(params: { written++; const step = iterResult.value; + const resolved = await getContentMerklePayload(cas, step.contentHash); + if (resolved === null) { + throw new Error( + `role step ${step.role}: CAS blob missing for contentHash ${step.contentHash}`, + ); + } const ts = Date.now(); await appendDataLine(dataJsonlPath, { role: step.role, - content: step.content, + contentHash: step.contentHash, meta: step.meta, refs: normalizeRefsField(step.refs), timestamp: ts, @@ -153,9 +163,15 @@ export async function executeThread( if (prefilled !== null) { for (const row of prefilled) { + const prefilledPayload = await getContentMerklePayload(io.cas, row.contentHash); + if (prefilledPayload === null) { + throw new Error( + `prefilled step ${row.role}: CAS blob missing for contentHash ${row.contentHash}`, + ); + } await appendDataLine(io.dataJsonlPath, { role: row.role, - content: row.content, + contentHash: row.contentHash, meta: row.meta, refs: normalizeRefsField(row.refs), timestamp: row.timestamp, @@ -175,6 +191,7 @@ export async function executeThread( threadId: io.threadId, maxRounds: options.maxRounds, depth: options.depth, + cas: io.cas, }; return await driveWorkflowGenerator({ @@ -185,5 +202,6 @@ export async function executeThread( dataJsonlPath: io.dataJsonlPath, threadId: io.threadId, logger, + cas: io.cas, }); } diff --git a/packages/workflow/src/ensure-uncaged-workflow-symlink.ts b/packages/workflow/src/ensure-uncaged-workflow-symlink.ts new file mode 100644 index 0000000..82aaf1c --- /dev/null +++ b/packages/workflow/src/ensure-uncaged-workflow-symlink.ts @@ -0,0 +1,36 @@ +import { mkdir, readlink, symlink, unlink } from "node:fs/promises"; +import path from "node:path"; +import { fileURLToPath } from "node:url"; + +/** This module lives in `@uncaged/workflow/src`; parent dir is the package root. */ +function installedWorkflowPackageDir(): string { + return fileURLToPath(new URL("..", import.meta.url)); +} + +/** + * Ensures `/node_modules/@uncaged/workflow` points at the installed `@uncaged/workflow` + * package so workflow bundles loaded from `/bundles/*.esm.js` can resolve `import "@uncaged/workflow"`. + */ +export async function ensureUncagedWorkflowSymlink(storageRoot: string): Promise { + const target = installedWorkflowPackageDir(); + const linkDir = path.join(storageRoot, "node_modules", "@uncaged"); + const linkPath = path.join(linkDir, "workflow"); + await mkdir(linkDir, { recursive: true }); + + try { + const existing = await readlink(linkPath); + const normalizedExisting = path.resolve(linkDir, existing); + if (normalizedExisting === target) { + return; + } + await unlink(linkPath); + } catch (e) { + const errObj = e as NodeJS.ErrnoException; + if (errObj.code !== "ENOENT" && errObj.code !== "EINVAL") { + throw e; + } + } + + const linkType = process.platform === "win32" ? "junction" : "dir"; + await symlink(target, linkPath, linkType); +} diff --git a/packages/workflow/src/extract-bundle-exports.ts b/packages/workflow/src/extract-bundle-exports.ts index 82294fc..1a0c987 100644 --- a/packages/workflow/src/extract-bundle-exports.ts +++ b/packages/workflow/src/extract-bundle-exports.ts @@ -1,5 +1,5 @@ -import { pathToFileURL } from "node:url"; - +import { importWorkflowBundleModule } from "./bundle-import-env.js"; +import { ensureUncagedWorkflowSymlink } from "./ensure-uncaged-workflow-symlink.js"; import { err, ok, type Result } from "./result.js"; import type { WorkflowFn } from "./types.js"; import type { WorkflowDescriptor } from "./workflow-descriptor.js"; @@ -10,14 +10,23 @@ export type ExtractedBundleExports = { descriptor: WorkflowDescriptor; }; +export type ExtractBundleExportsOptions = { + /** When set, ensures `node_modules/@uncaged/workflow` exists under this root before import. */ + storageRoot: string | null; +}; + /** Load a workflow `.esm.js` bundle and read its named exports (`run`, `descriptor`). */ export async function extractBundleExports( bundlePath: string, + options: ExtractBundleExportsOptions = { storageRoot: null }, ): Promise> { let modUnknown: unknown; try { + if (options.storageRoot !== null) { + await ensureUncagedWorkflowSymlink(options.storageRoot); + } // Dynamic import required: user bundle path resolved at runtime - modUnknown = await import(pathToFileURL(bundlePath).href); + modUnknown = await importWorkflowBundleModule(bundlePath); } catch (e) { const message = e instanceof Error ? e.message : String(e); return err(`failed to import bundle: ${message}`); diff --git a/packages/workflow/src/extract-fn.ts b/packages/workflow/src/extract-fn.ts index e1ca2d5..4c46a2b 100644 --- a/packages/workflow/src/extract-fn.ts +++ b/packages/workflow/src/extract-fn.ts @@ -1,6 +1,7 @@ import type * as z from "zod/v4"; import { llmExtractWithRetry } from "./llm-extract.js"; +import { getContentMerklePayload } from "./merkle.js"; import type { ExtractContext, LlmProvider } from "./types.js"; export type ExtractFn = >( @@ -29,8 +30,12 @@ export function createExtract(provider: LlmProvider): ExtractFn { if (ctx.steps.length > 0) { lines.push("## Thread History"); for (const step of ctx.steps) { + const body = await getContentMerklePayload(ctx.cas, step.contentHash); + if (body === null) { + throw new Error(`extract: missing CAS blob for step ${step.role}: ${step.contentHash}`); + } lines.push(`### ${step.role}`); - lines.push(step.content); + lines.push(body); lines.push(`Meta: ${JSON.stringify(step.meta)}`); lines.push(""); } diff --git a/packages/workflow/src/fork-thread.ts b/packages/workflow/src/fork-thread.ts index b058a64..94a3d85 100644 --- a/packages/workflow/src/fork-thread.ts +++ b/packages/workflow/src/fork-thread.ts @@ -19,14 +19,14 @@ function parseRoleLine( lineIndex: number, ): Result { const role = obj.role; - const content = obj.content; + const contentHash = obj.contentHash; const meta = obj.meta; const timestamp = obj.timestamp; if (typeof role !== "string") { return err(`invalid role record at line ${lineIndex}: missing role`); } - if (typeof content !== "string") { - return err(`invalid role record at line ${lineIndex}: missing content`); + if (typeof contentHash !== "string") { + return err(`invalid role record at line ${lineIndex}: missing contentHash`); } if (meta === null || typeof meta !== "object") { return err(`invalid role record at line ${lineIndex}: missing meta`); @@ -36,7 +36,7 @@ function parseRoleLine( } return ok({ role, - content, + contentHash, meta: meta as Record, refs: normalizeRefsField(obj.refs), timestamp, diff --git a/packages/workflow/src/index.ts b/packages/workflow/src/index.ts index 205749b..e6481cf 100644 --- a/packages/workflow/src/index.ts +++ b/packages/workflow/src/index.ts @@ -40,6 +40,15 @@ export { type LogFn, type LoggerSink, } from "./logger.js"; +export { + createContentMerkleNode, + getContentMerklePayload, + type MerkleNode, + type MerkleNodeType, + parseMerkleNode, + putContentMerkleNode, + serializeMerkleNode, +} from "./merkle.js"; export { getRegisteredWorkflow, listRegisteredWorkflowNames, diff --git a/packages/workflow/src/merkle.ts b/packages/workflow/src/merkle.ts new file mode 100644 index 0000000..ce938b6 --- /dev/null +++ b/packages/workflow/src/merkle.ts @@ -0,0 +1,76 @@ +import { parse, stringify } from "yaml"; + +import type { CasStore } from "./cas.js"; + +export type MerkleNodeType = "content" | "step" | "thread"; + +export type MerkleNode = { + type: MerkleNodeType; + payload: string | Record; + children: string[]; +}; + +export function serializeMerkleNode(node: MerkleNode): string { + return stringify( + { type: node.type, payload: node.payload, children: node.children }, + { indent: 2 }, + ); +} + +export function parseMerkleNode(yamlText: string): MerkleNode { + const raw = parse(yamlText) as unknown; + if (raw === null || typeof raw !== "object") { + throw new Error("merkle: YAML root must be an object"); + } + const rec = raw as Record; + const type = rec.type; + const payload = rec.payload; + const children = rec.children; + if (type !== "content" && type !== "step" && type !== "thread") { + throw new Error("merkle: invalid or missing type"); + } + if (typeof payload !== "string" && (payload === null || typeof payload !== "object")) { + throw new Error("merkle: payload must be a string or object"); + } + if (!Array.isArray(children)) { + throw new Error("merkle: children must be an array"); + } + const childHashes: string[] = []; + for (const c of children) { + if (typeof c !== "string") { + throw new Error("merkle: child hash must be a string"); + } + childHashes.push(c); + } + return { + type, + payload: typeof payload === "string" ? payload : (payload as Record), + children: childHashes, + }; +} + +export function createContentMerkleNode(payload: string): MerkleNode { + return { type: "content", payload, children: [] }; +} + +/** Serializes a content Merkle node and stores it in CAS; returns its hash. */ +export async function putContentMerkleNode(store: CasStore, content: string): Promise { + const yamlText = serializeMerkleNode(createContentMerkleNode(content)); + return store.put(yamlText); +} + +/** Loads a CAS blob and returns the payload string for a `content` Merkle node. */ +export async function getContentMerklePayload( + store: CasStore, + hash: string, +): Promise { + const yamlText = await store.get(hash); + if (yamlText === null) { + return null; + } + const node = parseMerkleNode(yamlText); + if (node.type !== "content" || typeof node.payload !== "string") { + return null; + } + return node.payload; +} diff --git a/packages/workflow/src/refs-field.ts b/packages/workflow/src/refs-field.ts index fc00edc..e89d40c 100644 --- a/packages/workflow/src/refs-field.ts +++ b/packages/workflow/src/refs-field.ts @@ -1,3 +1,12 @@ +/** Append `contentHash` to `refs` when not already present (dedupe by first occurrence order). */ +export function mergeRefsWithContentHash(refs: string[], contentHash: string): string[] { + const out = [...refs]; + if (!out.includes(contentHash)) { + out.push(contentHash); + } + return out; +} + /** Normalize `refs` from persisted JSONL or IPC payloads (missing or invalid → []). */ export function normalizeRefsField(value: unknown): string[] { if (!Array.isArray(value)) { diff --git a/packages/workflow/src/types.ts b/packages/workflow/src/types.ts index 7b5e642..28e9d9e 100644 --- a/packages/workflow/src/types.ts +++ b/packages/workflow/src/types.ts @@ -1,5 +1,7 @@ import type * as z from "zod/v4"; +import type { CasStore } from "./cas.js"; + /** Sentinel values for automaton control flow. */ export const START = "__start__" as const; export const END = "__end__" as const; @@ -17,7 +19,8 @@ export type LlmProvider = { /** What each generator yield produces — one role's output (engine adds `timestamp` when persisting). */ export type RoleOutput = { role: string; - content: string; + /** CAS hash of the serialized Merkle content node for this step's body text. */ + contentHash: string; meta: Record; /** CAS hashes produced or consumed by this step (for GC traceability). */ refs: string[]; @@ -41,6 +44,8 @@ export type WorkflowFnOptions = { maxRounds: number; /** Nesting depth for workflow-as-agent chains; root threads use `0`. */ depth: number; + /** Global CAS store for Merkle content blobs (role step bodies). */ + cas: CasStore; }; /** Bundle contract — named export `run` is a function returning an AsyncGenerator. */ @@ -62,7 +67,7 @@ export type RoleStep = { [K in keyof M & string]: { role: K; meta: M[K]; - content: string; + contentHash: string; refs: string[]; timestamp: number; }; @@ -83,6 +88,7 @@ export type AgentContext = ModeratorContext & name: string; systemPrompt: string; }; + cas: CasStore; }; /** Phase 3: Extractor runs — has agent output; the extraction instruction is a separate argument to the extract function. */ diff --git a/packages/workflow/src/worker.ts b/packages/workflow/src/worker.ts index 85de2cd..8191b1d 100644 --- a/packages/workflow/src/worker.ts +++ b/packages/workflow/src/worker.ts @@ -1,12 +1,15 @@ import { mkdir, unlink, writeFile } from "node:fs/promises"; import { createServer, type Socket } from "node:net"; import { dirname, join } from "node:path"; -import { pathToFileURL } from "node:url"; +import { importWorkflowBundleModule } from "./bundle-import-env.js"; +import { createCasStore } from "./cas.js"; import type { PrefilledDiskStep } from "./engine.js"; import { type ExecuteThreadIo, executeThread } from "./engine.js"; +import { ensureUncagedWorkflowSymlink } from "./ensure-uncaged-workflow-symlink.js"; import { createLogger } from "./logger.js"; import { normalizeRefsField } from "./refs-field.js"; import { err, ok, type Result } from "./result.js"; +import { getGlobalCasDir } from "./storage-root.js"; import { createThreadPauseGate, type ThreadPauseGate } from "./thread-pause-gate.js"; import type { RoleOutput, WorkflowFn } from "./types.js"; @@ -48,9 +51,9 @@ type ThreadHandle = { function parseRoleOutputRecord(obj: Record): RoleOutput | null { const role = obj.role; - const content = obj.content; + const contentHash = obj.contentHash; const meta = obj.meta; - if (typeof role !== "string" || typeof content !== "string") { + if (typeof role !== "string" || typeof contentHash !== "string") { return null; } if (meta === null || typeof meta !== "object") { @@ -58,7 +61,7 @@ function parseRoleOutputRecord(obj: Record): RoleOutput | null } return { role, - content, + contentHash, meta: meta as Record, refs: normalizeRefsField(obj.refs), }; @@ -300,8 +303,9 @@ async function main(): Promise { return; } + await ensureUncagedWorkflowSymlink(storageRoot); // Dynamic import required: user bundle path resolved at runtime - const modUnknown: unknown = await import(pathToFileURL(bundlePath).href); + const modUnknown: unknown = await importWorkflowBundleModule(bundlePath); const modRec = modUnknown as Record; const runExport = modRec.run; if (!isWorkflowFnLike(runExport)) { @@ -315,6 +319,8 @@ async function main(): Promise { let activeThreads = 0; let shutdownTimer: ReturnType | null = null; + const cas = createCasStore(getGlobalCasDir(storageRoot)); + const workerCtlPath = join(storageRoot, "workers", `${hash}.json`); function cancelShutdownTimer(): void { @@ -363,6 +369,7 @@ async function main(): Promise { hash, dataJsonlPath, infoJsonlPath, + cas, }; const existing = threads.get(threadId); @@ -389,7 +396,7 @@ async function main(): Promise { const ts = cmd.stepTimestamps?.[i]; return { role: step.role, - content: step.content, + contentHash: step.contentHash, meta: step.meta, refs: normalizeRefsField(step.refs), timestamp: typeof ts === "number" && ts > 0 ? ts : baseTs + i, diff --git a/packages/workflow/src/workflow-as-agent.ts b/packages/workflow/src/workflow-as-agent.ts index bbe6ce0..3c60d58 100644 --- a/packages/workflow/src/workflow-as-agent.ts +++ b/packages/workflow/src/workflow-as-agent.ts @@ -1,10 +1,11 @@ import { join } from "node:path"; +import { createCasStore } from "./cas.js"; import { type ExecuteThreadIo, executeThread } from "./engine.js"; import { extractBundleExports } from "./extract-bundle-exports.js"; import { createLogger } from "./logger.js"; import { getRegisteredWorkflow, readWorkflowRegistry } from "./registry.js"; -import { getDefaultWorkflowStorageRoot } from "./storage-root.js"; +import { getDefaultWorkflowStorageRoot, getGlobalCasDir } from "./storage-root.js"; import type { AgentContext, AgentFn, ThreadInput } from "./types.js"; import { generateUlid } from "./ulid.js"; @@ -50,7 +51,7 @@ export function workflowAsAgent( } const bundlePath = join(storageRoot, "bundles", `${entry.hash}.esm.js`); - const bundleExportsResult = await extractBundleExports(bundlePath); + const bundleExportsResult = await extractBundleExports(bundlePath, { storageRoot }); if (!bundleExportsResult.ok) { return `ERROR: ${bundleExportsResult.error}`; } @@ -69,6 +70,7 @@ export function workflowAsAgent( hash: entry.hash, dataJsonlPath, infoJsonlPath, + cas: createCasStore(getGlobalCasDir(storageRoot)), }; const logger = createLogger({ sink: { kind: "file", path: infoJsonlPath } });