Merge pull request 'feat(#194): Phase 2 — Engine layer Merkle call stack' (#202) from feat/194-merkle-call-stack-phase2 into main
This commit is contained in:
@@ -12,6 +12,7 @@ function makeCtx(userContent: string): AgentContext {
|
|||||||
timestamp: 1,
|
timestamp: 1,
|
||||||
},
|
},
|
||||||
depth: 0,
|
depth: 0,
|
||||||
|
bundleHash: "TESTHASH00001",
|
||||||
steps: [],
|
steps: [],
|
||||||
threadId: "01TEST000000000000000000TR",
|
threadId: "01TEST000000000000000000TR",
|
||||||
currentRole: { name: "planner", systemPrompt: "system instructions" },
|
currentRole: { name: "planner", systemPrompt: "system instructions" },
|
||||||
|
|||||||
@@ -118,14 +118,22 @@ export function parseCasThreadNode(yamlText: string): ParsedCasThreadNode | null
|
|||||||
if (!isStartPayload(raw.payload)) {
|
if (!isStartPayload(raw.payload)) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
const node: StartNode = { type: "start", payload: normalizeStartPayload(raw.payload), refs: [...refs] };
|
const node: StartNode = {
|
||||||
|
type: "start",
|
||||||
|
payload: normalizeStartPayload(raw.payload),
|
||||||
|
refs: [...refs],
|
||||||
|
};
|
||||||
return { kind: "start", node };
|
return { kind: "start", node };
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!isStatePayload(raw.payload)) {
|
if (!isStatePayload(raw.payload)) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
const node: StateNode = { type: "state", payload: normalizeStatePayload(raw.payload), refs: [...refs] };
|
const node: StateNode = {
|
||||||
|
type: "state",
|
||||||
|
payload: normalizeStatePayload(raw.payload),
|
||||||
|
refs: [...refs],
|
||||||
|
};
|
||||||
return { kind: "state", node };
|
return { kind: "state", node };
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -35,6 +35,7 @@ function noLogger(): (tag: string, content: string) => void {
|
|||||||
function makeOptions(overrides: Partial<ExecuteThreadOptions>): ExecuteThreadOptions {
|
function makeOptions(overrides: Partial<ExecuteThreadOptions>): ExecuteThreadOptions {
|
||||||
return {
|
return {
|
||||||
depth: 0,
|
depth: 0,
|
||||||
|
parentStateHash: null,
|
||||||
signal: new AbortController().signal,
|
signal: new AbortController().signal,
|
||||||
awaitAfterEachYield: async () => {},
|
awaitAfterEachYield: async () => {},
|
||||||
forkSourceThreadId: null,
|
forkSourceThreadId: null,
|
||||||
@@ -144,9 +145,9 @@ describe("executeThread (Phase 2 — CAS thread storage)", () => {
|
|||||||
runtime: WorkflowRuntime,
|
runtime: WorkflowRuntime,
|
||||||
): AsyncGenerator<RoleOutput, WorkflowCompletion> {
|
): AsyncGenerator<RoleOutput, WorkflowCompletion> {
|
||||||
const h1 = await runtime.cas.put("plan-text");
|
const h1 = await runtime.cas.put("plan-text");
|
||||||
yield { role: "planner", contentHash: h1, meta: { plan: 1 }, refs: [h1] };
|
yield { role: "planner", contentHash: h1, meta: { plan: 1 }, refs: [h1], childThread: null };
|
||||||
const h2 = await runtime.cas.put("code-text");
|
const h2 = await runtime.cas.put("code-text");
|
||||||
yield { role: "coder", contentHash: h2, meta: { diff: "y" }, refs: [h2] };
|
yield { role: "coder", contentHash: h2, meta: { diff: "y" }, refs: [h2], childThread: null };
|
||||||
return { returnCode: 0, summary: "done" };
|
return { returnCode: 0, summary: "done" };
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -210,7 +211,7 @@ describe("executeThread (Phase 2 — CAS thread storage)", () => {
|
|||||||
runtime: WorkflowRuntime,
|
runtime: WorkflowRuntime,
|
||||||
): AsyncGenerator<RoleOutput, WorkflowCompletion> {
|
): AsyncGenerator<RoleOutput, WorkflowCompletion> {
|
||||||
const h = await runtime.cas.put("only-step");
|
const h = await runtime.cas.put("only-step");
|
||||||
yield { role: "only", contentHash: h, meta: {}, refs: [h] };
|
yield { role: "only", contentHash: h, meta: {}, refs: [h], childThread: null };
|
||||||
return { returnCode: 0, summary: "completed" };
|
return { returnCode: 0, summary: "completed" };
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -261,7 +262,7 @@ describe("executeThread (Phase 2 — CAS thread storage)", () => {
|
|||||||
runtime: WorkflowRuntime,
|
runtime: WorkflowRuntime,
|
||||||
): AsyncGenerator<RoleOutput, WorkflowCompletion> {
|
): AsyncGenerator<RoleOutput, WorkflowCompletion> {
|
||||||
const h = await runtime.cas.put("step");
|
const h = await runtime.cas.put("step");
|
||||||
yield { role: "only", contentHash: h, meta: {}, refs: [h] };
|
yield { role: "only", contentHash: h, meta: {}, refs: [h], childThread: null };
|
||||||
return { returnCode: 0, summary: "done" };
|
return { returnCode: 0, summary: "done" };
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,306 @@
|
|||||||
|
import { afterEach, beforeEach, describe, expect, test } from "bun:test";
|
||||||
|
import { mkdir, mkdtemp, readFile, rm, writeFile } from "node:fs/promises";
|
||||||
|
import { tmpdir } from "node:os";
|
||||||
|
import { join } from "node:path";
|
||||||
|
import type { CasStore } from "@uncaged/workflow-cas";
|
||||||
|
import { createCasStore, parseCasThreadNode } from "@uncaged/workflow-cas";
|
||||||
|
import type { StartNode, StateNode } from "@uncaged/workflow-protocol";
|
||||||
|
import type {
|
||||||
|
RoleOutput,
|
||||||
|
ThreadContext,
|
||||||
|
WorkflowCompletion,
|
||||||
|
WorkflowFn,
|
||||||
|
WorkflowRuntime,
|
||||||
|
} from "@uncaged/workflow-runtime";
|
||||||
|
|
||||||
|
import { executeThread } from "../src/engine/engine.js";
|
||||||
|
import type { ExecuteThreadIo, ExecuteThreadOptions } from "../src/engine/types.js";
|
||||||
|
|
||||||
|
const TEST_REGISTRY_YAML = `config:
|
||||||
|
maxDepth: 3
|
||||||
|
supervisorInterval: 0
|
||||||
|
providers:
|
||||||
|
stub:
|
||||||
|
baseUrl: http://127.0.0.1:9
|
||||||
|
apiKey: test
|
||||||
|
models:
|
||||||
|
default: stub/m
|
||||||
|
workflows: {}
|
||||||
|
`;
|
||||||
|
|
||||||
|
/**
 * Builds a logger callback that silently discards everything passed to it.
 *
 * The tests in this file hand the result to `executeThread` as its logger
 * argument so that runs produce no log output.
 *
 * @returns A `(tag, content)` function that does nothing.
 */
function noLogger(): (tag: string, content: string) => void {
  const discard = (_tag: string, _content: string): void => {};
  return discard;
}
|
||||||
|
|
||||||
|
/**
 * Produces a full `ExecuteThreadOptions` value for a test run.
 *
 * Starts from a fixed set of defaults (depth 0, no parent state, a fresh
 * never-aborted signal, a no-op yield hook, no fork/replay data and a
 * placeholder storage root) and lets the caller replace any of them.
 *
 * @param overrides Fields that take precedence over the defaults.
 * @returns The defaults merged with `overrides`.
 */
function makeOptions(overrides: Partial<ExecuteThreadOptions>): ExecuteThreadOptions {
  const defaults = {
    depth: 0,
    parentStateHash: null,
    signal: new AbortController().signal,
    awaitAfterEachYield: async () => {},
    forkSourceThreadId: null,
    prefilledDiskSteps: null,
    forkContinuation: null,
    replayTimestamps: null,
    // Placeholder path; the tests in this file pass a real `storageRoot` override.
    storageRoot: "/tmp/never",
  };
  return { ...defaults, ...overrides };
}
|
||||||
|
|
||||||
|
/**
 * Creates an isolated on-disk storage layout for one test.
 *
 * Makes a fresh temporary directory, writes the test registry YAML into it as
 * `workflow.yaml`, and creates a `cas` subdirectory.
 *
 * @returns The temporary storage root and the path of its `cas` directory.
 */
async function setupStorage(): Promise<{
  storageRoot: string;
  casDir: string;
}> {
  const tempPrefix = join(tmpdir(), "uncaged-wf-merkle-");
  const storageRoot = await mkdtemp(tempPrefix);

  const registryPath = join(storageRoot, "workflow.yaml");
  await writeFile(registryPath, TEST_REGISTRY_YAML, "utf8");

  const casDir = join(storageRoot, "cas");
  await mkdir(casDir, { recursive: true });

  return { storageRoot, casDir };
}
|
||||||
|
|
||||||
|
/**
 * Resolves the start node of a thread from one of its state nodes.
 *
 * Reads the blob stored under `endHash`, requires it to parse as a state node,
 * then follows that node's `payload.start` hash and requires the target to
 * parse as a start node. A missing blob is parsed as an empty string.
 *
 * @param cas     Store to read both blobs from.
 * @param endHash Hash of a state node belonging to the thread.
 * @returns The parsed start node.
 * @throws Error "expected state node" / "expected start node" when a blob does
 *         not parse to the required kind.
 */
async function loadStartNode(cas: CasStore, endHash: string): Promise<StartNode> {
  const stateText = (await cas.get(endHash)) ?? "";
  const stateEntry = parseCasThreadNode(stateText);
  if (stateEntry?.kind !== "state") {
    throw new Error("expected state node");
  }

  const startText = (await cas.get(stateEntry.node.payload.start)) ?? "";
  const startEntry = parseCasThreadNode(startText);
  if (startEntry?.kind !== "start") {
    throw new Error("expected start node");
  }

  return startEntry.node;
}
|
||||||
|
|
||||||
|
/**
 * Reads the blob stored under `hash` and returns it as a state node.
 *
 * A missing blob is parsed as an empty string.
 *
 * @param cas  Store to read the blob from.
 * @param hash Hash of the node to load.
 * @returns The parsed state node.
 * @throws Error "expected state node" when the blob does not parse to a state node.
 */
async function loadStateNode(cas: CasStore, hash: string): Promise<StateNode> {
  const text = (await cas.get(hash)) ?? "";
  const entry = parseCasThreadNode(text);
  if (entry?.kind !== "state") {
    throw new Error("expected state node");
  }
  return entry.node;
}
|
||||||
|
|
||||||
|
describe("Merkle call stack — cross-thread DAG linking (Phase 2)", () => {
|
||||||
|
let storageRoot: string;
|
||||||
|
let casDir: string;
|
||||||
|
|
||||||
|
// Give every test its own freshly created storage root and CAS directory.
beforeEach(async () => {
  ({ storageRoot, casDir } = await setupStorage());
});
|
||||||
|
|
||||||
|
// Remove the per-test storage root; `force` keeps cleanup from failing if it is already gone.
afterEach(async () => {
  const removeAll = { recursive: true, force: true };
  await rm(storageRoot, removeAll);
});
|
||||||
|
|
||||||
|
test("parentStateHash is written into child start node's parentState and refs", async () => {
  const cas = createCasStore(casDir);

  // Both workflows complete immediately without yielding a step.
  // biome-ignore lint/correctness/useYield: testing start-only path
  const parentWf: WorkflowFn = async function* (
    _thread: ThreadContext,
    _runtime: WorkflowRuntime,
  ): AsyncGenerator<RoleOutput, WorkflowCompletion> {
    return { returnCode: 0, summary: "parent done" };
  };

  // biome-ignore lint/correctness/useYield: testing start-only path
  const childWf: WorkflowFn = async function* (
    _thread: ThreadContext,
    _runtime: WorkflowRuntime,
  ): AsyncGenerator<RoleOutput, WorkflowCompletion> {
    return { returnCode: 0, summary: "child done" };
  };

  // Run the parent first so its resulting root hash can be handed to the child.
  const parentIo = {
    threadId: "P_THREAD_01",
    hash: "PARENTHASH0001",
    infoJsonlPath: join(storageRoot, "logs", "PARENTHASH0001", "P1.info.jsonl"),
    cas,
  };
  const parentRun = await executeThread(
    parentWf,
    "parent-wf",
    { prompt: "parent task", steps: [] },
    makeOptions({ storageRoot }),
    parentIo,
    noLogger(),
  );

  // The child runs one level deeper and names the parent's root hash as its parent state.
  const childIo = {
    threadId: "C_THREAD_01",
    hash: "CHILDHASH00001",
    infoJsonlPath: join(storageRoot, "logs", "CHILDHASH00001", "C1.info.jsonl"),
    cas,
  };
  const childRun = await executeThread(
    childWf,
    "child-wf",
    { prompt: "child task", steps: [] },
    makeOptions({ storageRoot, depth: 1, parentStateHash: parentRun.rootHash }),
    childIo,
    noLogger(),
  );

  // The link must appear both in the start payload and in the node's refs.
  const startOfChild = await loadStartNode(cas, childRun.rootHash);
  expect(startOfChild.payload.parentState).toBe(parentRun.rootHash);
  expect(startOfChild.refs).toContain(parentRun.rootHash);
});
|
||||||
|
|
||||||
|
test("childThread on parent state node points to child's final state and is in refs", async () => {
  const cas = createCasStore(casDir);
  // Stand-in hash; no child thread is actually executed in this test.
  const childFinalHash = "CHILD_FINAL_001";

  // Parent yields a single "developer" step that carries the child thread hash.
  const parentWf: WorkflowFn = async function* (
    _thread: ThreadContext,
    runtime: WorkflowRuntime,
  ): AsyncGenerator<RoleOutput, WorkflowCompletion> {
    const outputHash = await runtime.cas.put("developer output");
    const delegated: RoleOutput = {
      role: "developer",
      contentHash: outputHash,
      meta: { action: "delegate" },
      refs: [outputHash],
      childThread: childFinalHash,
    };
    yield delegated;
    return { returnCode: 0, summary: "parent complete" };
  };

  const io = {
    threadId: "P_THREAD_02",
    hash: "CTHREAD_TEST01",
    infoJsonlPath: join(storageRoot, "logs", "CTHREAD_TEST01", "P2.info.jsonl"),
    cas,
  };
  const run = await executeThread(
    parentWf,
    "parent-wf",
    { prompt: "parent task", steps: [] },
    makeOptions({ storageRoot }),
    io,
    noLogger(),
  );

  // The test expects the first ancestor of the root state to be the developer step's state.
  const rootState = await loadStateNode(cas, run.rootHash);
  const developerHash = rootState.payload.ancestors[0] ?? "";
  const developerState = await loadStateNode(cas, developerHash);

  expect(developerState.payload.role).toBe("developer");
  expect(developerState.payload.childThread).toBe(childFinalHash);
  expect(developerState.refs).toContain(childFinalHash);
});
|
||||||
|
|
||||||
|
test("parent state with no child has childThread: null", async () => {
  const cas = createCasStore(casDir);

  // Workflow yields one "preparer" step that explicitly has no child thread.
  const wf: WorkflowFn = async function* (
    _thread: ThreadContext,
    runtime: WorkflowRuntime,
  ): AsyncGenerator<RoleOutput, WorkflowCompletion> {
    const outputHash = await runtime.cas.put("prep output");
    const prepared: RoleOutput = {
      role: "preparer",
      contentHash: outputHash,
      meta: {},
      refs: [outputHash],
      childThread: null,
    };
    yield prepared;
    return { returnCode: 0, summary: "done" };
  };

  const io = {
    threadId: "NULL_CT_01",
    hash: "NULLCT_TEST001",
    infoJsonlPath: join(storageRoot, "logs", "NULLCT_TEST001", "N1.info.jsonl"),
    cas,
  };
  const run = await executeThread(
    wf,
    "test-wf",
    { prompt: "task", steps: [] },
    makeOptions({ storageRoot }),
    io,
    noLogger(),
  );

  // Load the state reached through the root state's first ancestor.
  const rootState = await loadStateNode(cas, run.rootHash);
  const preparerHash = rootState.payload.ancestors[0] ?? "";
  const preparerState = await loadStateNode(cas, preparerHash);

  // A null child thread must be stored as null and must not leak into refs.
  expect(preparerState.payload.childThread).toBeNull();
  expect(preparerState.refs).not.toContain(null);
});
|
||||||
|
|
||||||
|
test("full bidirectional: child parentState is traversable to parent's context", async () => {
  const cas = createCasStore(casDir);
  const parentHash = "BIDIR_PARENT01";

  // Parent yields a "preparer" step and then a "developer" step.
  const parentWf: WorkflowFn = async function* (
    _thread: ThreadContext,
    runtime: WorkflowRuntime,
  ): AsyncGenerator<RoleOutput, WorkflowCompletion> {
    const prepHash = await runtime.cas.put("preparation output");
    const prepared: RoleOutput = {
      role: "preparer",
      contentHash: prepHash,
      meta: { repoPath: "/test" },
      refs: [prepHash],
      childThread: null,
    };
    yield prepared;

    const devHash = await runtime.cas.put("developer output");
    const developed: RoleOutput = {
      role: "developer",
      contentHash: devHash,
      meta: { action: "code" },
      refs: [devHash],
      childThread: "CHILD_END_HASH1",
    };
    yield developed;

    return { returnCode: 0, summary: "all done" };
  };

  // After each yield, record the head published for the parent thread in threads.json.
  const observedHeads: string[] = [];
  const recordPublishedHead = async (): Promise<void> => {
    const indexPath = join(join(storageRoot, "bundles", parentHash), "threads.json");
    const indexText = await readFile(indexPath, "utf8");
    const threads = JSON.parse(indexText) as Record<string, { head: string }>;
    const head = threads.BIDIR_T_001?.head ?? null;
    if (head === null) {
      return;
    }
    observedHeads.push(head);
  };
  const opts = makeOptions({ storageRoot, awaitAfterEachYield: recordPublishedHead });

  const parentIo = {
    threadId: "BIDIR_T_001",
    hash: parentHash,
    infoJsonlPath: join(storageRoot, "logs", parentHash, "BD1.info.jsonl"),
    cas,
  };
  await executeThread(
    parentWf,
    "bidir-wf",
    { prompt: "bidir test", steps: [] },
    opts,
    parentIo,
    noLogger(),
  );

  // One head per yielded step; the first is the state written for the preparer step.
  expect(observedHeads.length).toBe(2);
  const preparerStateHash = observedHeads[0] ?? "";

  // Execute child with parentState pointing to parent's preparer state
  // biome-ignore lint/correctness/useYield: testing start-only path
  const childWf: WorkflowFn = async function* (
    _t: ThreadContext,
    _r: WorkflowRuntime,
  ): AsyncGenerator<RoleOutput, WorkflowCompletion> {
    return { returnCode: 0, summary: "child ok" };
  };

  const childIo = {
    threadId: "BIDIR_C_001",
    hash: "BIDIR_CHILD001",
    infoJsonlPath: join(storageRoot, "logs", "BIDIR_CHILD001", "BC1.info.jsonl"),
    cas,
  };
  const childRun = await executeThread(
    childWf,
    "bidir-child",
    { prompt: "child bidir", steps: [] },
    makeOptions({ storageRoot, depth: 1, parentStateHash: preparerStateHash }),
    childIo,
    noLogger(),
  );

  // Upward traversal: child start → parentState → preparer state → meta.repoPath
  const startOfChild = await loadStartNode(cas, childRun.rootHash);
  expect(startOfChild.payload.parentState).toBe(preparerStateHash);

  const preparerState = await loadStateNode(cas, preparerStateHash);
  expect(preparerState.payload.meta.repoPath).toBe("/test");
});
|
||||||
|
});
|
||||||
@@ -94,6 +94,7 @@ async function appendStateForStep(params: {
|
|||||||
meta: Record<string, unknown>;
|
meta: Record<string, unknown>;
|
||||||
refs: readonly string[];
|
refs: readonly string[];
|
||||||
timestamp: number;
|
timestamp: number;
|
||||||
|
childThread: string | null;
|
||||||
}): Promise<{ stateHash: string; chain: ChainState }> {
|
}): Promise<{ stateHash: string; chain: ChainState }> {
|
||||||
const text = await getContentMerklePayload(params.cas, params.contentHash);
|
const text = await getContentMerklePayload(params.cas, params.contentHash);
|
||||||
if (text === null) {
|
if (text === null) {
|
||||||
@@ -112,7 +113,7 @@ async function appendStateForStep(params: {
|
|||||||
ancestors,
|
ancestors,
|
||||||
compact: null,
|
compact: null,
|
||||||
timestamp: params.timestamp,
|
timestamp: params.timestamp,
|
||||||
childThread: null,
|
childThread: params.childThread,
|
||||||
};
|
};
|
||||||
const stateHash = await putStateNode(params.cas, payload);
|
const stateHash = await putStateNode(params.cas, payload);
|
||||||
return {
|
return {
|
||||||
@@ -331,6 +332,7 @@ async function driveWorkflowGenerator(params: {
|
|||||||
meta: step.meta,
|
meta: step.meta,
|
||||||
refs: step.refs,
|
refs: step.refs,
|
||||||
timestamp: ts,
|
timestamp: ts,
|
||||||
|
childThread: step.childThread ?? null,
|
||||||
});
|
});
|
||||||
chain = written_.chain;
|
chain = written_.chain;
|
||||||
await publishHead({ bundleDir, threadId, startHash, headHash: written_.stateHash });
|
await publishHead({ bundleDir, threadId, startHash, headHash: written_.stateHash });
|
||||||
@@ -441,7 +443,7 @@ export async function executeThread(
|
|||||||
name: workflowName,
|
name: workflowName,
|
||||||
hash: io.hash,
|
hash: io.hash,
|
||||||
depth: options.depth,
|
depth: options.depth,
|
||||||
parentState: null,
|
parentState: options.parentStateHash,
|
||||||
},
|
},
|
||||||
promptHash,
|
promptHash,
|
||||||
);
|
);
|
||||||
@@ -469,6 +471,7 @@ export async function executeThread(
|
|||||||
meta: row.meta,
|
meta: row.meta,
|
||||||
refs: row.refs,
|
refs: row.refs,
|
||||||
timestamp: row.timestamp,
|
timestamp: row.timestamp,
|
||||||
|
childThread: null,
|
||||||
});
|
});
|
||||||
chain = written.chain;
|
chain = written.chain;
|
||||||
await publishHead({
|
await publishHead({
|
||||||
@@ -490,6 +493,7 @@ export async function executeThread(
|
|||||||
const thread: ThreadContext = {
|
const thread: ThreadContext = {
|
||||||
threadId: io.threadId,
|
threadId: io.threadId,
|
||||||
depth: options.depth,
|
depth: options.depth,
|
||||||
|
bundleHash: io.hash,
|
||||||
start: {
|
start: {
|
||||||
role: START,
|
role: START,
|
||||||
content: input.prompt,
|
content: input.prompt,
|
||||||
|
|||||||
@@ -144,6 +144,7 @@ async function payloadToRoleOutput(cas: CasStore, payload: StateNodePayload): Pr
|
|||||||
contentHash: payload.content,
|
contentHash: payload.content,
|
||||||
meta: payload.meta,
|
meta: payload.meta,
|
||||||
refs,
|
refs,
|
||||||
|
childThread: payload.childThread,
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -41,6 +41,8 @@ export type PrefilledDiskStep = {
|
|||||||
export type ExecuteThreadOptions = {
|
export type ExecuteThreadOptions = {
|
||||||
/** Passed to the bundle thread context as `ThreadContext.depth`. */
|
/** Passed to the bundle thread context as `ThreadContext.depth`. */
|
||||||
depth: number;
|
depth: number;
|
||||||
|
/** Parent thread's head state hash at spawn time; `null` for top-level threads. */
|
||||||
|
parentStateHash: string | null;
|
||||||
signal: AbortSignal;
|
signal: AbortSignal;
|
||||||
/** Invoked after each successful yield (and outer-loop checks); used for pause/resume. */
|
/** Invoked after each successful yield (and outer-loop checks); used for pause/resume. */
|
||||||
awaitAfterEachYield: () => Promise<void>;
|
awaitAfterEachYield: () => Promise<void>;
|
||||||
|
|||||||
@@ -72,11 +72,13 @@ function parseRoleOutputRecord(obj: Record<string, unknown>): RoleOutput | null
|
|||||||
if (meta === null || typeof meta !== "object") {
|
if (meta === null || typeof meta !== "object") {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
const childThread = obj.childThread;
|
||||||
return {
|
return {
|
||||||
role,
|
role,
|
||||||
contentHash,
|
contentHash,
|
||||||
meta: meta as Record<string, unknown>,
|
meta: meta as Record<string, unknown>,
|
||||||
refs: normalizeRefsField(obj.refs),
|
refs: normalizeRefsField(obj.refs),
|
||||||
|
childThread: typeof childThread === "string" ? childThread : null,
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -497,6 +499,7 @@ async function main(): Promise<void> {
|
|||||||
{ prompt: cmd.prompt, steps: cmd.steps },
|
{ prompt: cmd.prompt, steps: cmd.steps },
|
||||||
{
|
{
|
||||||
...cmd.options,
|
...cmd.options,
|
||||||
|
parentStateHash: null,
|
||||||
signal: ac.signal,
|
signal: ac.signal,
|
||||||
awaitAfterEachYield: () => pauseGate.awaitAfterYield(),
|
awaitAfterEachYield: () => pauseGate.awaitAfterYield(),
|
||||||
forkSourceThreadId: cmd.forkSourceThreadId,
|
forkSourceThreadId: cmd.forkSourceThreadId,
|
||||||
|
|||||||
@@ -6,7 +6,7 @@ import {
|
|||||||
getRegisteredWorkflow,
|
getRegisteredWorkflow,
|
||||||
readWorkflowRegistry,
|
readWorkflowRegistry,
|
||||||
} from "@uncaged/workflow-register";
|
} from "@uncaged/workflow-register";
|
||||||
import type { AgentContext, AgentFn } from "@uncaged/workflow-runtime";
|
import type { AgentContext, AgentFn, AgentFnResult } from "@uncaged/workflow-runtime";
|
||||||
import {
|
import {
|
||||||
createLogger,
|
createLogger,
|
||||||
generateUlid,
|
generateUlid,
|
||||||
@@ -14,7 +14,7 @@ import {
|
|||||||
getGlobalCasDir,
|
getGlobalCasDir,
|
||||||
} from "@uncaged/workflow-util";
|
} from "@uncaged/workflow-util";
|
||||||
import type { ExecuteThreadIo } from "./engine/index.js";
|
import type { ExecuteThreadIo } from "./engine/index.js";
|
||||||
import { executeThread } from "./engine/index.js";
|
import { executeThread, getBundleDir, readThreadsIndex } from "./engine/index.js";
|
||||||
|
|
||||||
const DEFAULT_WORKFLOW_AS_AGENT_MAX_DEPTH = 3;
|
const DEFAULT_WORKFLOW_AS_AGENT_MAX_DEPTH = 3;
|
||||||
|
|
||||||
@@ -37,6 +37,13 @@ function resolveWorkflowAsAgentStorageRoot(options: WorkflowAsAgentOptions | nul
|
|||||||
return getDefaultWorkflowStorageRoot();
|
return getDefaultWorkflowStorageRoot();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Looks up the head state hash currently recorded for the calling thread.
 *
 * Resolves the bundle directory for `ctx.bundleHash` under `storageRoot`, reads
 * that bundle's threads index, and picks the entry keyed by `ctx.threadId`.
 *
 * @param storageRoot Root directory under which bundle directories are resolved.
 * @param ctx         Agent context supplying `bundleHash` and `threadId`.
 * @returns The entry's `head`, or `null` when the index has no entry for the thread.
 */
async function readParentHeadState(storageRoot: string, ctx: AgentContext): Promise<string | null> {
  const threads = await readThreadsIndex(getBundleDir(storageRoot, ctx.bundleHash));
  const parentEntry = threads[ctx.threadId] ?? null;
  if (parentEntry === null) {
    return null;
  }
  return parentEntry.head;
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns an {@link AgentFn} that runs another registered workflow in a new thread,
|
* Returns an {@link AgentFn} that runs another registered workflow in a new thread,
|
||||||
* using the parent thread's initial prompt (`ctx.start.content`) as the child prompt.
|
* using the parent thread's initial prompt (`ctx.start.content`) as the child prompt.
|
||||||
@@ -45,7 +52,7 @@ export function workflowAsAgent(
|
|||||||
workflowName: string,
|
workflowName: string,
|
||||||
options: WorkflowAsAgentOptions | null = null,
|
options: WorkflowAsAgentOptions | null = null,
|
||||||
): AgentFn {
|
): AgentFn {
|
||||||
return async (ctx: AgentContext): Promise<string> => {
|
return async (ctx: AgentContext): Promise<AgentFnResult> => {
|
||||||
const nextDepth = ctx.depth + 1;
|
const nextDepth = ctx.depth + 1;
|
||||||
|
|
||||||
const storageRoot = resolveWorkflowAsAgentStorageRoot(options);
|
const storageRoot = resolveWorkflowAsAgentStorageRoot(options);
|
||||||
@@ -89,6 +96,8 @@ export function workflowAsAgent(
|
|||||||
const logger = createLogger({ sink: { kind: "file", path: infoJsonlPath } });
|
const logger = createLogger({ sink: { kind: "file", path: infoJsonlPath } });
|
||||||
const signalNever = new AbortController();
|
const signalNever = new AbortController();
|
||||||
|
|
||||||
|
const parentHeadState = await readParentHeadState(storageRoot, ctx);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const result = await executeThread(
|
const result = await executeThread(
|
||||||
bundleExportsResult.value.run,
|
bundleExportsResult.value.run,
|
||||||
@@ -96,6 +105,7 @@ export function workflowAsAgent(
|
|||||||
input,
|
input,
|
||||||
{
|
{
|
||||||
depth: nextDepth,
|
depth: nextDepth,
|
||||||
|
parentStateHash: parentHeadState,
|
||||||
signal: signalNever.signal,
|
signal: signalNever.signal,
|
||||||
awaitAfterEachYield: async () => {},
|
awaitAfterEachYield: async () => {},
|
||||||
forkSourceThreadId: ctx.threadId,
|
forkSourceThreadId: ctx.threadId,
|
||||||
@@ -107,7 +117,8 @@ export function workflowAsAgent(
|
|||||||
io,
|
io,
|
||||||
logger,
|
logger,
|
||||||
);
|
);
|
||||||
return `Child workflow "${workflowName}" completed (returnCode=${result.returnCode}).\n\nSummary: ${result.summary}\n\nChild thread root hash: ${result.rootHash}`;
|
const summary = `Child workflow "${workflowName}" completed (returnCode=${result.returnCode}).\n\nSummary: ${result.summary}\n\nChild thread root hash: ${result.rootHash}`;
|
||||||
|
return { output: summary, childThread: result.rootHash };
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
const message = e instanceof Error ? e.message : String(e);
|
const message = e instanceof Error ? e.message : String(e);
|
||||||
return `ERROR: ${message}`;
|
return `ERROR: ${message}`;
|
||||||
|
|||||||
@@ -21,6 +21,7 @@ function makeCtx(roles: (keyof TestMeta & string)[]): ModeratorContext<TestMeta>
|
|||||||
return {
|
return {
|
||||||
threadId: "test-thread",
|
threadId: "test-thread",
|
||||||
depth: 0,
|
depth: 0,
|
||||||
|
bundleHash: "TESTHASH00001",
|
||||||
start: {
|
start: {
|
||||||
role: START,
|
role: START,
|
||||||
content: "test",
|
content: "test",
|
||||||
|
|||||||
@@ -13,6 +13,7 @@ export type {
|
|||||||
AgentBinding,
|
AgentBinding,
|
||||||
AgentContext,
|
AgentContext,
|
||||||
AgentFn,
|
AgentFn,
|
||||||
|
AgentFnResult,
|
||||||
CasStore,
|
CasStore,
|
||||||
ExtractFn,
|
ExtractFn,
|
||||||
ExtractResult,
|
ExtractResult,
|
||||||
|
|||||||
@@ -54,6 +54,7 @@ export type RoleOutput = {
|
|||||||
contentHash: string;
|
contentHash: string;
|
||||||
meta: Record<string, unknown>;
|
meta: Record<string, unknown>;
|
||||||
refs: string[];
|
refs: string[];
|
||||||
|
childThread: string | null;
|
||||||
};
|
};
|
||||||
|
|
||||||
export type StartStep = {
|
export type StartStep = {
|
||||||
@@ -76,6 +77,7 @@ export type RoleStep<M extends RoleMeta> = {
|
|||||||
export type ThreadContext<M extends RoleMeta = RoleMeta> = {
|
export type ThreadContext<M extends RoleMeta = RoleMeta> = {
|
||||||
threadId: string;
|
threadId: string;
|
||||||
depth: number;
|
depth: number;
|
||||||
|
bundleHash: string;
|
||||||
start: StartStep;
|
start: StartStep;
|
||||||
steps: RoleStep<M>[];
|
steps: RoleStep<M>[];
|
||||||
};
|
};
|
||||||
@@ -140,7 +142,9 @@ export type ExtractFn = <T extends Record<string, unknown>>(
|
|||||||
contentHash: string,
|
contentHash: string,
|
||||||
) => Promise<ExtractResult<T>>;
|
) => Promise<ExtractResult<T>>;
|
||||||
|
|
||||||
export type AgentFn = (ctx: AgentContext) => Promise<string>;
|
export type AgentFnResult = string | { output: string; childThread: string | null };
|
||||||
|
|
||||||
|
export type AgentFn = (ctx: AgentContext) => Promise<AgentFnResult>;
|
||||||
|
|
||||||
export type AgentBinding = {
|
export type AgentBinding = {
|
||||||
agent: AgentFn;
|
agent: AgentFn;
|
||||||
|
|||||||
@@ -54,6 +54,7 @@ async function threadFromStartHead<M extends RoleMeta>(
|
|||||||
return {
|
return {
|
||||||
threadId: "",
|
threadId: "",
|
||||||
depth: p.depth,
|
depth: p.depth,
|
||||||
|
bundleHash: p.hash,
|
||||||
start: {
|
start: {
|
||||||
role: START,
|
role: START,
|
||||||
content: prompt,
|
content: prompt,
|
||||||
@@ -113,6 +114,7 @@ async function threadFromStateHead<M extends RoleMeta>(
|
|||||||
return {
|
return {
|
||||||
threadId: "",
|
threadId: "",
|
||||||
depth: sp.depth,
|
depth: sp.depth,
|
||||||
|
bundleHash: sp.hash,
|
||||||
start: {
|
start: {
|
||||||
role: START,
|
role: START,
|
||||||
content: prompt,
|
content: prompt,
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ import {
|
|||||||
type AgentBinding,
|
type AgentBinding,
|
||||||
type AgentContext,
|
type AgentContext,
|
||||||
type AgentFn,
|
type AgentFn,
|
||||||
|
type AgentFnResult,
|
||||||
END,
|
END,
|
||||||
type ModeratorContext,
|
type ModeratorContext,
|
||||||
type RoleDefinition,
|
type RoleDefinition,
|
||||||
@@ -50,6 +51,16 @@ function mergeUniqueHashes(a: readonly string[], b: readonly string[]): string[]
|
|||||||
return out;
|
return out;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Converts either form of an agent result into the object form.
 *
 * A bare string is wrapped as the output with no child thread; an object
 * result is passed through unchanged (the same reference is returned).
 *
 * @param result Value returned by an agent function.
 * @returns `{ output, childThread }`, with `childThread` set to `null` for string results.
 */
function normalizeAgentResult(result: AgentFnResult): {
  output: string;
  childThread: string | null;
} {
  return typeof result === "string" ? { output: result, childThread: null } : result;
}
|
||||||
|
|
||||||
function agentForRole(binding: AgentBinding, roleName: string): AgentFn {
|
function agentForRole(binding: AgentBinding, roleName: string): AgentFn {
|
||||||
const overrides = binding.overrides;
|
const overrides = binding.overrides;
|
||||||
const overrideFn: AgentFn | undefined =
|
const overrideFn: AgentFn | undefined =
|
||||||
@@ -89,9 +100,9 @@ async function advanceOneRound<M extends RoleMeta>(
|
|||||||
};
|
};
|
||||||
|
|
||||||
const agent = agentForRole(binding, next);
|
const agent = agentForRole(binding, next);
|
||||||
const raw = await agent(agentCtx as unknown as AgentContext);
|
const agentResult = normalizeAgentResult(await agent(agentCtx as unknown as AgentContext));
|
||||||
|
|
||||||
const agentContentHash = await putContentNodeWithRefs(runtime.cas, raw, []);
|
const agentContentHash = await putContentNodeWithRefs(runtime.cas, agentResult.output, []);
|
||||||
|
|
||||||
const extracted = await runtime.extract(
|
const extracted = await runtime.extract(
|
||||||
roleDef.schema as z.ZodType<Record<string, unknown>>,
|
roleDef.schema as z.ZodType<Record<string, unknown>>,
|
||||||
@@ -125,6 +136,7 @@ async function advanceOneRound<M extends RoleMeta>(
|
|||||||
contentHash: step.contentHash,
|
contentHash: step.contentHash,
|
||||||
meta: step.meta,
|
meta: step.meta,
|
||||||
refs: step.refs,
|
refs: step.refs,
|
||||||
|
childThread: agentResult.childThread,
|
||||||
},
|
},
|
||||||
step,
|
step,
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ export type {
|
|||||||
AgentBinding,
|
AgentBinding,
|
||||||
AgentContext,
|
AgentContext,
|
||||||
AgentFn,
|
AgentFn,
|
||||||
|
AgentFnResult,
|
||||||
CasStore,
|
CasStore,
|
||||||
ExtractFn,
|
ExtractFn,
|
||||||
ExtractResult,
|
ExtractResult,
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ export type {
|
|||||||
AgentBinding,
|
AgentBinding,
|
||||||
AgentContext,
|
AgentContext,
|
||||||
AgentFn,
|
AgentFn,
|
||||||
|
AgentFnResult,
|
||||||
CasStore,
|
CasStore,
|
||||||
ExtractFn,
|
ExtractFn,
|
||||||
ExtractResult,
|
ExtractResult,
|
||||||
|
|||||||
@@ -29,6 +29,7 @@ function makeCtx(steps: ModeratorContext<DevelopMeta>["steps"]): ModeratorContex
|
|||||||
return {
|
return {
|
||||||
threadId: "01TEST000000000000000000TR",
|
threadId: "01TEST000000000000000000TR",
|
||||||
depth: 0,
|
depth: 0,
|
||||||
|
bundleHash: "TESTHASH00001",
|
||||||
start: makeStart(),
|
start: makeStart(),
|
||||||
steps,
|
steps,
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -116,6 +116,7 @@ function makeCtx(
|
|||||||
return {
|
return {
|
||||||
threadId: "01TEST000000000000000000TR",
|
threadId: "01TEST000000000000000000TR",
|
||||||
depth: 0,
|
depth: 0,
|
||||||
|
bundleHash: "TESTHASH00001",
|
||||||
start: makeStart(),
|
start: makeStart(),
|
||||||
steps,
|
steps,
|
||||||
};
|
};
|
||||||
@@ -181,6 +182,7 @@ function makeThread(prompt: string) {
|
|||||||
return {
|
return {
|
||||||
threadId: "01TEST000000000000000000TR",
|
threadId: "01TEST000000000000000000TR",
|
||||||
depth: 0,
|
depth: 0,
|
||||||
|
bundleHash: "TESTHASH00001",
|
||||||
start: {
|
start: {
|
||||||
role: START,
|
role: START,
|
||||||
content: prompt,
|
content: prompt,
|
||||||
|
|||||||
@@ -17,6 +17,7 @@ describe("buildAgentPrompt", () => {
|
|||||||
const ctx: AgentContext = {
|
const ctx: AgentContext = {
|
||||||
start: startTask("fix the bug"),
|
start: startTask("fix the bug"),
|
||||||
depth: 0,
|
depth: 0,
|
||||||
|
bundleHash: "TESTHASH00001",
|
||||||
steps: [],
|
steps: [],
|
||||||
threadId: "01TEST000000000000000000TR",
|
threadId: "01TEST000000000000000000TR",
|
||||||
currentRole: { name: START, systemPrompt: "You are an agent." },
|
currentRole: { name: START, systemPrompt: "You are an agent." },
|
||||||
@@ -33,6 +34,7 @@ describe("buildAgentPrompt", () => {
|
|||||||
const ctx: AgentContext = {
|
const ctx: AgentContext = {
|
||||||
start: startTask("user task"),
|
start: startTask("user task"),
|
||||||
depth: 0,
|
depth: 0,
|
||||||
|
bundleHash: "TESTHASH00001",
|
||||||
threadId: "01TEST000000000000000000TR",
|
threadId: "01TEST000000000000000000TR",
|
||||||
currentRole: { name: "coder", systemPrompt: "Be helpful." },
|
currentRole: { name: "coder", systemPrompt: "Be helpful." },
|
||||||
steps: [
|
steps: [
|
||||||
@@ -61,6 +63,7 @@ describe("buildAgentPrompt", () => {
|
|||||||
const ctx: AgentContext = {
|
const ctx: AgentContext = {
|
||||||
start: startTask("first message full: task content here"),
|
start: startTask("first message full: task content here"),
|
||||||
depth: 0,
|
depth: 0,
|
||||||
|
bundleHash: "TESTHASH00001",
|
||||||
threadId: "01TEST000000000000000000TR",
|
threadId: "01TEST000000000000000000TR",
|
||||||
currentRole: { name: "coder", systemPrompt: "System." },
|
currentRole: { name: "coder", systemPrompt: "System." },
|
||||||
steps: [
|
steps: [
|
||||||
@@ -99,6 +102,7 @@ describe("buildAgentPrompt", () => {
|
|||||||
const ctx: AgentContext = {
|
const ctx: AgentContext = {
|
||||||
start: startTask("start"),
|
start: startTask("start"),
|
||||||
depth: 0,
|
depth: 0,
|
||||||
|
bundleHash: "TESTHASH00001",
|
||||||
threadId: "01TEST000000000000000000TR",
|
threadId: "01TEST000000000000000000TR",
|
||||||
currentRole: { name: "c", systemPrompt: "S" },
|
currentRole: { name: "c", systemPrompt: "S" },
|
||||||
steps: [
|
steps: [
|
||||||
|
|||||||
Reference in New Issue
Block a user