feat: thread root node + workflowAsAgent returns root hash #51

Merged
xiaoju merged 1 commits from feat/42-thread-root-node into main 2026-05-07 13:18:13 +00:00
10 changed files with 305 additions and 26 deletions
+110
View File
@@ -12,6 +12,7 @@ import { createLogger } from "../src/logger.js";
import {
createContentMerkleNode,
getContentMerklePayload,
parseMerkleNode,
serializeMerkleNode,
} from "../src/merkle.js";
import { END } from "../src/types.js";
@@ -168,6 +169,19 @@ describe("executeThread", () => {
);
expect(result.returnCode).toBe(0);
expect(typeof result.rootHash).toBe("string");
expect(result.rootHash.length).toBeGreaterThan(0);
const rootYaml = await cas.get(result.rootHash);
expect(rootYaml).not.toBeNull();
const rootNode = parseMerkleNode(rootYaml ?? "");
expect(rootNode.type).toBe("thread");
const rootPayload = rootNode.payload as Record<string, unknown>;
expect(rootPayload.workflow).toBe("demo-flow");
expect(rootPayload.threadId).toBe(threadId);
const rootResult = rootPayload.result as Record<string, unknown>;
expect(rootResult.returnCode).toBe(0);
expect(rootNode.children.length).toBe(2);
const dataText = await readFile(dataPath, "utf8");
const lines = dataText
@@ -201,6 +215,20 @@ describe("executeThread", () => {
expect(role2.role).toBe("coder");
expect(role2.refs).toEqual([role2.contentHash]);
const step1Yaml = await cas.get(rootNode.children[0] ?? "");
const step2Yaml = await cas.get(rootNode.children[1] ?? "");
expect(step1Yaml).not.toBeNull();
expect(step2Yaml).not.toBeNull();
const step1Node = parseMerkleNode(step1Yaml ?? "");
const step2Node = parseMerkleNode(step2Yaml ?? "");
expect(step1Node.type).toBe("step");
expect(step2Node.type).toBe("step");
expect(step1Node.children).toEqual([String(role1.contentHash)]);
expect(step2Node.children).toEqual([String(role2.contentHash)]);
const step1Payload = step1Node.payload as Record<string, unknown>;
expect(step1Payload.role).toBe("planner");
expect(step1Payload.meta).toEqual({ plan: "do-it", files: ["a.ts"] });
const infoText = await readFile(infoPath, "utf8");
const infoLines = infoText
.trim()
@@ -270,6 +298,11 @@ describe("executeThread", () => {
);
expect(result.returnCode).toBe(0);
expect(typeof result.rootHash).toBe("string");
const rootYaml = await cas.get(result.rootHash);
const rootNode = parseMerkleNode(rootYaml ?? "");
expect(rootNode.children.length).toBe(2);
const dataText = await readFile(dataPath, "utf8");
const lines = dataText
@@ -324,6 +357,12 @@ describe("executeThread", () => {
);
expect(result.returnCode).toBe(0);
expect(typeof result.rootHash).toBe("string");
const rootYaml = await cas.get(result.rootHash);
const rootNode = parseMerkleNode(rootYaml ?? "");
expect(rootNode.type).toBe("thread");
expect(rootNode.children.length).toBe(0);
const dataText = await readFile(dataPath, "utf8");
const lines = dataText
@@ -335,4 +374,75 @@ describe("executeThread", () => {
await rm(root, { recursive: true, force: true });
}
});
test("Merkle DAG: root → step nodes → content for full thread traversal", async () => {
restoreFetch = installMockChatCompletions([
{ plan: "do-it", files: ["a.ts"] },
{ diff: "+ok" },
]);
const root = await mkdtemp(join(tmpdir(), "wf-engine-dag-"));
try {
const threadId = "01KQXKW18CT8G75T53R8F4G7YG";
const hash = "C9NMV6V2TQT81";
const dataPath = join(root, "logs", hash, `${threadId}.data.jsonl`);
const infoPath = join(root, "logs", hash, `${threadId}.info.jsonl`);
await mkdir(join(root, "logs", hash), { recursive: true });
const cas = createCasStore(join(root, "cas"));
const logger = createLogger({ sink: { kind: "file", path: infoPath } });
const ac = new AbortController();
const result = await executeThread(
demoWorkflow,
"demo-flow",
{ prompt: "DAG test", steps: [] },
{
maxRounds: 5,
depth: 0,
signal: ac.signal,
awaitAfterEachYield: async () => {},
forkSourceThreadId: null,
prefilledDiskSteps: null,
},
{ threadId, hash, dataJsonlPath: dataPath, infoJsonlPath: infoPath, cas },
logger,
);
const dataText = await readFile(dataPath, "utf8");
const lines = dataText
.trim()
.split("\n")
.filter((l) => l !== "");
expect(lines.length).toBe(3);
const rolePlanner = JSON.parse(lines[1] ?? "{}") as Record<string, unknown>;
const roleCoder = JSON.parse(lines[2] ?? "{}") as Record<string, unknown>;
const threadYaml = await cas.get(result.rootHash);
expect(threadYaml).not.toBeNull();
const threadNode = parseMerkleNode(threadYaml ?? "");
expect(threadNode.type).toBe("thread");
const bodies: string[] = [];
for (const stepHash of threadNode.children) {
const stepYaml = await cas.get(stepHash);
expect(stepYaml).not.toBeNull();
const stepNode = parseMerkleNode(stepYaml ?? "");
expect(stepNode.type).toBe("step");
expect(stepNode.children.length).toBe(1);
const contentHash = stepNode.children[0];
expect(contentHash).toBeDefined();
const body = await getContentMerklePayload(cas, contentHash ?? "");
expect(body).not.toBeNull();
bodies.push(body ?? "");
}
expect(bodies.sort()).toEqual(["code-body", "plan-body"].sort());
expect(rolePlanner.role).toBe("planner");
expect(roleCoder.role).toBe("coder");
} finally {
await rm(root, { recursive: true, force: true });
}
});
});
@@ -162,6 +162,8 @@ describe("RoleStep refs tracking", () => {
);
expect(result.returnCode).toBe(0);
expect(typeof result.rootHash).toBe("string");
expect(result.rootHash.length).toBeGreaterThan(0);
const dataText = await readFile(dataPath, "utf8");
const lines = dataText
@@ -10,7 +10,7 @@ import { executeThread } from "../src/engine.js";
import { createExtract } from "../src/extract-fn.js";
import { hashWorkflowBundleBytes } from "../src/hash.js";
import { createLogger } from "../src/logger.js";
import { getContentMerklePayload } from "../src/merkle.js";
import { getContentMerklePayload, parseMerkleNode } from "../src/merkle.js";
import {
readWorkflowRegistry,
registerWorkflowVersion,
@@ -177,6 +177,7 @@ describe("workflowAsAgent integration", () => {
);
expect(result.returnCode).toBe(0);
expect(typeof result.rootHash).toBe("string");
const parentText = await readFile(dataPath, "utf8");
const parentLines = parentText
@@ -186,9 +187,16 @@ describe("workflowAsAgent integration", () => {
expect(parentLines.length).toBe(2);
const callerLine = JSON.parse(parentLines[1] ?? "{}") as Record<string, unknown>;
expect(callerLine.role).toBe("caller");
expect(await getContentMerklePayload(cas, String(callerLine.contentHash))).toBe(
"child-done:from-parent",
);
const childRootHash = await getContentMerklePayload(cas, String(callerLine.contentHash));
expect(childRootHash).not.toBeNull();
const childThreadYaml = await cas.get(childRootHash ?? "");
expect(childThreadYaml).not.toBeNull();
const childThreadNode = parseMerkleNode(childThreadYaml ?? "");
expect(childThreadNode.type).toBe("thread");
const childPayload = childThreadNode.payload as Record<string, unknown>;
expect(childPayload.workflow).toBe("child-wf");
const childResult = childPayload.result as Record<string, unknown>;
expect(childResult.summary).toBe("child-done:from-parent");
const childDir = join(root, "logs", childHash);
const childFiles = await readdir(childDir);
@@ -5,6 +5,7 @@ import { join } from "node:path";
import { createCasStore } from "../src/cas.js";
import { hashWorkflowBundleBytes } from "../src/hash.js";
import { parseMerkleNode } from "../src/merkle.js";
import {
readWorkflowRegistry,
registerWorkflowVersion,
@@ -89,7 +90,7 @@ describe("workflowAsAgent", () => {
}
});
test("runs registered workflow and returns child summary string", async () => {
test("runs registered workflow and returns child thread root CAS hash", async () => {
const root = await mkdtemp(join(tmpdir(), "wf-waa-ok-"));
try {
await installChildWorkflow(root);
@@ -97,7 +98,16 @@ describe("workflowAsAgent", () => {
const out = await agent(
makeAgentCtx({ storageRoot: root, depth: 0, prompt: "hello-parent", maxRounds: 5 }),
);
expect(out).toBe("child-done:hello-parent");
const cas = createCasStore(join(root, "cas"));
const threadYaml = await cas.get(out);
expect(threadYaml).not.toBeNull();
const node = parseMerkleNode(threadYaml ?? "");
expect(node.type).toBe("thread");
const payload = node.payload as Record<string, unknown>;
expect(payload.workflow).toBe("child-wf");
const resultObj = payload.result as Record<string, unknown>;
expect(resultObj.summary).toBe("child-done:hello-parent");
expect(node.children.length).toBe(1);
} finally {
await rm(root, { recursive: true, force: true });
}
+2 -2
View File
@@ -13,10 +13,10 @@ import {
type RoleStep,
START,
type ThreadInput,
type WorkflowCompletion,
type WorkflowDefinition,
type WorkflowFn,
type WorkflowFnOptions,
type WorkflowResult,
} from "./types.js";
function isRoleNext<M extends RoleMeta>(
@@ -48,7 +48,7 @@ export function createWorkflow<M extends RoleMeta>(
return async function* workflowLoop(
input: ThreadInput,
options: WorkflowFnOptions,
): AsyncGenerator<RoleOutput, WorkflowResult> {
): AsyncGenerator<RoleOutput, WorkflowCompletion> {
const nowMs = Date.now();
const start: ModeratorContext<M>["start"] = {
role: START,
+107 -14
View File
@@ -3,9 +3,15 @@ import { dirname } from "node:path";
import type { CasStore } from "./cas.js";
import type { LogFn } from "./logger.js";
import { getContentMerklePayload } from "./merkle.js";
import { getContentMerklePayload, putStepMerkleNode, putThreadMerkleNode } from "./merkle.js";
import { normalizeRefsField } from "./refs-field.js";
import type { ThreadInput, WorkflowFn, WorkflowFnOptions, WorkflowResult } from "./types.js";
import type {
ThreadInput,
WorkflowCompletion,
WorkflowFn,
WorkflowFnOptions,
WorkflowResult,
} from "./types.js";
export type ExecuteThreadIo = {
threadId: string;
@@ -45,8 +51,35 @@ async function appendDataLine(path: string, record: unknown): Promise<void> {
await appendFile(path, line, "utf8");
}
async function finalizeThreadResult(params: {
cas: CasStore;
workflowName: string;
threadId: string;
stepMerkleHashes: readonly string[];
completion: WorkflowCompletion;
}): Promise<WorkflowResult> {
const rootHash = await putThreadMerkleNode(
params.cas,
{
workflow: params.workflowName,
threadId: params.threadId,
result: {
returnCode: params.completion.returnCode,
summary: params.completion.summary,
},
},
params.stepMerkleHashes,
);
return {
returnCode: params.completion.returnCode,
summary: params.completion.summary,
rootHash,
};
}
async function driveWorkflowGenerator(params: {
fn: WorkflowFn;
workflowName: string;
input: ThreadInput;
bundleOptions: WorkflowFnOptions;
executeOptions: ExecuteThreadOptions;
@@ -54,30 +87,61 @@ async function driveWorkflowGenerator(params: {
threadId: string;
logger: LogFn;
cas: CasStore;
stepMerkleHashes: string[];
}): Promise<WorkflowResult> {
const { fn, input, bundleOptions, executeOptions, dataJsonlPath, threadId, logger, cas } = params;
const {
fn,
workflowName,
input,
bundleOptions,
executeOptions,
dataJsonlPath,
threadId,
logger,
cas,
stepMerkleHashes,
} = params;
const gen = fn(input, bundleOptions);
let written = 0;
while (true) {
if (executeOptions.signal.aborted) {
logger("V8JX4NP2", `thread ${threadId} aborted`);
return { returnCode: 130, summary: "thread aborted" };
return await finalizeThreadResult({
cas,
workflowName,
threadId,
stepMerkleHashes,
completion: { returnCode: 130, summary: "thread aborted" },
});
}
if (written >= executeOptions.maxRounds) {
logger("R3CW7YBQ", `thread ${threadId} stopped at maxRounds=${executeOptions.maxRounds}`);
return {
returnCode: 0,
summary: `completed: reached maxRounds (${executeOptions.maxRounds})`,
};
return await finalizeThreadResult({
cas,
workflowName,
threadId,
stepMerkleHashes,
completion: {
returnCode: 0,
summary: `completed: reached maxRounds (${executeOptions.maxRounds})`,
},
});
}
const iterResult = await gen.next();
if (iterResult.done) {
logger("F3HN8QKP", `thread ${threadId} generator finished`);
return iterResult.value;
const completion = iterResult.value;
return await finalizeThreadResult({
cas,
workflowName,
threadId,
stepMerkleHashes,
completion,
});
}
written++;
@@ -97,6 +161,13 @@ async function driveWorkflowGenerator(params: {
timestamp: ts,
});
const stepNodeHash = await putStepMerkleNode(
cas,
{ role: step.role, meta: step.meta },
step.contentHash,
);
stepMerkleHashes.push(stepNodeHash);
logger("N7BW4YHQ", `thread ${threadId} wrote role ${step.role}`);
await Promise.race([
@@ -112,7 +183,13 @@ async function driveWorkflowGenerator(params: {
if (executeOptions.signal.aborted) {
logger("V8JX4NP4", `thread ${threadId} aborted`);
return { returnCode: 130, summary: "thread aborted" };
return await finalizeThreadResult({
cas,
workflowName,
threadId,
stepMerkleHashes,
completion: { returnCode: 130, summary: "thread aborted" },
});
}
}
}
@@ -161,6 +238,8 @@ export async function executeThread(
logger("T9HQ2KHM", `thread ${io.threadId} started for workflow ${workflowName}`);
const stepMerkleHashes: string[] = [];
if (prefilled !== null) {
for (const row of prefilled) {
const prefilledPayload = await getContentMerklePayload(io.cas, row.contentHash);
@@ -176,15 +255,27 @@ export async function executeThread(
refs: normalizeRefsField(row.refs),
timestamp: row.timestamp,
});
const stepNodeHash = await putStepMerkleNode(
io.cas,
{ role: row.role, meta: row.meta },
row.contentHash,
);
stepMerkleHashes.push(stepNodeHash);
}
}
if (options.maxRounds <= 0) {
logger("R3CW7YBQ", `thread ${io.threadId} stopped at maxRounds=${options.maxRounds}`);
return {
returnCode: 0,
summary: `completed: reached maxRounds (${options.maxRounds})`,
};
return await finalizeThreadResult({
cas: io.cas,
workflowName,
threadId: io.threadId,
stepMerkleHashes,
completion: {
returnCode: 0,
summary: `completed: reached maxRounds (${options.maxRounds})`,
},
});
}
const bundleOptions: WorkflowFnOptions = {
@@ -196,6 +287,7 @@ export async function executeThread(
return await driveWorkflowGenerator({
fn,
workflowName,
input,
bundleOptions,
executeOptions: options,
@@ -203,5 +295,6 @@ export async function executeThread(
threadId: io.threadId,
logger,
cas: io.cas,
stepMerkleHashes,
});
}
+5
View File
@@ -47,7 +47,11 @@ export {
type MerkleNodeType,
parseMerkleNode,
putContentMerkleNode,
putStepMerkleNode,
putThreadMerkleNode,
type StepMerklePayload,
serializeMerkleNode,
type ThreadMerklePayload,
} from "./merkle.js";
export {
getRegisteredWorkflow,
@@ -84,6 +88,7 @@ export {
type StartStep,
type ThreadContext,
type ThreadInput,
type WorkflowCompletion,
type WorkflowDefinition,
type WorkflowFn,
type WorkflowFnOptions,
+46
View File
@@ -53,6 +53,52 @@ export function createContentMerkleNode(payload: string): MerkleNode {
return { type: "content", payload, children: [] };
}
export type StepMerklePayload = {
role: string;
meta: Record<string, unknown>;
};
export type ThreadMerklePayload = {
workflow: string;
threadId: string;
result: {
returnCode: number;
summary: string;
};
};
/** Serializes a step Merkle node (role + meta + content child) and stores it in CAS. */
export async function putStepMerkleNode(
store: CasStore,
payload: StepMerklePayload,
contentHash: string,
): Promise<string> {
const node: MerkleNode = {
type: "step",
payload: { role: payload.role, meta: payload.meta },
children: [contentHash],
};
return store.put(serializeMerkleNode(node));
}
/** Serializes the thread root Merkle node and stores it in CAS. */
export async function putThreadMerkleNode(
store: CasStore,
payload: ThreadMerklePayload,
stepHashes: readonly string[],
): Promise<string> {
const node: MerkleNode = {
type: "thread",
payload: {
workflow: payload.workflow,
threadId: payload.threadId,
result: payload.result,
},
children: [...stepHashes],
};
return store.put(serializeMerkleNode(node));
}
/** Serializes a content Merkle node and stores it in CAS; returns its hash. */
export async function putContentMerkleNode(store: CasStore, content: string): Promise<string> {
const yamlText = serializeMerkleNode(createContentMerkleNode(content));
+8 -3
View File
@@ -26,12 +26,17 @@ export type RoleOutput = {
refs: string[];
};
/** What the workflow AsyncGenerator returns when done. */
export type WorkflowResult = {
/** Generator completion value from a workflow bundle (`run` export). Root hash is added by the engine. */
export type WorkflowCompletion = {
returnCode: number;
summary: string;
};
/** Final thread outcome from {@link executeThread}, including Merkle thread root CAS hash. */
export type WorkflowResult = WorkflowCompletion & {
rootHash: string;
};
/** Input to a workflow — prompt plus optional historical steps for fork/resume. */
export type ThreadInput = {
prompt: string;
@@ -52,7 +57,7 @@ export type WorkflowFnOptions = {
export type WorkflowFn = (
input: ThreadInput,
options: WorkflowFnOptions,
) => AsyncGenerator<RoleOutput, WorkflowResult>;
) => AsyncGenerator<RoleOutput, WorkflowCompletion>;
/** Engine start frame: initial prompt + thread identity. */
export type StartStep = {
+1 -1
View File
@@ -92,7 +92,7 @@ export function workflowAsAgent(
io,
logger,
);
return result.summary;
return result.rootHash;
} catch (e) {
const message = e instanceof Error ? e.message : String(e);
return `ERROR: ${message}`;