Compare commits

...

4 Commits

Author SHA1 Message Date
xiaoju 1e9900bed3 feat(#194): Phase 2 — Engine layer Merkle call stack wiring
- Protocol: AgentFnResult = string | { output, childThread }, RoleOutput.childThread,
  ThreadContext.bundleHash for parent state lookup
- Runtime: create-workflow normalizes AgentFnResult, propagates childThread in RoleOutput
- Engine: ExecuteThreadOptions.parentStateHash, appendStateForStep writes childThread,
  putStartNode uses parentStateHash
- workflowAsAgent: reads parent head state from threads.json, passes parentStateHash
  to child, returns { output, childThread: rootHash }
- Integration test: 4 cases verifying bidirectional Merkle links (306 lines)

Phase 2 of #194 (Merkle Call Stack). Closes #196.

小橘 <xiaoju@shazhou.work>
2026-05-12 02:10:06 +00:00
xiaomo 9c1b018ffa Merge pull request 'feat(#194): Phase 1 — Merkle Call Stack protocol + CAS layer' (#199) from feat/194-merkle-call-stack-phase1 into main 2026-05-12 01:50:05 +00:00
xiaoju a98431a12a feat(#194): Phase 1 — parentState / childThread in CAS nodes
- Protocol: StartNodePayload.parentState, StateNodePayload.childThread
- CAS: putStartNode refs include parentState, collectRefs includes childThread
- Parsing: legacy nodes without new fields default to null
- Engine + fork: all callers pass parentState: null / childThread: null
- Tests: 8 new cases for refs, parsing, collect-refs (+208 lines)

Phase 1 of #194 (Merkle Call Stack). Closes #195.

小橘 <xiaoju@shazhou.work>
2026-05-12 01:42:10 +00:00
xiaoju e37dbc3f35 wip: Phase 1 protocol + CAS types for Merkle call stack
小橘 <xiaoju@shazhou.work>
2026-05-12 01:35:45 +00:00
26 changed files with 631 additions and 20 deletions
@@ -45,8 +45,8 @@ describe("gc cli and garbageCollectCas", () => {
{
name: "demo",
hash: bundleHash,
depth: 0,
parentState: null,
},
promptHash,
);
@@ -100,8 +100,8 @@ describe("gc cli and garbageCollectCas", () => {
{
name: "demo",
hash: bundleHash,
depth: 0,
parentState: null,
},
promptHash,
);
@@ -135,8 +135,8 @@ describe("gc cli and garbageCollectCas", () => {
{
name: "demo",
hash: bundleHash,
depth: 0,
parentState: null,
},
promptHash,
);
@@ -12,6 +12,7 @@ function makeCtx(userContent: string): AgentContext {
timestamp: 1,
},
depth: 0,
bundleHash: "TESTHASH00001",
steps: [],
threadId: "01TEST000000000000000000TR",
currentRole: { name: "planner", systemPrompt: "system instructions" },
@@ -14,6 +14,7 @@ function payload(
ancestors: partial.ancestors ?? [],
compact: partial.compact ?? null,
timestamp: partial.timestamp ?? 0,
childThread: partial.childThread ?? null,
};
}
@@ -62,4 +63,32 @@ describe("collectRefs", () => {
);
expect(refs).toEqual(["S2", "C2"]);
});
test("includes childThread hash when childThread is non-null", () => {
const refs = collectRefs(
payload({
role: "developer",
start: "S3",
content: "C3",
ancestors: ["A3"],
compact: null,
childThread: "CHILDEND000000000000001",
}),
);
expect(refs).toEqual(["S3", "C3", "A3", "CHILDEND000000000000001"]);
});
test("does not include childThread when childThread is null", () => {
const refs = collectRefs(
payload({
role: "developer",
start: "S4",
content: "C4",
ancestors: [],
compact: null,
childThread: null,
}),
);
expect(refs).toEqual(["S4", "C4"]);
});
});
@@ -0,0 +1,161 @@
import { afterEach, beforeEach, describe, expect, test } from "bun:test";
import { mkdtemp, rm } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { stringify } from "yaml";
import { createCasStore } from "../src/cas.js";
import { parseCasThreadNode, putStartNode, putStateNode } from "../src/nodes.js";
describe("putStartNode — parentState in refs", () => {
let dir: string;
beforeEach(async () => {
dir = await mkdtemp(join(tmpdir(), "wf-cas-nodes-"));
});
afterEach(async () => {
await rm(dir, { recursive: true, force: true });
});
test("refs contains only promptHash when parentState is null", async () => {
const cas = createCasStore(join(dir, "cas"));
const promptHash = await cas.put("hello");
const startHash = await putStartNode(
cas,
{ name: "demo", hash: "BUNDLEAAAAAAAAA", depth: 0, parentState: null },
promptHash,
);
const blob = await cas.get(startHash);
expect(blob).not.toBeNull();
const parsed = parseCasThreadNode(blob ?? "");
expect(parsed).not.toBeNull();
expect(parsed?.kind).toBe("start");
if (parsed?.kind !== "start") return;
expect(parsed.node.refs).toEqual([promptHash]);
expect(parsed.node.payload.parentState).toBeNull();
});
test("refs contains [promptHash, parentStateHash] when parentState is set", async () => {
const cas = createCasStore(join(dir, "cas"));
const parentStateHash = await cas.put("fake-parent-state");
const promptHash = await cas.put("child-prompt");
const startHash = await putStartNode(
cas,
{ name: "develop", hash: "BUNDLEBBBBBBBBB", depth: 1, parentState: parentStateHash },
promptHash,
);
const blob = await cas.get(startHash);
expect(blob).not.toBeNull();
const parsed = parseCasThreadNode(blob ?? "");
expect(parsed).not.toBeNull();
expect(parsed?.kind).toBe("start");
if (parsed?.kind !== "start") return;
expect(parsed.node.refs).toEqual([promptHash, parentStateHash]);
expect(parsed.node.payload.parentState).toBe(parentStateHash);
});
});
describe("putStateNode — childThread in refs", () => {
let dir: string;
beforeEach(async () => {
dir = await mkdtemp(join(tmpdir(), "wf-cas-nodes-state-"));
});
afterEach(async () => {
await rm(dir, { recursive: true, force: true });
});
test("refs does not include childThread when childThread is null", async () => {
const cas = createCasStore(join(dir, "cas"));
const startHash = await cas.put("start");
const contentHash = await cas.put("content");
const stateHash = await putStateNode(cas, {
role: "planner",
meta: {},
start: startHash,
content: contentHash,
ancestors: [],
compact: null,
timestamp: 1000,
childThread: null,
});
const blob = await cas.get(stateHash);
expect(blob).not.toBeNull();
const parsed = parseCasThreadNode(blob ?? "");
expect(parsed?.kind).toBe("state");
if (parsed?.kind !== "state") return;
expect(parsed.node.refs).not.toContain("anything-else");
expect(parsed.node.refs).toEqual([startHash, contentHash]);
expect(parsed.node.payload.childThread).toBeNull();
});
test("refs includes childThread hash when childThread is set", async () => {
const cas = createCasStore(join(dir, "cas"));
const startHash = await cas.put("start");
const contentHash = await cas.put("content");
const childEndHash = await cas.put("child-end-state");
const stateHash = await putStateNode(cas, {
role: "developer",
meta: { pr: 42 },
start: startHash,
content: contentHash,
ancestors: [],
compact: null,
timestamp: 2000,
childThread: childEndHash,
});
const blob = await cas.get(stateHash);
expect(blob).not.toBeNull();
const parsed = parseCasThreadNode(blob ?? "");
expect(parsed?.kind).toBe("state");
if (parsed?.kind !== "state") return;
expect(parsed.node.refs).toContain(childEndHash);
expect(parsed.node.payload.childThread).toBe(childEndHash);
});
});
describe("parseCasThreadNode — legacy node compatibility", () => {
test("start node without parentState field defaults to null", () => {
const yaml = stringify({
type: "start",
payload: { name: "demo", hash: "BUNDLEAAAAAAAAA", depth: 0 },
refs: ["PROMPTHASH00001"],
});
const parsed = parseCasThreadNode(yaml);
expect(parsed).not.toBeNull();
expect(parsed?.kind).toBe("start");
if (parsed?.kind !== "start") return;
expect(parsed.node.payload.parentState).toBeNull();
});
test("state node without childThread field defaults to null", () => {
const yaml = stringify({
type: "state",
payload: {
role: "planner",
meta: {},
start: "STARTHASH00001",
content: "CONTENTHASH0001",
ancestors: [],
compact: null,
timestamp: 1000,
},
refs: ["STARTHASH00001", "CONTENTHASH0001"],
});
const parsed = parseCasThreadNode(yaml);
expect(parsed).not.toBeNull();
expect(parsed?.kind).toBe("state");
if (parsed?.kind !== "state") return;
expect(parsed.node.payload.childThread).toBeNull();
});
});
@@ -9,5 +9,8 @@ export function collectRefs(payload: StateNode["payload"]): string[] {
if (payload.compact !== null) {
out.push(payload.compact);
}
if (payload.childThread !== null) {
out.push(payload.childThread);
}
return out;
}
+47 -3
View File
@@ -18,6 +18,10 @@ function isStartPayload(value: unknown): value is StartNodePayload {
if (!isRecord(value)) {
return false;
}
const parentState = value.parentState;
if (parentState !== undefined && parentState !== null && typeof parentState !== "string") {
return false;
}
return (
typeof value.name === "string" &&
typeof value.hash === "string" &&
@@ -25,6 +29,16 @@ function isStartPayload(value: unknown): value is StartNodePayload {
);
}
/** Normalizes a raw start payload, defaulting `parentState` to `null` for legacy nodes. */
function normalizeStartPayload(raw: StartNodePayload): StartNodePayload {
return {
name: raw.name,
hash: raw.hash,
depth: raw.depth,
parentState: raw.parentState ?? null,
};
}
function isStatePayload(value: unknown): value is StateNodePayload {
if (!isRecord(value)) {
return false;
@@ -41,6 +55,10 @@ function isStatePayload(value: unknown): value is StateNodePayload {
if (!isRecord(meta)) {
return false;
}
const childThread = value.childThread;
if (childThread !== undefined && childThread !== null && typeof childThread !== "string") {
return false;
}
return (
typeof value.role === "string" &&
typeof value.start === "string" &&
@@ -49,6 +67,20 @@ function isStatePayload(value: unknown): value is StateNodePayload {
);
}
/** Normalizes a raw state payload, defaulting `childThread` to `null` for legacy nodes. */
function normalizeStatePayload(raw: StateNodePayload): StateNodePayload {
return {
role: raw.role,
meta: raw.meta,
start: raw.start,
content: raw.content,
ancestors: raw.ancestors,
compact: raw.compact,
timestamp: raw.timestamp,
childThread: raw.childThread ?? null,
};
}
/** Parses a YAML CAS blob into a typed RFC v3 thread node (or legacy content layout with `children`). */
export function parseCasThreadNode(yamlText: string): ParsedCasThreadNode | null {
let raw: unknown;
@@ -86,14 +118,22 @@ export function parseCasThreadNode(yamlText: string): ParsedCasThreadNode | null
if (!isStartPayload(raw.payload)) {
return null;
}
const node: StartNode = { type: "start", payload: raw.payload, refs: [...refs] };
const node: StartNode = {
type: "start",
payload: normalizeStartPayload(raw.payload),
refs: [...refs],
};
return { kind: "start", node };
}
if (!isStatePayload(raw.payload)) {
return null;
}
const node: StateNode = { type: "state", payload: raw.payload, refs: [...refs] };
const node: StateNode = {
type: "state",
payload: normalizeStatePayload(raw.payload),
refs: [...refs],
};
return { kind: "state", node };
}
@@ -143,10 +183,14 @@ export async function putStartNode(
payload: StartNode["payload"],
promptHash: string,
): Promise<string> {
const refs = [promptHash];
if (payload.parentState !== null) {
refs.push(payload.parentState);
}
const node: StartNode = {
type: "start",
payload,
refs: [promptHash],
refs,
};
return store.put(serializeCasNode(node));
}
@@ -35,6 +35,7 @@ function noLogger(): (tag: string, content: string) => void {
function makeOptions(overrides: Partial<ExecuteThreadOptions>): ExecuteThreadOptions {
return {
depth: 0,
parentStateHash: null,
signal: new AbortController().signal,
awaitAfterEachYield: async () => {},
forkSourceThreadId: null,
@@ -144,9 +145,9 @@ describe("executeThread (Phase 2 — CAS thread storage)", () => {
runtime: WorkflowRuntime,
): AsyncGenerator<RoleOutput, WorkflowCompletion> {
const h1 = await runtime.cas.put("plan-text");
yield { role: "planner", contentHash: h1, meta: { plan: 1 }, refs: [h1] };
yield { role: "planner", contentHash: h1, meta: { plan: 1 }, refs: [h1], childThread: null };
const h2 = await runtime.cas.put("code-text");
yield { role: "coder", contentHash: h2, meta: { diff: "y" }, refs: [h2] };
yield { role: "coder", contentHash: h2, meta: { diff: "y" }, refs: [h2], childThread: null };
return { returnCode: 0, summary: "done" };
};
@@ -210,7 +211,7 @@ describe("executeThread (Phase 2 — CAS thread storage)", () => {
runtime: WorkflowRuntime,
): AsyncGenerator<RoleOutput, WorkflowCompletion> {
const h = await runtime.cas.put("only-step");
yield { role: "only", contentHash: h, meta: {}, refs: [h] };
yield { role: "only", contentHash: h, meta: {}, refs: [h], childThread: null };
return { returnCode: 0, summary: "completed" };
};
@@ -261,7 +262,7 @@ describe("executeThread (Phase 2 — CAS thread storage)", () => {
runtime: WorkflowRuntime,
): AsyncGenerator<RoleOutput, WorkflowCompletion> {
const h = await runtime.cas.put("step");
yield { role: "only", contentHash: h, meta: {}, refs: [h] };
yield { role: "only", contentHash: h, meta: {}, refs: [h], childThread: null };
return { returnCode: 0, summary: "done" };
};
@@ -46,6 +46,7 @@ describe("garbageCollectCas (mark-and-sweep)", () => {
name: "demo",
hash: bundleHash,
depth: 0,
parentState: null,
},
promptHash,
);
@@ -59,6 +60,7 @@ describe("garbageCollectCas (mark-and-sweep)", () => {
ancestors: [],
compact: null,
timestamp: 1,
childThread: null,
} satisfies StateNodePayload);
const c2 = await putContentNodeWithRefs(cas, "c1", []);
@@ -70,6 +72,7 @@ describe("garbageCollectCas (mark-and-sweep)", () => {
ancestors: [h1],
compact: null,
timestamp: 2,
childThread: null,
} satisfies StateNodePayload);
const ec = await putContentNodeWithRefs(cas, "", []);
@@ -81,6 +84,7 @@ describe("garbageCollectCas (mark-and-sweep)", () => {
ancestors: [h1],
compact: null,
timestamp: 3,
childThread: null,
} satisfies StateNodePayload);
await upsertThreadEntry(bundleDir, "THREAD_AAAAAAA", {
@@ -0,0 +1,306 @@
import { afterEach, beforeEach, describe, expect, test } from "bun:test";
import { mkdir, mkdtemp, readFile, rm, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import type { CasStore } from "@uncaged/workflow-cas";
import { createCasStore, parseCasThreadNode } from "@uncaged/workflow-cas";
import type { StartNode, StateNode } from "@uncaged/workflow-protocol";
import type {
RoleOutput,
ThreadContext,
WorkflowCompletion,
WorkflowFn,
WorkflowRuntime,
} from "@uncaged/workflow-runtime";
import { executeThread } from "../src/engine/engine.js";
import type { ExecuteThreadIo, ExecuteThreadOptions } from "../src/engine/types.js";
const TEST_REGISTRY_YAML = `config:
maxDepth: 3
supervisorInterval: 0
providers:
stub:
baseUrl: http://127.0.0.1:9
apiKey: test
models:
default: stub/m
workflows: {}
`;
function noLogger(): (tag: string, content: string) => void {
return () => {};
}
function makeOptions(overrides: Partial<ExecuteThreadOptions>): ExecuteThreadOptions {
return {
depth: 0,
parentStateHash: null,
signal: new AbortController().signal,
awaitAfterEachYield: async () => {},
forkSourceThreadId: null,
prefilledDiskSteps: null,
forkContinuation: null,
replayTimestamps: null,
storageRoot: "/tmp/never",
...overrides,
};
}
async function setupStorage(): Promise<{
storageRoot: string;
casDir: string;
}> {
const storageRoot = await mkdtemp(join(tmpdir(), "uncaged-wf-merkle-"));
await writeFile(join(storageRoot, "workflow.yaml"), TEST_REGISTRY_YAML, "utf8");
const casDir = join(storageRoot, "cas");
await mkdir(casDir, { recursive: true });
return { storageRoot, casDir };
}
async function loadStartNode(cas: CasStore, endHash: string): Promise<StartNode> {
const endBlob = await cas.get(endHash);
const endParsed = parseCasThreadNode(endBlob ?? "");
if (endParsed?.kind !== "state") throw new Error("expected state node");
const startBlob = await cas.get(endParsed.node.payload.start);
const startParsed = parseCasThreadNode(startBlob ?? "");
if (startParsed?.kind !== "start") throw new Error("expected start node");
return startParsed.node;
}
async function loadStateNode(cas: CasStore, hash: string): Promise<StateNode> {
const blob = await cas.get(hash);
const parsed = parseCasThreadNode(blob ?? "");
if (parsed?.kind !== "state") throw new Error("expected state node");
return parsed.node;
}
describe("Merkle call stack — cross-thread DAG linking (Phase 2)", () => {
let storageRoot: string;
let casDir: string;
beforeEach(async () => {
const setup = await setupStorage();
storageRoot = setup.storageRoot;
casDir = setup.casDir;
});
afterEach(async () => {
await rm(storageRoot, { recursive: true, force: true });
});
test("parentStateHash is written into child start node's parentState and refs", async () => {
const cas = createCasStore(casDir);
// biome-ignore lint/correctness/useYield: testing start-only path
const parentWf: WorkflowFn = async function* (
_thread: ThreadContext,
_runtime: WorkflowRuntime,
): AsyncGenerator<RoleOutput, WorkflowCompletion> {
return { returnCode: 0, summary: "parent done" };
};
const parentResult = await executeThread(
parentWf,
"parent-wf",
{ prompt: "parent task", steps: [] },
makeOptions({ storageRoot }),
{
threadId: "P_THREAD_01",
hash: "PARENTHASH0001",
infoJsonlPath: join(storageRoot, "logs", "PARENTHASH0001", "P1.info.jsonl"),
cas,
},
noLogger(),
);
// biome-ignore lint/correctness/useYield: testing start-only path
const childWf: WorkflowFn = async function* (
_thread: ThreadContext,
_runtime: WorkflowRuntime,
): AsyncGenerator<RoleOutput, WorkflowCompletion> {
return { returnCode: 0, summary: "child done" };
};
const childResult = await executeThread(
childWf,
"child-wf",
{ prompt: "child task", steps: [] },
makeOptions({ storageRoot, depth: 1, parentStateHash: parentResult.rootHash }),
{
threadId: "C_THREAD_01",
hash: "CHILDHASH00001",
infoJsonlPath: join(storageRoot, "logs", "CHILDHASH00001", "C1.info.jsonl"),
cas,
},
noLogger(),
);
const childStart = await loadStartNode(cas, childResult.rootHash);
expect(childStart.payload.parentState).toBe(parentResult.rootHash);
expect(childStart.refs).toContain(parentResult.rootHash);
});
test("childThread on parent state node points to child's final state and is in refs", async () => {
const cas = createCasStore(casDir);
const childFinalHash = "CHILD_FINAL_001";
const parentWf: WorkflowFn = async function* (
_thread: ThreadContext,
runtime: WorkflowRuntime,
): AsyncGenerator<RoleOutput, WorkflowCompletion> {
const h = await runtime.cas.put("developer output");
yield {
role: "developer",
contentHash: h,
meta: { action: "delegate" },
refs: [h],
childThread: childFinalHash,
};
return { returnCode: 0, summary: "parent complete" };
};
const result = await executeThread(
parentWf,
"parent-wf",
{ prompt: "parent task", steps: [] },
makeOptions({ storageRoot }),
{
threadId: "P_THREAD_02",
hash: "CTHREAD_TEST01",
infoJsonlPath: join(storageRoot, "logs", "CTHREAD_TEST01", "P2.info.jsonl"),
cas,
},
noLogger(),
);
const endNode = await loadStateNode(cas, result.rootHash);
const devStateHash = endNode.payload.ancestors[0] ?? "";
const devNode = await loadStateNode(cas, devStateHash);
expect(devNode.payload.role).toBe("developer");
expect(devNode.payload.childThread).toBe(childFinalHash);
expect(devNode.refs).toContain(childFinalHash);
});
test("parent state with no child has childThread: null", async () => {
const cas = createCasStore(casDir);
const wf: WorkflowFn = async function* (
_thread: ThreadContext,
runtime: WorkflowRuntime,
): AsyncGenerator<RoleOutput, WorkflowCompletion> {
const h = await runtime.cas.put("prep output");
yield { role: "preparer", contentHash: h, meta: {}, refs: [h], childThread: null };
return { returnCode: 0, summary: "done" };
};
const result = await executeThread(
wf,
"test-wf",
{ prompt: "task", steps: [] },
makeOptions({ storageRoot }),
{
threadId: "NULL_CT_01",
hash: "NULLCT_TEST001",
infoJsonlPath: join(storageRoot, "logs", "NULLCT_TEST001", "N1.info.jsonl"),
cas,
},
noLogger(),
);
const endNode = await loadStateNode(cas, result.rootHash);
const prepHash = endNode.payload.ancestors[0] ?? "";
const prepNode = await loadStateNode(cas, prepHash);
expect(prepNode.payload.childThread).toBeNull();
expect(prepNode.refs).not.toContain(null);
});
test("full bidirectional: child parentState is traversable to parent's context", async () => {
const cas = createCasStore(casDir);
const parentHash = "BIDIR_PARENT01";
const parentWf: WorkflowFn = async function* (
_thread: ThreadContext,
runtime: WorkflowRuntime,
): AsyncGenerator<RoleOutput, WorkflowCompletion> {
const h1 = await runtime.cas.put("preparation output");
yield {
role: "preparer",
contentHash: h1,
meta: { repoPath: "/test" },
refs: [h1],
childThread: null,
};
const h2 = await runtime.cas.put("developer output");
yield {
role: "developer",
contentHash: h2,
meta: { action: "code" },
refs: [h2],
childThread: "CHILD_END_HASH1",
};
return { returnCode: 0, summary: "all done" };
};
const observedHeads: string[] = [];
const opts = makeOptions({
storageRoot,
awaitAfterEachYield: async () => {
const bundleDir = join(storageRoot, "bundles", parentHash);
const text = await readFile(join(bundleDir, "threads.json"), "utf8");
const parsed = JSON.parse(text) as Record<string, { head: string }>;
const head = parsed.BIDIR_T_001?.head ?? null;
if (head !== null) observedHeads.push(head);
},
});
await executeThread(
parentWf,
"bidir-wf",
{ prompt: "bidir test", steps: [] },
opts,
{
threadId: "BIDIR_T_001",
hash: parentHash,
infoJsonlPath: join(storageRoot, "logs", parentHash, "BD1.info.jsonl"),
cas,
},
noLogger(),
);
expect(observedHeads.length).toBe(2);
const preparerStateHash = observedHeads[0] ?? "";
// Execute child with parentState pointing to parent's preparer state
// biome-ignore lint/correctness/useYield: testing start-only path
const childWf: WorkflowFn = async function* (
_t: ThreadContext,
_r: WorkflowRuntime,
): AsyncGenerator<RoleOutput, WorkflowCompletion> {
return { returnCode: 0, summary: "child ok" };
};
const childResult = await executeThread(
childWf,
"bidir-child",
{ prompt: "child bidir", steps: [] },
makeOptions({ storageRoot, depth: 1, parentStateHash: preparerStateHash }),
{
threadId: "BIDIR_C_001",
hash: "BIDIR_CHILD001",
infoJsonlPath: join(storageRoot, "logs", "BIDIR_CHILD001", "BC1.info.jsonl"),
cas,
},
noLogger(),
);
// Upward traversal: child start → parentState → preparer state → meta.repoPath
const childStart = await loadStartNode(cas, childResult.rootHash);
expect(childStart.payload.parentState).toBe(preparerStateHash);
const parentPrep = await loadStateNode(cas, preparerStateHash);
expect(parentPrep.payload.meta.repoPath).toBe("/test");
});
});
@@ -94,6 +94,7 @@ async function appendStateForStep(params: {
meta: Record<string, unknown>;
refs: readonly string[];
timestamp: number;
childThread: string | null;
}): Promise<{ stateHash: string; chain: ChainState }> {
const text = await getContentMerklePayload(params.cas, params.contentHash);
if (text === null) {
@@ -112,6 +113,7 @@ async function appendStateForStep(params: {
ancestors,
compact: null,
timestamp: params.timestamp,
childThread: params.childThread,
};
const stateHash = await putStateNode(params.cas, payload);
return {
@@ -137,6 +139,7 @@ async function appendEndState(params: {
ancestors,
compact: null,
timestamp: params.timestamp,
childThread: null,
};
return putStateNode(params.cas, payload);
}
@@ -329,6 +332,7 @@ async function driveWorkflowGenerator(params: {
meta: step.meta,
refs: step.refs,
timestamp: ts,
childThread: step.childThread ?? null,
});
chain = written_.chain;
await publishHead({ bundleDir, threadId, startHash, headHash: written_.stateHash });
@@ -439,6 +443,7 @@ export async function executeThread(
name: workflowName,
hash: io.hash,
depth: options.depth,
parentState: options.parentStateHash,
},
promptHash,
);
@@ -466,6 +471,7 @@ export async function executeThread(
meta: row.meta,
refs: row.refs,
timestamp: row.timestamp,
childThread: null,
});
chain = written.chain;
await publishHead({
@@ -487,6 +493,7 @@ export async function executeThread(
const thread: ThreadContext = {
threadId: io.threadId,
depth: options.depth,
bundleHash: io.hash,
start: {
role: START,
content: input.prompt,
@@ -144,6 +144,7 @@ async function payloadToRoleOutput(cas: CasStore, payload: StateNodePayload): Pr
contentHash: payload.content,
meta: payload.meta,
refs,
childThread: payload.childThread,
};
}
@@ -240,6 +241,7 @@ async function buildForkContinuation(params: {
ancestors: ancestorsMarker,
compact: null,
timestamp: Date.now(),
childThread: null,
};
const markerHash = await putStateNode(cas, markerPayload);
@@ -41,6 +41,8 @@ export type PrefilledDiskStep = {
export type ExecuteThreadOptions = {
/** Passed to the bundle thread context as `ThreadContext.depth`. */
depth: number;
/** Parent thread's head state hash at spawn time; `null` for top-level threads. */
parentStateHash: string | null;
signal: AbortSignal;
/** Invoked after each successful yield (and outer-loop checks); used for pause/resume. */
awaitAfterEachYield: () => Promise<void>;
@@ -72,11 +72,13 @@ function parseRoleOutputRecord(obj: Record<string, unknown>): RoleOutput | null
if (meta === null || typeof meta !== "object") {
return null;
}
const childThread = obj.childThread;
return {
role,
contentHash,
meta: meta as Record<string, unknown>,
refs: normalizeRefsField(obj.refs),
childThread: typeof childThread === "string" ? childThread : null,
};
}
@@ -497,6 +499,7 @@ async function main(): Promise<void> {
{ prompt: cmd.prompt, steps: cmd.steps },
{
...cmd.options,
parentStateHash: null,
signal: ac.signal,
awaitAfterEachYield: () => pauseGate.awaitAfterYield(),
forkSourceThreadId: cmd.forkSourceThreadId,
@@ -6,7 +6,7 @@ import {
getRegisteredWorkflow,
readWorkflowRegistry,
} from "@uncaged/workflow-register";
import type { AgentContext, AgentFn } from "@uncaged/workflow-runtime";
import type { AgentContext, AgentFn, AgentFnResult } from "@uncaged/workflow-runtime";
import {
createLogger,
generateUlid,
@@ -14,7 +14,7 @@ import {
getGlobalCasDir,
} from "@uncaged/workflow-util";
import type { ExecuteThreadIo } from "./engine/index.js";
import { executeThread } from "./engine/index.js";
import { executeThread, getBundleDir, readThreadsIndex } from "./engine/index.js";
const DEFAULT_WORKFLOW_AS_AGENT_MAX_DEPTH = 3;
@@ -37,6 +37,13 @@ function resolveWorkflowAsAgentStorageRoot(options: WorkflowAsAgentOptions | nul
return getDefaultWorkflowStorageRoot();
}
async function readParentHeadState(storageRoot: string, ctx: AgentContext): Promise<string | null> {
const bundleDir = getBundleDir(storageRoot, ctx.bundleHash);
const index = await readThreadsIndex(bundleDir);
const entry = index[ctx.threadId] ?? null;
return entry !== null ? entry.head : null;
}
/**
* Returns an {@link AgentFn} that runs another registered workflow in a new thread,
* using the parent thread's initial prompt (`ctx.start.content`) as the child prompt.
@@ -45,7 +52,7 @@ export function workflowAsAgent(
workflowName: string,
options: WorkflowAsAgentOptions | null = null,
): AgentFn {
return async (ctx: AgentContext): Promise<string> => {
return async (ctx: AgentContext): Promise<AgentFnResult> => {
const nextDepth = ctx.depth + 1;
const storageRoot = resolveWorkflowAsAgentStorageRoot(options);
@@ -89,6 +96,8 @@ export function workflowAsAgent(
const logger = createLogger({ sink: { kind: "file", path: infoJsonlPath } });
const signalNever = new AbortController();
const parentHeadState = await readParentHeadState(storageRoot, ctx);
try {
const result = await executeThread(
bundleExportsResult.value.run,
@@ -96,6 +105,7 @@ export function workflowAsAgent(
input,
{
depth: nextDepth,
parentStateHash: parentHeadState,
signal: signalNever.signal,
awaitAfterEachYield: async () => {},
forkSourceThreadId: ctx.threadId,
@@ -107,7 +117,8 @@ export function workflowAsAgent(
io,
logger,
);
return `Child workflow "${workflowName}" completed (returnCode=${result.returnCode}).\n\nSummary: ${result.summary}\n\nChild thread root hash: ${result.rootHash}`;
const summary = `Child workflow "${workflowName}" completed (returnCode=${result.returnCode}).\n\nSummary: ${result.summary}\n\nChild thread root hash: ${result.rootHash}`;
return { output: summary, childThread: result.rootHash };
} catch (e) {
const message = e instanceof Error ? e.message : String(e);
return `ERROR: ${message}`;
@@ -21,6 +21,7 @@ function makeCtx(roles: (keyof TestMeta & string)[]): ModeratorContext<TestMeta>
return {
threadId: "test-thread",
depth: 0,
bundleHash: "TESTHASH00001",
start: {
role: START,
content: "test",
@@ -4,6 +4,8 @@ export type StartNodePayload = {
name: string;
hash: string;
depth: number;
/** Parent thread's head state hash at spawn time. `null` for top-level workflows. */
parentState: string | null;
};
export type StartNode = {
@@ -20,6 +22,8 @@ export type StateNodePayload = {
ancestors: string[];
compact: string | null;
timestamp: number;
/** Child thread's final state hash (workflow-as-agent). `null` when no child spawned. */
childThread: string | null;
};
export type StateNode = {
+1
View File
@@ -13,6 +13,7 @@ export type {
AgentBinding,
AgentContext,
AgentFn,
AgentFnResult,
CasStore,
ExtractFn,
ExtractResult,
+5 -1
View File
@@ -41,6 +41,7 @@ export type RoleOutput = {
contentHash: string;
meta: Record<string, unknown>;
refs: string[];
childThread: string | null;
};
export type StartStep = {
@@ -63,6 +64,7 @@ export type RoleStep<M extends RoleMeta> = {
export type ThreadContext<M extends RoleMeta = RoleMeta> = {
threadId: string;
depth: number;
bundleHash: string;
start: StartStep;
steps: RoleStep<M>[];
};
@@ -127,7 +129,9 @@ export type ExtractFn = <T extends Record<string, unknown>>(
contentHash: string,
) => Promise<ExtractResult<T>>;
export type AgentFn = (ctx: AgentContext) => Promise<string>;
export type AgentFnResult = string | { output: string; childThread: string | null };
export type AgentFn = (ctx: AgentContext) => Promise<AgentFnResult>;
export type AgentBinding = {
agent: AgentFn;
@@ -27,7 +27,7 @@ describe("buildThreadContext", () => {
const bundleHash = "BHAAAAAAAAAAA";
const startHash = await putStartNode(
cas,
{ name: "demo", hash: bundleHash, depth: 2 },
{ name: "demo", hash: bundleHash, depth: 2, parentState: null },
promptHash,
);
@@ -41,6 +41,7 @@ describe("buildThreadContext", () => {
ancestors: [],
compact: null,
timestamp: 1000,
childThread: null,
});
const chCode = await putContentNodeWithRefs(cas, "code body", []);
@@ -52,6 +53,7 @@ describe("buildThreadContext", () => {
ancestors: [statePlan],
compact: null,
timestamp: 2000,
childThread: null,
});
const ctx = await buildThreadContext(stateCode, cas);
@@ -71,7 +73,7 @@ describe("buildThreadContext", () => {
const promptHash = await cas.put("only-prompt");
const startHash = await putStartNode(
cas,
{ name: "solo", hash: "BHBBBBBBBBBBB", depth: 1 },
{ name: "solo", hash: "BHBBBBBBBBBBB", depth: 1, parentState: null },
promptHash,
);
@@ -87,7 +89,7 @@ describe("buildThreadContext", () => {
const bundleHash = "BHCCCCCCCCCCC";
const startHash = await putStartNode(
cas,
{ name: "demo", hash: bundleHash, depth: 0 },
{ name: "demo", hash: bundleHash, depth: 0, parentState: null },
promptHash,
);
@@ -100,6 +102,7 @@ describe("buildThreadContext", () => {
ancestors: [],
compact: null,
timestamp: 500,
childThread: null,
});
const endContent = await putContentNodeWithRefs(cas, "finished", []);
@@ -111,6 +114,7 @@ describe("buildThreadContext", () => {
ancestors: [state1],
compact: null,
timestamp: 600,
childThread: null,
});
const ctx = await buildThreadContext(endState, cas);
@@ -54,6 +54,7 @@ async function threadFromStartHead<M extends RoleMeta>(
return {
threadId: "",
depth: p.depth,
bundleHash: p.hash,
start: {
role: START,
content: prompt,
@@ -113,6 +114,7 @@ async function threadFromStateHead<M extends RoleMeta>(
return {
threadId: "",
depth: sp.depth,
bundleHash: sp.hash,
start: {
role: START,
content: prompt,
@@ -6,6 +6,7 @@ import {
type AgentBinding,
type AgentContext,
type AgentFn,
type AgentFnResult,
END,
type ModeratorContext,
type RoleDefinition,
@@ -49,6 +50,16 @@ function mergeUniqueHashes(a: readonly string[], b: readonly string[]): string[]
return out;
}
function normalizeAgentResult(result: AgentFnResult): {
output: string;
childThread: string | null;
} {
if (typeof result === "string") {
return { output: result, childThread: null };
}
return result;
}
function agentForRole(binding: AgentBinding, roleName: string): AgentFn {
const overrides = binding.overrides;
const overrideFn: AgentFn | undefined =
@@ -86,9 +97,9 @@ async function advanceOneRound<M extends RoleMeta>(
};
const agent = agentForRole(binding, next);
const raw = await agent(agentCtx as unknown as AgentContext);
const agentResult = normalizeAgentResult(await agent(agentCtx as unknown as AgentContext));
const agentContentHash = await putContentNodeWithRefs(runtime.cas, raw, []);
const agentContentHash = await putContentNodeWithRefs(runtime.cas, agentResult.output, []);
const extracted = await runtime.extract(
roleDef.schema as z.ZodType<Record<string, unknown>>,
@@ -122,6 +133,7 @@ async function advanceOneRound<M extends RoleMeta>(
contentHash: step.contentHash,
meta: step.meta,
refs: step.refs,
childThread: agentResult.childThread,
},
step,
};
+1
View File
@@ -5,6 +5,7 @@ export type {
AgentBinding,
AgentContext,
AgentFn,
AgentFnResult,
CasStore,
ExtractFn,
ExtractResult,
+1
View File
@@ -7,6 +7,7 @@ export type {
AgentBinding,
AgentContext,
AgentFn,
AgentFnResult,
CasStore,
ExtractFn,
ExtractResult,
@@ -26,6 +26,7 @@ function makeCtx(steps: ModeratorContext<DevelopMeta>["steps"]): ModeratorContex
return {
threadId: "01TEST000000000000000000TR",
depth: 0,
bundleHash: "TESTHASH00001",
start: makeStart(),
steps,
};
@@ -113,6 +113,7 @@ function makeCtx(
return {
threadId: "01TEST000000000000000000TR",
depth: 0,
bundleHash: "TESTHASH00001",
start: makeStart(),
steps,
};
@@ -178,6 +179,7 @@ function makeThread(prompt: string) {
return {
threadId: "01TEST000000000000000000TR",
depth: 0,
bundleHash: "TESTHASH00001",
start: {
role: START,
content: prompt,
@@ -17,6 +17,7 @@ describe("buildAgentPrompt", () => {
const ctx: AgentContext = {
start: startTask("fix the bug"),
depth: 0,
bundleHash: "TESTHASH00001",
steps: [],
threadId: "01TEST000000000000000000TR",
currentRole: { name: START, systemPrompt: "You are an agent." },
@@ -33,6 +34,7 @@ describe("buildAgentPrompt", () => {
const ctx: AgentContext = {
start: startTask("user task"),
depth: 0,
bundleHash: "TESTHASH00001",
threadId: "01TEST000000000000000000TR",
currentRole: { name: "coder", systemPrompt: "Be helpful." },
steps: [
@@ -61,6 +63,7 @@ describe("buildAgentPrompt", () => {
const ctx: AgentContext = {
start: startTask("first message full: task content here"),
depth: 0,
bundleHash: "TESTHASH00001",
threadId: "01TEST000000000000000000TR",
currentRole: { name: "coder", systemPrompt: "System." },
steps: [
@@ -99,6 +102,7 @@ describe("buildAgentPrompt", () => {
const ctx: AgentContext = {
start: startTask("start"),
depth: 0,
bundleHash: "TESTHASH00001",
threadId: "01TEST000000000000000000TR",
currentRole: { name: "c", systemPrompt: "S" },
steps: [