feat: Phase 3 — engine read path + runtime context builder

- Add buildThreadContext(headHash, cas) to workflow-runtime
- Expand extract phase to return { meta, contentPayload, refs[] }
- Add parseCasThreadNode() to workflow-cas for node type parsing
- Update createWorkflow to write ContentMerkleNode with artifact refs
- Tests: 4 pass (build-context + extract-refs)
- Biome format pass on all files

Refs #155, closes #158

小橘 <xiaoju@shazhou.work>
This commit is contained in:
2026-05-09 08:00:24 +00:00
parent 81c582ae0e
commit 26cf51366f
51 changed files with 701 additions and 234 deletions
@@ -0,0 +1,121 @@
import { afterEach, beforeEach, describe, expect, test } from "bun:test";
import { mkdtemp, rm } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import {
createCasStore,
putContentNodeWithRefs,
putStartNode,
putStateNode,
} from "@uncaged/workflow-cas";
import { buildThreadContext, END, START } from "../src/index.js";
describe("buildThreadContext", () => {
let dir: string;
beforeEach(async () => {
dir = await mkdtemp(join(tmpdir(), "wf-build-ctx-"));
});
afterEach(async () => {
await rm(dir, { recursive: true, force: true });
});
test("walks ancestor chain, resolves prompt, orders steps chronologically", async () => {
const cas = createCasStore(join(dir, "cas"));
const promptHash = await cas.put("hello-task");
const bundleHash = "BHAAAAAAAAAAA";
const startHash = await putStartNode(
cas,
{ name: "demo", hash: bundleHash, maxRounds: 99, depth: 2 },
promptHash,
);
const art = await cas.put("artifact-a");
const chPlan = await putContentNodeWithRefs(cas, "plan body", [art]);
const statePlan = await putStateNode(cas, {
role: "planner",
meta: { phase: 1 },
start: startHash,
content: chPlan,
ancestors: [],
compact: null,
timestamp: 1000,
});
const chCode = await putContentNodeWithRefs(cas, "code body", []);
const stateCode = await putStateNode(cas, {
role: "coder",
meta: { phase: 2 },
start: startHash,
content: chCode,
ancestors: [statePlan],
compact: null,
timestamp: 2000,
});
const ctx = await buildThreadContext(stateCode, cas);
expect(ctx.threadId).toBe("");
expect(ctx.depth).toBe(2);
expect(ctx.start.role).toBe(START);
expect(ctx.start.content).toBe("hello-task");
expect(ctx.start.meta.maxRounds).toBe(99);
expect(ctx.steps.map((s) => s.role)).toEqual(["planner", "coder"]);
expect(ctx.steps[0]?.refs).toEqual([art]);
expect(ctx.steps[1]?.refs).toEqual([]);
expect(ctx.steps[0]?.timestamp).toBe(1000);
expect(ctx.steps[1]?.timestamp).toBe(2000);
});
test("StartNode head yields empty steps", async () => {
const cas = createCasStore(join(dir, "cas"));
const promptHash = await cas.put("only-prompt");
const startHash = await putStartNode(
cas,
{ name: "solo", hash: "BHBBBBBBBBBBB", maxRounds: 3, depth: 1 },
promptHash,
);
const ctx = await buildThreadContext(startHash, cas);
expect(ctx.steps).toEqual([]);
expect(ctx.start.content).toBe("only-prompt");
expect(ctx.depth).toBe(1);
expect(ctx.start.meta.maxRounds).toBe(3);
});
test("omits __end__ states from steps", async () => {
const cas = createCasStore(join(dir, "cas"));
const promptHash = await cas.put("task");
const bundleHash = "BHCCCCCCCCCCC";
const startHash = await putStartNode(
cas,
{ name: "demo", hash: bundleHash, maxRounds: 10, depth: 0 },
promptHash,
);
const ch1 = await putContentNodeWithRefs(cas, "step-one", []);
const state1 = await putStateNode(cas, {
role: "worker",
meta: { done: false },
start: startHash,
content: ch1,
ancestors: [],
compact: null,
timestamp: 500,
});
const endContent = await putContentNodeWithRefs(cas, "finished", []);
const endState = await putStateNode(cas, {
role: END,
meta: { returnCode: 0, summary: "finished" },
start: startHash,
content: endContent,
ancestors: [state1],
compact: null,
timestamp: 600,
});
const ctx = await buildThreadContext(endState, cas);
expect(ctx.steps.map((s) => s.role)).toEqual(["worker"]);
});
});
+1
View File
@@ -8,6 +8,7 @@
"test": "bun test"
},
"dependencies": {
"@uncaged/workflow-cas": "workspace:*",
"@uncaged/workflow-protocol": "workspace:*"
},
"peerDependencies": {
@@ -0,0 +1,153 @@
import { getContentMerklePayload, parseCasThreadNode } from "@uncaged/workflow-cas";
import type {
CasStore,
RoleMeta,
RoleStep,
StartNode,
StateNode,
ThreadContext,
} from "@uncaged/workflow-protocol";
import { END, START } from "@uncaged/workflow-protocol";
async function loadParsedNode(cas: CasStore, hash: string) {
const yamlText = await cas.get(hash);
if (yamlText === null) {
return null;
}
return parseCasThreadNode(yamlText);
}
async function resolvePromptText(cas: CasStore, promptHash: string): Promise<string> {
const text = await getContentMerklePayload(cas, promptHash);
if (text !== null) {
return text;
}
throw new Error(`buildThreadContext: could not resolve prompt text at ${promptHash}`);
}
async function collectStateChainFromHead(cas: CasStore, headHash: string): Promise<StateNode[]> {
const reversed: StateNode[] = [];
let hash: string | null = headHash;
while (hash !== null) {
const parsed = await loadParsedNode(cas, hash);
if (parsed === null || parsed.kind !== "state") {
throw new Error(`buildThreadContext: expected state node at ${hash}`);
}
reversed.push(parsed.node);
const anc = parsed.node.payload.ancestors;
hash = anc.length > 0 ? anc[0] : null;
}
reversed.reverse();
return reversed;
}
async function threadFromStartHead<M extends RoleMeta>(
node: StartNode,
cas: CasStore,
): Promise<ThreadContext<M>> {
const promptHash = node.refs[0];
if (promptHash === undefined) {
throw new Error("buildThreadContext: StartNode missing refs[0] prompt");
}
const prompt = await resolvePromptText(cas, promptHash);
const p = node.payload;
return {
threadId: "",
depth: p.depth,
start: {
role: START,
content: prompt,
meta: { maxRounds: p.maxRounds },
timestamp: 0,
},
steps: [],
};
}
async function buildRoleStepsFromStates<M extends RoleMeta>(
chronologicalStates: StateNode[],
cas: CasStore,
): Promise<RoleStep<M>[]> {
const steps: RoleStep<M>[] = [];
for (const st of chronologicalStates) {
if (st.payload.role === END) {
continue;
}
const contentParsed = await loadParsedNode(cas, st.payload.content);
if (contentParsed === null || contentParsed.kind !== "content") {
throw new Error(`buildThreadContext: expected content node at ${st.payload.content}`);
}
steps.push({
role: st.payload.role,
meta: st.payload.meta,
contentHash: st.payload.content,
refs: [...contentParsed.node.refs],
timestamp: st.payload.timestamp,
} as RoleStep<M>);
}
return steps;
}
async function threadFromStateHead<M extends RoleMeta>(
headHash: string,
cas: CasStore,
): Promise<ThreadContext<M>> {
const chronologicalStates = await collectStateChainFromHead(cas, headHash);
const firstState = chronologicalStates[0];
if (firstState === undefined) {
throw new Error("buildThreadContext: empty state chain");
}
const startBlob = await loadParsedNode(cas, firstState.payload.start);
if (startBlob === null || startBlob.kind !== "start") {
throw new Error(`buildThreadContext: StartNode missing at ${firstState.payload.start}`);
}
const promptHash = startBlob.node.refs[0];
if (promptHash === undefined) {
throw new Error("buildThreadContext: StartNode missing refs[0] prompt");
}
const prompt = await resolvePromptText(cas, promptHash);
const sp = startBlob.node.payload;
const steps = await buildRoleStepsFromStates<M>(chronologicalStates, cas);
const firstTs = steps[0]?.timestamp ?? 0;
return {
threadId: "",
depth: sp.depth,
start: {
role: START,
content: prompt,
meta: { maxRounds: sp.maxRounds },
timestamp: firstTs,
},
steps,
};
}
/**
* Reconstructs {@link ThreadContext} by walking the CAS state chain from {@link headHash}.
*
* Walks each {@link StateNode} via `payload.ancestors[0]` until the ancestor list is empty,
* resolves the prompt from the shared {@link StartNode} (`refs[0]` → prompt blob), and builds
* steps from non-`__end__` states in chronological order.
*
* `threadId` is set to `""` — callers that load from `threads.json` should overwrite it.
*/
export async function buildThreadContext<M extends RoleMeta = RoleMeta>(
headHash: string,
cas: CasStore,
): Promise<ThreadContext<M>> {
const headParsed = await loadParsedNode(cas, headHash);
if (headParsed === null) {
throw new Error(`buildThreadContext: missing or invalid CAS blob ${headHash}`);
}
if (headParsed.kind === "start") {
return threadFromStartHead(headParsed.node, cas);
}
if (headParsed.kind !== "state") {
throw new Error(`buildThreadContext: head ${headHash} must be start or state node`);
}
return threadFromStateHead(headHash, cas);
}
@@ -1,3 +1,4 @@
import { putContentNodeWithRefs } from "@uncaged/workflow-cas";
import type * as z from "zod/v4";
import {
@@ -5,7 +6,6 @@ import {
type AgentBinding,
type AgentContext,
type AgentFn,
type CasStore,
END,
type ExtractContext,
type ModeratorContext,
@@ -38,8 +38,16 @@ function resolveExtractedRefs(
return extractRefsFn(meta as Record<string, unknown>);
}
async function putContentBlob(store: CasStore, raw: string): Promise<string> {
return store.put(raw);
function mergeUniqueHashes(a: readonly string[], b: readonly string[]): string[] {
const seen = new Set<string>();
const out: string[] = [];
for (const h of [...a, ...b]) {
if (!seen.has(h)) {
seen.add(h);
out.push(h);
}
}
return out;
}
function agentForRole(binding: AgentBinding, roleName: string): AgentFn {
@@ -86,23 +94,29 @@ async function advanceOneRound<M extends RoleMeta>(
agentContent: raw,
};
const meta = await runtime.extract(
const extracted = await runtime.extract(
roleDef.schema as z.ZodType<Record<string, unknown>>,
roleDef.extractPrompt,
extractCtx as unknown as ExtractContext,
);
const contentHash = await putContentBlob(runtime.cas, raw);
const refsFromMeta = resolveExtractedRefs(
roleDef as unknown as RoleDefinition<Record<string, unknown>>,
meta,
extracted.meta,
);
const refs = refsFromMeta.includes(contentHash) ? refsFromMeta : [...refsFromMeta, contentHash];
const artifactRefs = mergeUniqueHashes(extracted.refs, refsFromMeta);
const contentHash = await putContentNodeWithRefs(
runtime.cas,
extracted.contentPayload,
artifactRefs,
);
const refs = artifactRefs.includes(contentHash) ? artifactRefs : [...artifactRefs, contentHash];
const step = {
role: next,
contentHash,
meta,
meta: extracted.meta,
refs,
timestamp: Date.now(),
} as RoleStep<M>;
+26 -24
View File
@@ -1,29 +1,31 @@
export { buildThreadContext } from "./build-context.js";
export { createWorkflow } from "./create-workflow.js";
export { err, ok } from "./result.js";
export type {
AgentBinding,
AgentContext,
AgentFn,
CasStore,
ExtractContext,
ExtractFn,
LlmProvider,
Moderator,
ModeratorContext,
Result,
RoleDefinition,
RoleMeta,
RoleOutput,
RoleStep,
StartStep,
ThreadContext,
WorkflowCompletion,
WorkflowDefinition,
WorkflowDescriptor,
WorkflowFn,
WorkflowResult,
WorkflowRoleDescriptor,
WorkflowRoleSchema,
WorkflowRuntime,
AgentBinding,
AgentContext,
AgentFn,
CasStore,
ExtractContext,
ExtractFn,
ExtractResult,
LlmProvider,
Moderator,
ModeratorContext,
Result,
RoleDefinition,
RoleMeta,
RoleOutput,
RoleStep,
StartStep,
ThreadContext,
WorkflowCompletion,
WorkflowDefinition,
WorkflowDescriptor,
WorkflowFn,
WorkflowResult,
WorkflowRoleDescriptor,
WorkflowRoleSchema,
WorkflowRuntime,
} from "./types.js";
export { END, START } from "./types.js";
+26 -25
View File
@@ -3,31 +3,32 @@
// imports from "@uncaged/workflow-runtime" continues to work.
export type {
AgentBinding,
AgentContext,
AgentFn,
AdvanceOutcome,
CasStore,
ExtractContext,
ExtractFn,
LlmProvider,
Moderator,
ModeratorContext,
Result,
RoleDefinition,
RoleMeta,
RoleOutput,
RoleStep,
StartStep,
ThreadContext,
WorkflowCompletion,
WorkflowDefinition,
WorkflowDescriptor,
WorkflowFn,
WorkflowResult,
WorkflowRoleDescriptor,
WorkflowRoleSchema,
WorkflowRuntime,
AdvanceOutcome,
AgentBinding,
AgentContext,
AgentFn,
CasStore,
ExtractContext,
ExtractFn,
ExtractResult,
LlmProvider,
Moderator,
ModeratorContext,
Result,
RoleDefinition,
RoleMeta,
RoleOutput,
RoleStep,
StartStep,
ThreadContext,
WorkflowCompletion,
WorkflowDefinition,
WorkflowDescriptor,
WorkflowFn,
WorkflowResult,
WorkflowRoleDescriptor,
WorkflowRoleSchema,
WorkflowRuntime,
} from "@uncaged/workflow-protocol";
export { END, START } from "@uncaged/workflow-protocol";
+1 -3
View File
@@ -18,7 +18,5 @@
"types": ["bun-types"]
},
"include": ["src/**/*.ts"],
"references": [
{ "path": "../workflow-protocol" }
]
"references": [{ "path": "../workflow-cas" }, { "path": "../workflow-protocol" }]
}