From 1e9900bed3fb5bde6963b10bb487dc2018487d5a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B0=8F=E6=A9=98?= Date: Tue, 12 May 2026 02:10:06 +0000 Subject: [PATCH] =?UTF-8?q?feat(#194):=20Phase=202=20=E2=80=94=20Engine=20?= =?UTF-8?q?layer=20Merkle=20call=20stack=20wiring?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Protocol: AgentFnResult = string | { output, childThread }, RoleOutput.childThread, ThreadContext.bundleHash for parent state lookup - Runtime: create-workflow normalizes AgentFnResult, propagates childThread in RoleOutput - Engine: ExecuteThreadOptions.parentStateHash, appendStateForStep writes childThread, putStartNode uses parentStateHash - workflowAsAgent: reads parent head state from threads.json, passes parentStateHash to child, returns { output, childThread: rootHash } - Integration test: 4 cases verifying bidirectional Merkle links (306 lines) Phase 2 of #194 (Merkle Call Stack). Closes #196. 小橘 --- .../__tests__/create-llm-adapter.test.ts | 1 + packages/workflow-cas/src/nodes.ts | 12 +- .../workflow-execute/__tests__/engine.test.ts | 9 +- .../__tests__/merkle-call-stack.test.ts | 306 ++++++++++++++++++ .../workflow-execute/src/engine/engine.ts | 8 +- .../src/engine/fork-thread.ts | 1 + packages/workflow-execute/src/engine/types.ts | 2 + .../workflow-execute/src/engine/worker.ts | 3 + .../workflow-execute/src/workflow-as-agent.ts | 19 +- .../__tests__/moderator-table.test.ts | 1 + packages/workflow-protocol/src/index.ts | 1 + packages/workflow-protocol/src/types.ts | 6 +- .../workflow-runtime/src/build-context.ts | 2 + .../workflow-runtime/src/create-workflow.ts | 16 +- packages/workflow-runtime/src/index.ts | 1 + packages/workflow-runtime/src/types.ts | 1 + .../__tests__/develop-template.test.ts | 1 + .../__tests__/solve-issue-template.test.ts | 2 + .../__tests__/build-agent-prompt.test.ts | 4 + 19 files changed, 381 insertions(+), 15 deletions(-) create mode 100644 packages/workflow-execute/__tests__/merkle-call-stack.test.ts diff --git a/packages/workflow-agent-llm/__tests__/create-llm-adapter.test.ts b/packages/workflow-agent-llm/__tests__/create-llm-adapter.test.ts index c40f8c7..8303efb 100644 --- a/packages/workflow-agent-llm/__tests__/create-llm-adapter.test.ts +++ b/packages/workflow-agent-llm/__tests__/create-llm-adapter.test.ts @@ -12,6 +12,7 @@ function makeCtx(userContent: string): AgentContext { timestamp: 1, }, depth: 0, + bundleHash: "TESTHASH00001", steps: [], threadId: "01TEST000000000000000000TR", currentRole: { name: "planner", systemPrompt: "system instructions" }, diff --git a/packages/workflow-cas/src/nodes.ts b/packages/workflow-cas/src/nodes.ts index 7db37b2..9db4f70 100644 --- a/packages/workflow-cas/src/nodes.ts +++ b/packages/workflow-cas/src/nodes.ts @@ -118,14 +118,22 @@ export function parseCasThreadNode(yamlText: string): ParsedCasThreadNode | null if (!isStartPayload(raw.payload)) { return null; } - const node: StartNode = { type: "start", payload: normalizeStartPayload(raw.payload), refs: [...refs] }; + const node: StartNode = { + type: "start", + payload: normalizeStartPayload(raw.payload), + refs: [...refs], + }; return { kind: "start", node }; } if (!isStatePayload(raw.payload)) { return null; } - const node: StateNode = { type: "state", payload: normalizeStatePayload(raw.payload), refs: [...refs] }; + const node: StateNode = { + type: "state", + payload: normalizeStatePayload(raw.payload), + refs: [...refs], + }; return { kind: "state", node }; } diff --git a/packages/workflow-execute/__tests__/engine.test.ts b/packages/workflow-execute/__tests__/engine.test.ts index 831e548..8dd713f 100644 --- a/packages/workflow-execute/__tests__/engine.test.ts +++ b/packages/workflow-execute/__tests__/engine.test.ts @@ -35,6 +35,7 @@ function noLogger(): (tag: string, content: string) => void { function makeOptions(overrides: Partial): ExecuteThreadOptions { return { depth: 0, + parentStateHash: null, signal: new AbortController().signal, awaitAfterEachYield: async () => {}, forkSourceThreadId: null, @@ -144,9 +145,9 @@ describe("executeThread (Phase 2 — CAS thread storage)", () => { runtime: WorkflowRuntime, ): AsyncGenerator { const h1 = await runtime.cas.put("plan-text"); - yield { role: "planner", contentHash: h1, meta: { plan: 1 }, refs: [h1] }; + yield { role: "planner", contentHash: h1, meta: { plan: 1 }, refs: [h1], childThread: null }; const h2 = await runtime.cas.put("code-text"); - yield { role: "coder", contentHash: h2, meta: { diff: "y" }, refs: [h2] }; + yield { role: "coder", contentHash: h2, meta: { diff: "y" }, refs: [h2], childThread: null }; return { returnCode: 0, summary: "done" }; }; @@ -210,7 +211,7 @@ describe("executeThread (Phase 2 — CAS thread storage)", () => { runtime: WorkflowRuntime, ): AsyncGenerator { const h = await runtime.cas.put("only-step"); - yield { role: "only", contentHash: h, meta: {}, refs: [h] }; + yield { role: "only", contentHash: h, meta: {}, refs: [h], childThread: null }; return { returnCode: 0, summary: "completed" }; }; @@ -261,7 +262,7 @@ describe("executeThread (Phase 2 — CAS thread storage)", () => { runtime: WorkflowRuntime, ): AsyncGenerator { const h = await runtime.cas.put("step"); - yield { role: "only", contentHash: h, meta: {}, refs: [h] }; + yield { role: "only", contentHash: h, meta: {}, refs: [h], childThread: null }; return { returnCode: 0, summary: "done" }; }; diff --git a/packages/workflow-execute/__tests__/merkle-call-stack.test.ts b/packages/workflow-execute/__tests__/merkle-call-stack.test.ts new file mode 100644 index 0000000..147716c --- /dev/null +++ b/packages/workflow-execute/__tests__/merkle-call-stack.test.ts @@ -0,0 +1,306 @@ +import { afterEach, beforeEach, describe, expect, test } from "bun:test"; +import { mkdir, mkdtemp, readFile, rm, writeFile } from "node:fs/promises"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import type { CasStore } from "@uncaged/workflow-cas"; +import { createCasStore, parseCasThreadNode } from "@uncaged/workflow-cas"; +import type { StartNode, StateNode } from "@uncaged/workflow-protocol"; +import type { + RoleOutput, + ThreadContext, + WorkflowCompletion, + WorkflowFn, + WorkflowRuntime, +} from "@uncaged/workflow-runtime"; + +import { executeThread } from "../src/engine/engine.js"; +import type { ExecuteThreadIo, ExecuteThreadOptions } from "../src/engine/types.js"; + +const TEST_REGISTRY_YAML = `config: + maxDepth: 3 + supervisorInterval: 0 + providers: + stub: + baseUrl: http://127.0.0.1:9 + apiKey: test + models: + default: stub/m +workflows: {} +`; + +function noLogger(): (tag: string, content: string) => void { + return () => {}; +} + +function makeOptions(overrides: Partial): ExecuteThreadOptions { + return { + depth: 0, + parentStateHash: null, + signal: new AbortController().signal, + awaitAfterEachYield: async () => {}, + forkSourceThreadId: null, + prefilledDiskSteps: null, + forkContinuation: null, + replayTimestamps: null, + storageRoot: "/tmp/never", + ...overrides, + }; +} + +async function setupStorage(): Promise<{ + storageRoot: string; + casDir: string; +}> { + const storageRoot = await mkdtemp(join(tmpdir(), "uncaged-wf-merkle-")); + await writeFile(join(storageRoot, "workflow.yaml"), TEST_REGISTRY_YAML, "utf8"); + const casDir = join(storageRoot, "cas"); + await mkdir(casDir, { recursive: true }); + return { storageRoot, casDir }; +} + +async function loadStartNode(cas: CasStore, endHash: string): Promise { + const endBlob = await cas.get(endHash); + const endParsed = parseCasThreadNode(endBlob ?? ""); + if (endParsed?.kind !== "state") throw new Error("expected state node"); + const startBlob = await cas.get(endParsed.node.payload.start); + const startParsed = parseCasThreadNode(startBlob ?? ""); + if (startParsed?.kind !== "start") throw new Error("expected start node"); + return startParsed.node; +} + +async function loadStateNode(cas: CasStore, hash: string): Promise { + const blob = await cas.get(hash); + const parsed = parseCasThreadNode(blob ?? ""); + if (parsed?.kind !== "state") throw new Error("expected state node"); + return parsed.node; +} + +describe("Merkle call stack — cross-thread DAG linking (Phase 2)", () => { + let storageRoot: string; + let casDir: string; + + beforeEach(async () => { + const setup = await setupStorage(); + storageRoot = setup.storageRoot; + casDir = setup.casDir; + }); + + afterEach(async () => { + await rm(storageRoot, { recursive: true, force: true }); + }); + + test("parentStateHash is written into child start node's parentState and refs", async () => { + const cas = createCasStore(casDir); + + // biome-ignore lint/correctness/useYield: testing start-only path + const parentWf: WorkflowFn = async function* ( + _thread: ThreadContext, + _runtime: WorkflowRuntime, + ): AsyncGenerator { + return { returnCode: 0, summary: "parent done" }; + }; + + const parentResult = await executeThread( + parentWf, + "parent-wf", + { prompt: "parent task", steps: [] }, + makeOptions({ storageRoot }), + { + threadId: "P_THREAD_01", + hash: "PARENTHASH0001", + infoJsonlPath: join(storageRoot, "logs", "PARENTHASH0001", "P1.info.jsonl"), + cas, + }, + noLogger(), + ); + + // biome-ignore lint/correctness/useYield: testing start-only path + const childWf: WorkflowFn = async function* ( + _thread: ThreadContext, + _runtime: WorkflowRuntime, + ): AsyncGenerator { + return { returnCode: 0, summary: "child done" }; + }; + + const childResult = await executeThread( + childWf, + "child-wf", + { prompt: "child task", steps: [] }, + makeOptions({ storageRoot, depth: 1, parentStateHash: parentResult.rootHash }), + { + threadId: "C_THREAD_01", + hash: "CHILDHASH00001", + infoJsonlPath: join(storageRoot, "logs", "CHILDHASH00001", "C1.info.jsonl"), + cas, + }, + noLogger(), + ); + + const childStart = await loadStartNode(cas, childResult.rootHash); + expect(childStart.payload.parentState).toBe(parentResult.rootHash); + expect(childStart.refs).toContain(parentResult.rootHash); + }); + + test("childThread on parent state node points to child's final state and is in refs", async () => { + const cas = createCasStore(casDir); + const childFinalHash = "CHILD_FINAL_001"; + + const parentWf: WorkflowFn = async function* ( + _thread: ThreadContext, + runtime: WorkflowRuntime, + ): AsyncGenerator { + const h = await runtime.cas.put("developer output"); + yield { + role: "developer", + contentHash: h, + meta: { action: "delegate" }, + refs: [h], + childThread: childFinalHash, + }; + return { returnCode: 0, summary: "parent complete" }; + }; + + const result = await executeThread( + parentWf, + "parent-wf", + { prompt: "parent task", steps: [] }, + makeOptions({ storageRoot }), + { + threadId: "P_THREAD_02", + hash: "CTHREAD_TEST01", + infoJsonlPath: join(storageRoot, "logs", "CTHREAD_TEST01", "P2.info.jsonl"), + cas, + }, + noLogger(), + ); + + const endNode = await loadStateNode(cas, result.rootHash); + const devStateHash = endNode.payload.ancestors[0] ?? ""; + const devNode = await loadStateNode(cas, devStateHash); + + expect(devNode.payload.role).toBe("developer"); + expect(devNode.payload.childThread).toBe(childFinalHash); + expect(devNode.refs).toContain(childFinalHash); + }); + + test("parent state with no child has childThread: null", async () => { + const cas = createCasStore(casDir); + + const wf: WorkflowFn = async function* ( + _thread: ThreadContext, + runtime: WorkflowRuntime, + ): AsyncGenerator { + const h = await runtime.cas.put("prep output"); + yield { role: "preparer", contentHash: h, meta: {}, refs: [h], childThread: null }; + return { returnCode: 0, summary: "done" }; + }; + + const result = await executeThread( + wf, + "test-wf", + { prompt: "task", steps: [] }, + makeOptions({ storageRoot }), + { + threadId: "NULL_CT_01", + hash: "NULLCT_TEST001", + infoJsonlPath: join(storageRoot, "logs", "NULLCT_TEST001", "N1.info.jsonl"), + cas, + }, + noLogger(), + ); + + const endNode = await loadStateNode(cas, result.rootHash); + const prepHash = endNode.payload.ancestors[0] ?? ""; + const prepNode = await loadStateNode(cas, prepHash); + + expect(prepNode.payload.childThread).toBeNull(); + expect(prepNode.refs).not.toContain(null); + }); + + test("full bidirectional: child parentState is traversable to parent's context", async () => { + const cas = createCasStore(casDir); + const parentHash = "BIDIR_PARENT01"; + + const parentWf: WorkflowFn = async function* ( + _thread: ThreadContext, + runtime: WorkflowRuntime, + ): AsyncGenerator { + const h1 = await runtime.cas.put("preparation output"); + yield { + role: "preparer", + contentHash: h1, + meta: { repoPath: "/test" }, + refs: [h1], + childThread: null, + }; + const h2 = await runtime.cas.put("developer output"); + yield { + role: "developer", + contentHash: h2, + meta: { action: "code" }, + refs: [h2], + childThread: "CHILD_END_HASH1", + }; + return { returnCode: 0, summary: "all done" }; + }; + + const observedHeads: string[] = []; + const opts = makeOptions({ + storageRoot, + awaitAfterEachYield: async () => { + const bundleDir = join(storageRoot, "bundles", parentHash); + const text = await readFile(join(bundleDir, "threads.json"), "utf8"); + const parsed = JSON.parse(text) as Record; + const head = parsed.BIDIR_T_001?.head ?? null; + if (head !== null) observedHeads.push(head); + }, + }); + + await executeThread( + parentWf, + "bidir-wf", + { prompt: "bidir test", steps: [] }, + opts, + { + threadId: "BIDIR_T_001", + hash: parentHash, + infoJsonlPath: join(storageRoot, "logs", parentHash, "BD1.info.jsonl"), + cas, + }, + noLogger(), + ); + + expect(observedHeads.length).toBe(2); + const preparerStateHash = observedHeads[0] ?? ""; + + // Execute child with parentState pointing to parent's preparer state + // biome-ignore lint/correctness/useYield: testing start-only path + const childWf: WorkflowFn = async function* ( + _t: ThreadContext, + _r: WorkflowRuntime, + ): AsyncGenerator { + return { returnCode: 0, summary: "child ok" }; + }; + + const childResult = await executeThread( + childWf, + "bidir-child", + { prompt: "child bidir", steps: [] }, + makeOptions({ storageRoot, depth: 1, parentStateHash: preparerStateHash }), + { + threadId: "BIDIR_C_001", + hash: "BIDIR_CHILD001", + infoJsonlPath: join(storageRoot, "logs", "BIDIR_CHILD001", "BC1.info.jsonl"), + cas, + }, + noLogger(), + ); + + // Upward traversal: child start → parentState → preparer state → meta.repoPath + const childStart = await loadStartNode(cas, childResult.rootHash); + expect(childStart.payload.parentState).toBe(preparerStateHash); + + const parentPrep = await loadStateNode(cas, preparerStateHash); + expect(parentPrep.payload.meta.repoPath).toBe("/test"); + }); +}); diff --git a/packages/workflow-execute/src/engine/engine.ts b/packages/workflow-execute/src/engine/engine.ts index 8adde0f..b395f25 100644 --- a/packages/workflow-execute/src/engine/engine.ts +++ b/packages/workflow-execute/src/engine/engine.ts @@ -94,6 +94,7 @@ async function appendStateForStep(params: { meta: Record; refs: readonly string[]; timestamp: number; + childThread: string | null; }): Promise<{ stateHash: string; chain: ChainState }> { const text = await getContentMerklePayload(params.cas, params.contentHash); if (text === null) { @@ -112,7 +113,7 @@ async function appendStateForStep(params: { ancestors, compact: null, timestamp: params.timestamp, - childThread: null, + childThread: params.childThread, }; const stateHash = await putStateNode(params.cas, payload); return { @@ -331,6 +332,7 @@ async function driveWorkflowGenerator(params: { meta: step.meta, refs: step.refs, timestamp: ts, + childThread: step.childThread ?? null, }); chain = written_.chain; await publishHead({ bundleDir, threadId, startHash, headHash: written_.stateHash }); @@ -441,7 +443,7 @@ export async function executeThread( name: workflowName, hash: io.hash, depth: options.depth, - parentState: null, + parentState: options.parentStateHash, }, promptHash, ); @@ -469,6 +471,7 @@ export async function executeThread( meta: row.meta, refs: row.refs, timestamp: row.timestamp, + childThread: null, }); chain = written.chain; await publishHead({ @@ -490,6 +493,7 @@ export async function executeThread( const thread: ThreadContext = { threadId: io.threadId, depth: options.depth, + bundleHash: io.hash, start: { role: START, content: input.prompt, diff --git a/packages/workflow-execute/src/engine/fork-thread.ts b/packages/workflow-execute/src/engine/fork-thread.ts index a4a3084..6ce28f4 100644 --- a/packages/workflow-execute/src/engine/fork-thread.ts +++ b/packages/workflow-execute/src/engine/fork-thread.ts @@ -144,6 +144,7 @@ async function payloadToRoleOutput(cas: CasStore, payload: StateNodePayload): Pr contentHash: payload.content, meta: payload.meta, refs, + childThread: payload.childThread, }; } diff --git a/packages/workflow-execute/src/engine/types.ts b/packages/workflow-execute/src/engine/types.ts index b3de42f..90f9f60 100644 --- a/packages/workflow-execute/src/engine/types.ts +++ b/packages/workflow-execute/src/engine/types.ts @@ -41,6 +41,8 @@ export type PrefilledDiskStep = { export type ExecuteThreadOptions = { /** Passed to the bundle thread context as `ThreadContext.depth`. */ depth: number; + /** Parent thread's head state hash at spawn time; `null` for top-level threads. */ + parentStateHash: string | null; signal: AbortSignal; /** Invoked after each successful yield (and outer-loop checks); used for pause/resume. */ awaitAfterEachYield: () => Promise; diff --git a/packages/workflow-execute/src/engine/worker.ts b/packages/workflow-execute/src/engine/worker.ts index f8407b8..42c2fe8 100644 --- a/packages/workflow-execute/src/engine/worker.ts +++ b/packages/workflow-execute/src/engine/worker.ts @@ -72,11 +72,13 @@ function parseRoleOutputRecord(obj: Record): RoleOutput | null if (meta === null || typeof meta !== "object") { return null; } + const childThread = obj.childThread; return { role, contentHash, meta: meta as Record, refs: normalizeRefsField(obj.refs), + childThread: typeof childThread === "string" ? childThread : null, }; } @@ -497,6 +499,7 @@ async function main(): Promise { { prompt: cmd.prompt, steps: cmd.steps }, { ...cmd.options, + parentStateHash: null, signal: ac.signal, awaitAfterEachYield: () => pauseGate.awaitAfterYield(), forkSourceThreadId: cmd.forkSourceThreadId, diff --git a/packages/workflow-execute/src/workflow-as-agent.ts b/packages/workflow-execute/src/workflow-as-agent.ts index 87663c7..a397cfc 100644 --- a/packages/workflow-execute/src/workflow-as-agent.ts +++ b/packages/workflow-execute/src/workflow-as-agent.ts @@ -6,7 +6,7 @@ import { getRegisteredWorkflow, readWorkflowRegistry, } from "@uncaged/workflow-register"; -import type { AgentContext, AgentFn } from "@uncaged/workflow-runtime"; +import type { AgentContext, AgentFn, AgentFnResult } from "@uncaged/workflow-runtime"; import { createLogger, generateUlid, @@ -14,7 +14,7 @@ import { getGlobalCasDir, } from "@uncaged/workflow-util"; import type { ExecuteThreadIo } from "./engine/index.js"; -import { executeThread } from "./engine/index.js"; +import { executeThread, getBundleDir, readThreadsIndex } from "./engine/index.js"; const DEFAULT_WORKFLOW_AS_AGENT_MAX_DEPTH = 3; @@ -37,6 +37,13 @@ function resolveWorkflowAsAgentStorageRoot(options: WorkflowAsAgentOptions | nul return getDefaultWorkflowStorageRoot(); } +async function readParentHeadState(storageRoot: string, ctx: AgentContext): Promise { + const bundleDir = getBundleDir(storageRoot, ctx.bundleHash); + const index = await readThreadsIndex(bundleDir); + const entry = index[ctx.threadId] ?? null; + return entry !== null ? entry.head : null; +} + /** * Returns an {@link AgentFn} that runs another registered workflow in a new thread, * using the parent thread's initial prompt (`ctx.start.content`) as the child prompt. @@ -45,7 +52,7 @@ export function workflowAsAgent( workflowName: string, options: WorkflowAsAgentOptions | null = null, ): AgentFn { - return async (ctx: AgentContext): Promise => { + return async (ctx: AgentContext): Promise => { const nextDepth = ctx.depth + 1; const storageRoot = resolveWorkflowAsAgentStorageRoot(options); @@ -89,6 +96,8 @@ export function workflowAsAgent( const logger = createLogger({ sink: { kind: "file", path: infoJsonlPath } }); const signalNever = new AbortController(); + const parentHeadState = await readParentHeadState(storageRoot, ctx); + try { const result = await executeThread( bundleExportsResult.value.run, @@ -96,6 +105,7 @@ export function workflowAsAgent( input, { depth: nextDepth, + parentStateHash: parentHeadState, signal: signalNever.signal, awaitAfterEachYield: async () => {}, forkSourceThreadId: ctx.threadId, @@ -107,7 +117,8 @@ export function workflowAsAgent( io, logger, ); - return `Child workflow "${workflowName}" completed (returnCode=${result.returnCode}).\n\nSummary: ${result.summary}\n\nChild thread root hash: ${result.rootHash}`; + const summary = `Child workflow "${workflowName}" completed (returnCode=${result.returnCode}).\n\nSummary: ${result.summary}\n\nChild thread root hash: ${result.rootHash}`; + return { output: summary, childThread: result.rootHash }; } catch (e) { const message = e instanceof Error ? e.message : String(e); return `ERROR: ${message}`; diff --git a/packages/workflow-protocol/__tests__/moderator-table.test.ts b/packages/workflow-protocol/__tests__/moderator-table.test.ts index 5e01f9c..6d66a51 100644 --- a/packages/workflow-protocol/__tests__/moderator-table.test.ts +++ b/packages/workflow-protocol/__tests__/moderator-table.test.ts @@ -21,6 +21,7 @@ function makeCtx(roles: (keyof TestMeta & string)[]): ModeratorContext return { threadId: "test-thread", depth: 0, + bundleHash: "TESTHASH00001", start: { role: START, content: "test", diff --git a/packages/workflow-protocol/src/index.ts b/packages/workflow-protocol/src/index.ts index 0390a0b..302067f 100644 --- a/packages/workflow-protocol/src/index.ts +++ b/packages/workflow-protocol/src/index.ts @@ -13,6 +13,7 @@ export type { AgentBinding, AgentContext, AgentFn, + AgentFnResult, CasStore, ExtractFn, ExtractResult, diff --git a/packages/workflow-protocol/src/types.ts b/packages/workflow-protocol/src/types.ts index 944a193..08bc5af 100644 --- a/packages/workflow-protocol/src/types.ts +++ b/packages/workflow-protocol/src/types.ts @@ -41,6 +41,7 @@ export type RoleOutput = { contentHash: string; meta: Record; refs: string[]; + childThread: string | null; }; export type StartStep = { @@ -63,6 +64,7 @@ export type RoleStep = { export type ThreadContext = { threadId: string; depth: number; + bundleHash: string; start: StartStep; steps: RoleStep[]; }; @@ -127,7 +129,9 @@ export type ExtractFn = >( contentHash: string, ) => Promise>; -export type AgentFn = (ctx: AgentContext) => Promise; +export type AgentFnResult = string | { output: string; childThread: string | null }; + +export type AgentFn = (ctx: AgentContext) => Promise; export type AgentBinding = { agent: AgentFn; diff --git a/packages/workflow-runtime/src/build-context.ts b/packages/workflow-runtime/src/build-context.ts index 6d3bccb..874c4a5 100644 --- a/packages/workflow-runtime/src/build-context.ts +++ b/packages/workflow-runtime/src/build-context.ts @@ -54,6 +54,7 @@ async function threadFromStartHead( return { threadId: "", depth: p.depth, + bundleHash: p.hash, start: { role: START, content: prompt, @@ -113,6 +114,7 @@ async function threadFromStateHead( return { threadId: "", depth: sp.depth, + bundleHash: sp.hash, start: { role: START, content: prompt, diff --git a/packages/workflow-runtime/src/create-workflow.ts b/packages/workflow-runtime/src/create-workflow.ts index 310f6f7..30a7390 100644 --- a/packages/workflow-runtime/src/create-workflow.ts +++ b/packages/workflow-runtime/src/create-workflow.ts @@ -6,6 +6,7 @@ import { type AgentBinding, type AgentContext, type AgentFn, + type AgentFnResult, END, type ModeratorContext, type RoleDefinition, @@ -49,6 +50,16 @@ function mergeUniqueHashes(a: readonly string[], b: readonly string[]): string[] return out; } +function normalizeAgentResult(result: AgentFnResult): { + output: string; + childThread: string | null; +} { + if (typeof result === "string") { + return { output: result, childThread: null }; + } + return result; +} + function agentForRole(binding: AgentBinding, roleName: string): AgentFn { const overrides = binding.overrides; const overrideFn: AgentFn | undefined = @@ -86,9 +97,9 @@ async function advanceOneRound( }; const agent = agentForRole(binding, next); - const raw = await agent(agentCtx as unknown as AgentContext); + const agentResult = normalizeAgentResult(await agent(agentCtx as unknown as AgentContext)); - const agentContentHash = await putContentNodeWithRefs(runtime.cas, raw, []); + const agentContentHash = await putContentNodeWithRefs(runtime.cas, agentResult.output, []); const extracted = await runtime.extract( roleDef.schema as z.ZodType>, @@ -122,6 +133,7 @@ async function advanceOneRound( contentHash: step.contentHash, meta: step.meta, refs: step.refs, + childThread: agentResult.childThread, }, step, }; diff --git a/packages/workflow-runtime/src/index.ts b/packages/workflow-runtime/src/index.ts index 20fe8b4..73f5326 100644 --- a/packages/workflow-runtime/src/index.ts +++ b/packages/workflow-runtime/src/index.ts @@ -5,6 +5,7 @@ export type { AgentBinding, AgentContext, AgentFn, + AgentFnResult, CasStore, ExtractFn, ExtractResult, diff --git a/packages/workflow-runtime/src/types.ts b/packages/workflow-runtime/src/types.ts index 53491ec..b453408 100644 --- a/packages/workflow-runtime/src/types.ts +++ b/packages/workflow-runtime/src/types.ts @@ -7,6 +7,7 @@ export type { AgentBinding, AgentContext, AgentFn, + AgentFnResult, CasStore, ExtractFn, ExtractResult, diff --git a/packages/workflow-template-develop/__tests__/develop-template.test.ts b/packages/workflow-template-develop/__tests__/develop-template.test.ts index 483bebe..40ea336 100644 --- a/packages/workflow-template-develop/__tests__/develop-template.test.ts +++ b/packages/workflow-template-develop/__tests__/develop-template.test.ts @@ -26,6 +26,7 @@ function makeCtx(steps: ModeratorContext["steps"]): ModeratorContex return { threadId: "01TEST000000000000000000TR", depth: 0, + bundleHash: "TESTHASH00001", start: makeStart(), steps, }; diff --git a/packages/workflow-template-solve-issue/__tests__/solve-issue-template.test.ts b/packages/workflow-template-solve-issue/__tests__/solve-issue-template.test.ts index 5af8e56..8a64588 100644 --- a/packages/workflow-template-solve-issue/__tests__/solve-issue-template.test.ts +++ b/packages/workflow-template-solve-issue/__tests__/solve-issue-template.test.ts @@ -113,6 +113,7 @@ function makeCtx( return { threadId: "01TEST000000000000000000TR", depth: 0, + bundleHash: "TESTHASH00001", start: makeStart(), steps, }; @@ -178,6 +179,7 @@ function makeThread(prompt: string) { return { threadId: "01TEST000000000000000000TR", depth: 0, + bundleHash: "TESTHASH00001", start: { role: START, content: prompt, diff --git a/packages/workflow-util-agent/__tests__/build-agent-prompt.test.ts b/packages/workflow-util-agent/__tests__/build-agent-prompt.test.ts index 5bbeba0..3601015 100644 --- a/packages/workflow-util-agent/__tests__/build-agent-prompt.test.ts +++ b/packages/workflow-util-agent/__tests__/build-agent-prompt.test.ts @@ -17,6 +17,7 @@ describe("buildAgentPrompt", () => { const ctx: AgentContext = { start: startTask("fix the bug"), depth: 0, + bundleHash: "TESTHASH00001", steps: [], threadId: "01TEST000000000000000000TR", currentRole: { name: START, systemPrompt: "You are an agent." }, @@ -33,6 +34,7 @@ describe("buildAgentPrompt", () => { const ctx: AgentContext = { start: startTask("user task"), depth: 0, + bundleHash: "TESTHASH00001", threadId: "01TEST000000000000000000TR", currentRole: { name: "coder", systemPrompt: "Be helpful." }, steps: [ @@ -61,6 +63,7 @@ describe("buildAgentPrompt", () => { const ctx: AgentContext = { start: startTask("first message full: task content here"), depth: 0, + bundleHash: "TESTHASH00001", threadId: "01TEST000000000000000000TR", currentRole: { name: "coder", systemPrompt: "System." }, steps: [ @@ -99,6 +102,7 @@ describe("buildAgentPrompt", () => { const ctx: AgentContext = { start: startTask("start"), depth: 0, + bundleHash: "TESTHASH00001", threadId: "01TEST000000000000000000TR", currentRole: { name: "c", systemPrompt: "S" }, steps: [