diff --git a/packages/workflow-execute/__tests__/json-cas-engine.test.ts b/packages/workflow-execute/__tests__/json-cas-engine.test.ts new file mode 100644 index 0000000..e904aa7 --- /dev/null +++ b/packages/workflow-execute/__tests__/json-cas-engine.test.ts @@ -0,0 +1,869 @@ +import { describe, expect, test } from "bun:test"; +import { createMemoryStore, type Store, walk } from "@uncaged/json-cas"; +import { + registerWorkflowSchemas, + type WorkflowSchemaHashes, + type ContentPayload, + type ThreadEndPayload, + type ThreadStartPayload, + type ThreadStepPayload, +} from "@uncaged/json-cas-workflow"; +import { registerWorkflow, type WorkflowInput } from "@uncaged/workflow-json-def"; + +import { + buildJsonCasThreadContext, + buildJsonCasThreadSnapshot, + readContentText, +} from "../src/engine/json-cas-context.js"; +import { executeJsonCasThread } from "../src/engine/json-cas-engine.js"; +import type { + JsonCasAgentFn, + JsonCasEngineIo, + JsonCasEngineOptions, +} from "../src/engine/json-cas-types.js"; + +// ── Test fixtures ───────────────────────────────────────────────────── + +const START = "__start__"; +const END = "__end__"; + +const SIMPLE_WORKFLOW: WorkflowInput = { + name: "test-simple", + description: "A simple two-role workflow for testing", + roles: { + planner: { + description: "Plans the work", + systemPrompt: "You are a planner.", + extractPrompt: "Extract planner output.", + schema: { + type: "object", + required: ["plan"], + properties: { plan: { type: "string" } }, + }, + }, + coder: { + description: "Implements the plan", + systemPrompt: "You are a coder.", + extractPrompt: "Extract coder output.", + schema: { + type: "object", + required: ["code"], + properties: { code: { type: "string" } }, + }, + }, + }, + moderator: [ + { from: START, to: "planner", when: null }, + { from: "planner", to: "coder", when: null }, + { from: "coder", to: END, when: null }, + ], +}; + +const SINGLE_ROLE_WORKFLOW: WorkflowInput = { + name: "test-single", + description: "A single-role workflow", + roles: { + worker: { + description: "Does all the work", + systemPrompt: "You are a worker.", + extractPrompt: "Extract worker output.", + schema: { + type: "object", + required: ["result"], + properties: { result: { type: "string" } }, + }, + }, + }, + moderator: [ + { from: START, to: "worker", when: null }, + { from: "worker", to: END, when: null }, + ], +}; + +const CONDITIONAL_WORKFLOW: WorkflowInput = { + name: "test-conditional", + description: "A workflow with JSONata conditions", + roles: { + checker: { + description: "Checks the input", + systemPrompt: "You are a checker.", + extractPrompt: "Extract checker output.", + schema: { + type: "object", + required: ["status"], + properties: { status: { type: "string" } }, + }, + }, + fixer: { + description: "Fixes issues", + systemPrompt: "You are a fixer.", + extractPrompt: "Extract fixer output.", + schema: { + type: "object", + required: ["fix"], + properties: { fix: { type: "string" } }, + }, + }, + }, + moderator: [ + { from: START, to: "checker", when: null }, + { from: "checker", to: END, when: "steps[-1].meta.status = 'ok'" }, + { from: "checker", to: "fixer", when: null }, + { from: "fixer", to: "checker", when: null }, + ], +}; + +function noLogger(): (tag: string, content: string) => void { + return () => {}; +} + +async function setupStore(): Promise<{ + store: Store; + typeHashes: WorkflowSchemaHashes; +}> { + const store = createMemoryStore(); + const typeHashes = await registerWorkflowSchemas(store); + return { store, typeHashes }; +} + +async function setupWorkflow( + store: Store, + typeHashes: WorkflowSchemaHashes, + workflowDef: WorkflowInput, +) { + const workflowHash = await registerWorkflow(store, typeHashes, workflowDef); + return { workflowHash }; +} + +function makeOptions(overrides: Partial = {}): JsonCasEngineOptions { + return { + depth: 0, + parentThread: null, + signal: new AbortController().signal, + agents: {}, + ...overrides, + }; +} + +function makeIo( + store: Store, + typeHashes: WorkflowSchemaHashes, + threadId: string, +): JsonCasEngineIo { + return { threadId, store, typeHashes }; +} + +/** + * A mock agent that returns a canned text and meta for each role. + */ +function createMockAgent( + responses: Record }>, +): JsonCasAgentFn { + return async (role, _systemPrompt, _snapshot) => { + const resp = responses[role]; + if (resp === undefined) { + throw new Error(`mock agent: no response configured for role "${role}"`); + } + return resp; + }; +} + +// ── Tests ───────────────────────────────────────────────────────────── + +describe("executeJsonCasThread", () => { + describe("thread lifecycle", () => { + test("simple two-role workflow creates start, two steps, and end nodes", async () => { + const { store, typeHashes } = await setupStore(); + const { workflowHash } = await setupWorkflow(store, typeHashes, SIMPLE_WORKFLOW); + + const agentFn = createMockAgent({ + planner: { text: "I will plan", meta: { plan: "phase-1" } }, + coder: { text: "I wrote code", meta: { code: "done" } }, + }); + + const result = await executeJsonCasThread({ + workflowHash, + input: "Build a widget", + moderatorRules: SIMPLE_WORKFLOW.moderator, + io: makeIo(store, typeHashes, "THREAD01"), + options: makeOptions(), + agentFn, + logger: noLogger(), + workflow: null, + }); + + expect(result.returnCode).toBe(0); + expect(result.summary).toContain("END"); + expect(result.rootHash).toBeTruthy(); + + const endNode = store.get(result.rootHash); + expect(endNode).not.toBeNull(); + const endPayload = endNode!.payload as ThreadEndPayload; + expect(endPayload.returnCode).toBe(0); + expect(endPayload.start).toBeTruthy(); + expect(endPayload.lastStep).toBeTruthy(); + }); + + test("single-role workflow creates correct chain", async () => { + const { store, typeHashes } = await setupStore(); + const { workflowHash } = await setupWorkflow(store, typeHashes, SINGLE_ROLE_WORKFLOW); + + const agentFn = createMockAgent({ + worker: { text: "work done", meta: { result: "success" } }, + }); + + const result = await executeJsonCasThread({ + workflowHash, + input: "Do the thing", + moderatorRules: SINGLE_ROLE_WORKFLOW.moderator, + io: makeIo(store, typeHashes, "THREAD02"), + options: makeOptions(), + agentFn, + logger: noLogger(), + workflow: null, + }); + + expect(result.returnCode).toBe(0); + + const endNode = store.get(result.rootHash); + expect(endNode).not.toBeNull(); + const endPayload = endNode!.payload as ThreadEndPayload; + + const lastStepNode = store.get(endPayload.lastStep); + expect(lastStepNode).not.toBeNull(); + const lastStepPayload = lastStepNode!.payload as ThreadStepPayload; + expect(lastStepPayload.role).toBe("worker"); + expect(lastStepPayload.previous).toBeNull(); + + const startNode = store.get(endPayload.start); + expect(startNode).not.toBeNull(); + const startPayload = startNode!.payload as ThreadStartPayload; + expect(startPayload.input).toBe("Do the thing"); + expect(startPayload.depth).toBe(0); + }); + }); + + describe("CAS node structure", () => { + test("thread-start contains workflow ref, input, depth, agents", async () => { + const { store, typeHashes } = await setupStore(); + const { workflowHash } = await setupWorkflow(store, typeHashes, SINGLE_ROLE_WORKFLOW); + + const agentFn = createMockAgent({ + worker: { text: "ok", meta: { result: "ok" } }, + }); + + const agentHash = await store.put(typeHashes.agent, { + package: "test-agent", + version: "1.0.0", + config: {}, + }); + + const result = await executeJsonCasThread({ + workflowHash, + input: "Test input", + moderatorRules: SINGLE_ROLE_WORKFLOW.moderator, + io: makeIo(store, typeHashes, "THREAD03"), + options: makeOptions({ agents: { worker: agentHash }, depth: 2 }), + agentFn, + logger: noLogger(), + workflow: null, + }); + + const endPayload = store.get(result.rootHash)!.payload as ThreadEndPayload; + const startPayload = store.get(endPayload.start)!.payload as ThreadStartPayload; + + expect(startPayload.workflow).toBe(workflowHash); + expect(startPayload.input).toBe("Test input"); + expect(startPayload.depth).toBe(2); + expect(startPayload.parentThread).toBeNull(); + expect(startPayload.agents).toEqual({ worker: agentHash }); + }); + + test("thread-start records parentThread when provided", async () => { + const { store, typeHashes } = await setupStore(); + const { workflowHash } = await setupWorkflow(store, typeHashes, SINGLE_ROLE_WORKFLOW); + + const agentFn = createMockAgent({ + worker: { text: "nested", meta: { result: "nested" } }, + }); + + const fakeParent = "FAKEPARENT0001"; + + const result = await executeJsonCasThread({ + workflowHash, + input: "nested task", + moderatorRules: SINGLE_ROLE_WORKFLOW.moderator, + io: makeIo(store, typeHashes, "THREAD04"), + options: makeOptions({ parentThread: fakeParent, depth: 1 }), + agentFn, + logger: noLogger(), + workflow: null, + }); + + const endPayload = store.get(result.rootHash)!.payload as ThreadEndPayload; + const startPayload = store.get(endPayload.start)!.payload as ThreadStartPayload; + expect(startPayload.parentThread).toBe(fakeParent); + expect(startPayload.depth).toBe(1); + }); + + test("each thread-step has content, react, start, and previous refs", async () => { + const { store, typeHashes } = await setupStore(); + const { workflowHash } = await setupWorkflow(store, typeHashes, SIMPLE_WORKFLOW); + + const agentFn = createMockAgent({ + planner: { text: "plan text", meta: { plan: "p1" } }, + coder: { text: "code text", meta: { code: "c1" } }, + }); + + const result = await executeJsonCasThread({ + workflowHash, + input: "go", + moderatorRules: SIMPLE_WORKFLOW.moderator, + io: makeIo(store, typeHashes, "THREAD05"), + options: makeOptions(), + agentFn, + logger: noLogger(), + workflow: null, + }); + + const endPayload = store.get(result.rootHash)!.payload as ThreadEndPayload; + const startHash = endPayload.start; + + const step2 = store.get(endPayload.lastStep)!.payload as ThreadStepPayload; + expect(step2.role).toBe("coder"); + expect(step2.start).toBe(startHash); + expect(step2.previous).not.toBeNull(); + + const contentNode2 = store.get(step2.content); + expect(contentNode2).not.toBeNull(); + expect((contentNode2!.payload as ContentPayload).text).toBe("code text"); + + const reactNode2 = store.get(step2.react); + expect(reactNode2).not.toBeNull(); + + const step1 = store.get(step2.previous!)!.payload as ThreadStepPayload; + expect(step1.role).toBe("planner"); + expect(step1.start).toBe(startHash); + expect(step1.previous).toBeNull(); + + const contentNode1 = store.get(step1.content); + expect(contentNode1).not.toBeNull(); + expect((contentNode1!.payload as ContentPayload).text).toBe("plan text"); + }); + + test("thread-end references start and last step", async () => { + const { store, typeHashes } = await setupStore(); + const { workflowHash } = await setupWorkflow(store, typeHashes, SIMPLE_WORKFLOW); + + const agentFn = createMockAgent({ + planner: { text: "plan", meta: { plan: "x" } }, + coder: { text: "code", meta: { code: "x" } }, + }); + + const result = await executeJsonCasThread({ + workflowHash, + input: "test", + moderatorRules: SIMPLE_WORKFLOW.moderator, + io: makeIo(store, typeHashes, "THREAD06"), + options: makeOptions(), + agentFn, + logger: noLogger(), + workflow: null, + }); + + const endPayload = store.get(result.rootHash)!.payload as ThreadEndPayload; + expect(endPayload.returnCode).toBe(0); + expect(endPayload.summary).toBeTruthy(); + + const startNode = store.get(endPayload.start); + expect(startNode).not.toBeNull(); + expect((startNode!.payload as ThreadStartPayload).workflow).toBe(workflowHash); + + const lastStepNode = store.get(endPayload.lastStep); + expect(lastStepNode).not.toBeNull(); + expect((lastStepNode!.payload as ThreadStepPayload).role).toBe("coder"); + }); + + test("content nodes store the agent text verbatim", async () => { + const { store, typeHashes } = await setupStore(); + const { workflowHash } = await setupWorkflow(store, typeHashes, SINGLE_ROLE_WORKFLOW); + + const longText = "This is a longer text with\nnewlines\nand special chars: <>&\"'"; + + const agentFn = createMockAgent({ + worker: { text: longText, meta: { result: "done" } }, + }); + + const result = await executeJsonCasThread({ + workflowHash, + input: "process this", + moderatorRules: SINGLE_ROLE_WORKFLOW.moderator, + io: makeIo(store, typeHashes, "THREAD07"), + options: makeOptions(), + agentFn, + logger: noLogger(), + workflow: null, + }); + + const endPayload = store.get(result.rootHash)!.payload as ThreadEndPayload; + const stepPayload = store.get(endPayload.lastStep)!.payload as ThreadStepPayload; + const contentPayload = store.get(stepPayload.content)!.payload as ContentPayload; + expect(contentPayload.text).toBe(longText); + }); + + test("meta is stored in thread-step payload", async () => { + const { store, typeHashes } = await setupStore(); + const { workflowHash } = await setupWorkflow(store, typeHashes, SIMPLE_WORKFLOW); + + const complexMeta = { + plan: "phase-1", + phases: [{ hash: "abc", title: "first" }], + nested: { deep: true }, + }; + + const agentFn = createMockAgent({ + planner: { text: "plan", meta: complexMeta }, + coder: { text: "code", meta: { code: "done" } }, + }); + + const result = await executeJsonCasThread({ + workflowHash, + input: "go", + moderatorRules: SIMPLE_WORKFLOW.moderator, + io: makeIo(store, typeHashes, "THREAD08"), + options: makeOptions(), + agentFn, + logger: noLogger(), + workflow: null, + }); + + const endPayload = store.get(result.rootHash)!.payload as ThreadEndPayload; + const step2 = store.get(endPayload.lastStep)!.payload as ThreadStepPayload; + const step1 = store.get(step2.previous!)!.payload as ThreadStepPayload; + + expect(step1.meta).toEqual(complexMeta); + expect(step2.meta).toEqual({ code: "done" }); + }); + }); + + describe("moderator routing", () => { + test("conditional moderator routes based on agent meta", async () => { + const { store, typeHashes } = await setupStore(); + const { workflowHash } = await setupWorkflow(store, typeHashes, CONDITIONAL_WORKFLOW); + + let checkerCallCount = 0; + const agentFn: JsonCasAgentFn = async (role, _sp, _snap) => { + if (role === "checker") { + checkerCallCount++; + if (checkerCallCount === 1) { + return { text: "found issue", meta: { status: "bad" } }; + } + return { text: "all good now", meta: { status: "ok" } }; + } + return { text: "fixed it", meta: { fix: "patched" } }; + }; + + const result = await executeJsonCasThread({ + workflowHash, + input: "check and fix", + moderatorRules: CONDITIONAL_WORKFLOW.moderator, + io: makeIo(store, typeHashes, "THREAD09"), + options: makeOptions(), + agentFn, + logger: noLogger(), + workflow: null, + }); + + expect(result.returnCode).toBe(0); + expect(checkerCallCount).toBe(2); + + const endPayload = store.get(result.rootHash)!.payload as ThreadEndPayload; + const lastStep = store.get(endPayload.lastStep)!.payload as ThreadStepPayload; + expect(lastStep.role).toBe("checker"); + + const step2 = store.get(lastStep.previous!)!.payload as ThreadStepPayload; + expect(step2.role).toBe("fixer"); + + const step1 = store.get(step2.previous!)!.payload as ThreadStepPayload; + expect(step1.role).toBe("checker"); + expect(step1.previous).toBeNull(); + }); + + test("immediate END from moderator still produces a valid thread", async () => { + const { store, typeHashes } = await setupStore(); + + const immediateEnd: WorkflowInput = { + name: "test-immediate-end", + description: "Ends immediately", + roles: { + worker: { + description: "Never called", + systemPrompt: "N/A", + extractPrompt: "N/A", + schema: { type: "object" }, + }, + }, + moderator: [{ from: START, to: END, when: null }], + }; + + const { workflowHash } = await setupWorkflow(store, typeHashes, immediateEnd); + + const agentFn: JsonCasAgentFn = async () => { + throw new Error("should not be called"); + }; + + const result = await executeJsonCasThread({ + workflowHash, + input: "skip", + moderatorRules: immediateEnd.moderator, + io: makeIo(store, typeHashes, "THREAD10"), + options: makeOptions(), + agentFn, + logger: noLogger(), + workflow: null, + }); + + expect(result.returnCode).toBe(0); + + const endNode = store.get(result.rootHash); + expect(endNode).not.toBeNull(); + const endPayload = endNode!.payload as ThreadEndPayload; + expect(endPayload.start).toBeTruthy(); + expect(endPayload.lastStep).toBeTruthy(); + }); + }); + + describe("abort handling", () => { + test("aborted signal produces returnCode 130", async () => { + const { store, typeHashes } = await setupStore(); + const { workflowHash } = await setupWorkflow(store, typeHashes, SINGLE_ROLE_WORKFLOW); + + const ac = new AbortController(); + ac.abort(); + + const agentFn: JsonCasAgentFn = async () => { + throw new Error("should not be called"); + }; + + const result = await executeJsonCasThread({ + workflowHash, + input: "will abort", + moderatorRules: SINGLE_ROLE_WORKFLOW.moderator, + io: makeIo(store, typeHashes, "THREAD11"), + options: makeOptions({ signal: ac.signal }), + agentFn, + logger: noLogger(), + workflow: null, + }); + + expect(result.returnCode).toBe(130); + expect(result.summary).toContain("abort"); + + const endPayload = store.get(result.rootHash)!.payload as ThreadEndPayload; + expect(endPayload.returnCode).toBe(130); + }); + }); + + describe("agent receives correct context", () => { + test("agent receives role name, system prompt, and accumulated steps", async () => { + const { store, typeHashes } = await setupStore(); + const { workflowHash } = await setupWorkflow(store, typeHashes, SIMPLE_WORKFLOW); + + const { loadWorkflow } = await import("@uncaged/workflow-json-def"); + const hydrated = loadWorkflow(store, typeHashes, workflowHash); + + const receivedCalls: Array<{ + role: string; + systemPrompt: string; + stepCount: number; + input: string; + }> = []; + + const agentFn: JsonCasAgentFn = async (role, systemPrompt, snapshot) => { + receivedCalls.push({ + role, + systemPrompt, + stepCount: snapshot.steps.length, + input: snapshot.start.input, + }); + return { text: `output for ${role}`, meta: {} }; + }; + + await executeJsonCasThread({ + workflowHash, + input: "my prompt", + moderatorRules: SIMPLE_WORKFLOW.moderator, + io: makeIo(store, typeHashes, "THREAD12"), + options: makeOptions(), + agentFn, + logger: noLogger(), + workflow: hydrated, + }); + + expect(receivedCalls.length).toBe(2); + + expect(receivedCalls[0]!.role).toBe("planner"); + expect(receivedCalls[0]!.systemPrompt).toBe("You are a planner."); + expect(receivedCalls[0]!.stepCount).toBe(0); + expect(receivedCalls[0]!.input).toBe("my prompt"); + + expect(receivedCalls[1]!.role).toBe("coder"); + expect(receivedCalls[1]!.systemPrompt).toBe("You are a coder."); + expect(receivedCalls[1]!.stepCount).toBe(1); + }); + + test("snapshot accumulates step meta from previous rounds", async () => { + const { store, typeHashes } = await setupStore(); + const { workflowHash } = await setupWorkflow(store, typeHashes, CONDITIONAL_WORKFLOW); + + let round = 0; + const snapshots: Array<{ role: string; steps: readonly { role: string; meta: Record }[] }> = []; + + const agentFn: JsonCasAgentFn = async (role, _sp, snapshot) => { + snapshots.push({ role, steps: [...snapshot.steps] }); + round++; + if (role === "checker") { + return round === 1 + ? { text: "bad", meta: { status: "bad" } } + : { text: "ok", meta: { status: "ok" } }; + } + return { text: "fixed", meta: { fix: "yes" } }; + }; + + await executeJsonCasThread({ + workflowHash, + input: "go", + moderatorRules: CONDITIONAL_WORKFLOW.moderator, + io: makeIo(store, typeHashes, "THREAD13"), + options: makeOptions(), + agentFn, + logger: noLogger(), + workflow: null, + }); + + expect(snapshots.length).toBe(3); + + expect(snapshots[0]!.steps.length).toBe(0); + + expect(snapshots[1]!.steps.length).toBe(1); + expect(snapshots[1]!.steps[0]!.role).toBe("checker"); + expect(snapshots[1]!.steps[0]!.meta).toEqual({ status: "bad" }); + + expect(snapshots[2]!.steps.length).toBe(2); + expect(snapshots[2]!.steps[0]!.role).toBe("checker"); + expect(snapshots[2]!.steps[1]!.role).toBe("fixer"); + }); + }); +}); + +describe("buildJsonCasThreadSnapshot", () => { + test("builds snapshot from start + step chain", async () => { + const { store, typeHashes } = await setupStore(); + const { workflowHash } = await setupWorkflow(store, typeHashes, SIMPLE_WORKFLOW); + + const agentFn = createMockAgent({ + planner: { text: "plan text", meta: { plan: "alpha" } }, + coder: { text: "code text", meta: { code: "beta" } }, + }); + + const result = await executeJsonCasThread({ + workflowHash, + input: "build it", + moderatorRules: SIMPLE_WORKFLOW.moderator, + io: makeIo(store, typeHashes, "THREAD_SNAP"), + options: makeOptions(), + agentFn, + logger: noLogger(), + workflow: null, + }); + + const endPayload = store.get(result.rootHash)!.payload as ThreadEndPayload; + const startHash = endPayload.start; + const lastStepHash = endPayload.lastStep; + + const snapshot = buildJsonCasThreadSnapshot( + store, typeHashes, startHash, lastStepHash, "THREAD_SNAP", + ); + + expect(snapshot.threadId).toBe("THREAD_SNAP"); + expect(snapshot.start.input).toBe("build it"); + expect(snapshot.start.workflowHash).toBe(workflowHash); + expect(snapshot.steps.length).toBe(2); + expect(snapshot.steps[0]!.role).toBe("planner"); + expect(snapshot.steps[0]!.meta).toEqual({ plan: "alpha" }); + expect(snapshot.steps[1]!.role).toBe("coder"); + expect(snapshot.steps[1]!.meta).toEqual({ code: "beta" }); + }); + + test("builds snapshot with null headStepHash (start only)", async () => { + const { store, typeHashes } = await setupStore(); + const { workflowHash } = await setupWorkflow(store, typeHashes, SINGLE_ROLE_WORKFLOW); + + const startHash = await store.put(typeHashes.threadStart, { + workflow: workflowHash, + input: "just started", + depth: 0, + parentThread: null, + agents: {}, + }); + + const snapshot = buildJsonCasThreadSnapshot( + store, typeHashes, startHash, null, "THREAD_SNAP2", + ); + + expect(snapshot.threadId).toBe("THREAD_SNAP2"); + expect(snapshot.start.input).toBe("just started"); + expect(snapshot.steps.length).toBe(0); + }); +}); + +describe("buildJsonCasThreadContext", () => { + test("builds a protocol-compatible ThreadContext", async () => { + const { store, typeHashes } = await setupStore(); + const { workflowHash } = await setupWorkflow(store, typeHashes, SIMPLE_WORKFLOW); + + const agentFn = createMockAgent({ + planner: { text: "plan text", meta: { plan: "ctx-test" } }, + coder: { text: "code text", meta: { code: "ctx-done" } }, + }); + + const result = await executeJsonCasThread({ + workflowHash, + input: "context test", + moderatorRules: SIMPLE_WORKFLOW.moderator, + io: makeIo(store, typeHashes, "THREAD_CTX"), + options: makeOptions({ depth: 3 }), + agentFn, + logger: noLogger(), + workflow: null, + }); + + const endPayload = store.get(result.rootHash)!.payload as ThreadEndPayload; + const ctx = buildJsonCasThreadContext( + store, typeHashes, endPayload.start, endPayload.lastStep, + ); + + expect(ctx.threadId).toBe(""); + expect(ctx.depth).toBe(3); + expect(ctx.bundleHash).toBe(workflowHash); + expect(ctx.start.role).toBe("__start__"); + expect(ctx.start.content).toBe("context test"); + expect(ctx.steps.length).toBe(2); + expect(ctx.steps[0]!.role).toBe("planner"); + expect(ctx.steps[0]!.meta).toEqual({ plan: "ctx-test" }); + expect(ctx.steps[1]!.role).toBe("coder"); + expect(ctx.steps[1]!.meta).toEqual({ code: "ctx-done" }); + }); + + test("context from start-only thread has empty steps", async () => { + const { store, typeHashes } = await setupStore(); + const { workflowHash } = await setupWorkflow(store, typeHashes, SINGLE_ROLE_WORKFLOW); + + const startHash = await store.put(typeHashes.threadStart, { + workflow: workflowHash, + input: "start only", + depth: 0, + parentThread: null, + agents: {}, + }); + + const ctx = buildJsonCasThreadContext(store, typeHashes, startHash, null); + + expect(ctx.start.content).toBe("start only"); + expect(ctx.steps.length).toBe(0); + }); +}); + +describe("readContentText", () => { + test("reads text from a content node", async () => { + const { store, typeHashes } = await setupStore(); + const hash = await store.put(typeHashes.content, { text: "hello world" }); + + const text = readContentText(store, hash); + expect(text).toBe("hello world"); + }); + + test("returns null for missing hash", async () => { + const { store } = await setupStore(); + const text = readContentText(store, "NONEXISTENT0001"); + expect(text).toBeNull(); + }); +}); + +describe("CAS graph integrity", () => { + test("all nodes are reachable via walk from thread-end", async () => { + const { store, typeHashes } = await setupStore(); + const { workflowHash } = await setupWorkflow(store, typeHashes, SIMPLE_WORKFLOW); + + const agentFn = createMockAgent({ + planner: { text: "plan", meta: { plan: "x" } }, + coder: { text: "code", meta: { code: "y" } }, + }); + + const result = await executeJsonCasThread({ + workflowHash, + input: "walk test", + moderatorRules: SIMPLE_WORKFLOW.moderator, + io: makeIo(store, typeHashes, "THREAD_WALK"), + options: makeOptions(), + agentFn, + logger: noLogger(), + workflow: null, + }); + + const visited = new Set(); + walk(store, result.rootHash, (hash) => { + visited.add(hash); + }); + + expect(visited.has(result.rootHash)).toBe(true); + + const endPayload = store.get(result.rootHash)!.payload as ThreadEndPayload; + expect(visited.has(endPayload.start)).toBe(true); + expect(visited.has(endPayload.lastStep)).toBe(true); + + const step2 = store.get(endPayload.lastStep)!.payload as ThreadStepPayload; + expect(visited.has(step2.content)).toBe(true); + expect(visited.has(step2.react)).toBe(true); + expect(visited.has(step2.start)).toBe(true); + + if (step2.previous !== null) { + expect(visited.has(step2.previous)).toBe(true); + const step1 = store.get(step2.previous)!.payload as ThreadStepPayload; + expect(visited.has(step1.content)).toBe(true); + expect(visited.has(step1.react)).toBe(true); + } + }); + + test("react session nodes have placeholder structure", async () => { + const { store, typeHashes } = await setupStore(); + const { workflowHash } = await setupWorkflow(store, typeHashes, SINGLE_ROLE_WORKFLOW); + + const agentFn = createMockAgent({ + worker: { text: "w", meta: { result: "r" } }, + }); + + const result = await executeJsonCasThread({ + workflowHash, + input: "react check", + moderatorRules: SINGLE_ROLE_WORKFLOW.moderator, + io: makeIo(store, typeHashes, "THREAD_REACT"), + options: makeOptions(), + agentFn, + logger: noLogger(), + workflow: null, + }); + + const endPayload = store.get(result.rootHash)!.payload as ThreadEndPayload; + const stepPayload = store.get(endPayload.lastStep)!.payload as ThreadStepPayload; + const reactNode = store.get(stepPayload.react); + + expect(reactNode).not.toBeNull(); + const reactPayload = reactNode!.payload as Record; + expect(reactPayload.turns).toEqual([]); + expect(reactPayload.totalTokens).toBe(0); + expect(reactPayload.durationMs).toBe(0); + expect(reactPayload.role).toBe("worker"); + expect(typeof reactPayload.agent).toBe("string"); + }); +}); diff --git a/packages/workflow-execute/package.json b/packages/workflow-execute/package.json index e9e18d1..e54787e 100644 --- a/packages/workflow-execute/package.json +++ b/packages/workflow-execute/package.json @@ -24,6 +24,9 @@ "@uncaged/workflow-cas": "workspace:^", "@uncaged/workflow-reactor": "workspace:^", "@uncaged/workflow-register": "workspace:^", + "@uncaged/json-cas": "file:../../../json-cas/packages/json-cas", + "@uncaged/json-cas-workflow": "file:../../../json-cas/packages/json-cas-workflow", + "@uncaged/workflow-json-def": "workspace:^", "yaml": "^2.7.1" }, "peerDependencies": { diff --git a/packages/workflow-execute/src/engine/index.ts b/packages/workflow-execute/src/engine/index.ts index 4a64b4d..9d3c1ab 100644 --- a/packages/workflow-execute/src/engine/index.ts +++ b/packages/workflow-execute/src/engine/index.ts @@ -7,6 +7,18 @@ export { walkStateFramesNewestFirst, } from "./fork-thread.js"; export { garbageCollectCas } from "./gc.js"; +export { buildJsonCasThreadContext, buildJsonCasThreadSnapshot, readContentText } from "./json-cas-context.js"; +export { executeJsonCasThread } from "./json-cas-engine.js"; +export type { + AgentBindings, + JsonCasAgentFn, + JsonCasEngineIo, + JsonCasEngineOptions, + JsonCasStartSnapshot, + JsonCasStepSnapshot, + JsonCasThreadPauseGate, + JsonCasThreadSnapshot, +} from "./json-cas-types.js"; export { createThreadPauseGate } from "./thread-pause-gate.js"; export type { ThreadHistoryEntry, ThreadIndex, ThreadIndexEntry } from "./threads-index.js"; export { diff --git a/packages/workflow-execute/src/engine/json-cas-context.ts b/packages/workflow-execute/src/engine/json-cas-context.ts new file mode 100644 index 0000000..156c3b7 --- /dev/null +++ b/packages/workflow-execute/src/engine/json-cas-context.ts @@ -0,0 +1,130 @@ +import type { Hash, Store } from "@uncaged/json-cas"; +import type { + ContentPayload, + ThreadStartPayload, + ThreadStepPayload, + WorkflowSchemaHashes, +} from "@uncaged/json-cas-workflow"; +import type { ThreadContext } from "@uncaged/workflow-protocol"; +import { START } from "@uncaged/workflow-protocol"; + +import type { JsonCasStepSnapshot, JsonCasThreadSnapshot } from "./json-cas-types.js"; + +// ── Snapshot builder (lightweight, for agent & moderator) ───────────── + +/** + * Walk the thread-step chain backwards via `previous` refs, then reverse + * to get chronological order. Returns a {@link JsonCasThreadSnapshot}. + */ +export function buildJsonCasThreadSnapshot( + store: Store, + _typeHashes: WorkflowSchemaHashes, + startHash: Hash, + headStepHash: Hash | null, + threadId: string, +): JsonCasThreadSnapshot { + const startNode = store.get(startHash); + if (startNode === null) { + throw new Error(`buildJsonCasThreadSnapshot: missing thread-start node at ${startHash}`); + } + const startPayload = startNode.payload as ThreadStartPayload; + + const steps: JsonCasStepSnapshot[] = []; + + let cursor: Hash | null = headStepHash; + while (cursor !== null) { + const stepNode = store.get(cursor); + if (stepNode === null) { + throw new Error(`buildJsonCasThreadSnapshot: missing thread-step node at ${cursor}`); + } + const stepPayload = stepNode.payload as ThreadStepPayload; + steps.push({ + role: stepPayload.role, + meta: stepPayload.meta, + contentHash: stepPayload.content, + }); + cursor = stepPayload.previous; + } + + steps.reverse(); + + return { + threadId, + start: { + input: startPayload.input, + depth: startPayload.depth, + workflowHash: startPayload.workflow, + }, + steps, + }; +} + +// ── ThreadContext builder (protocol-compatible) ─────────────────────── + +/** + * Build a full {@link ThreadContext} from a json-cas thread chain. + * Reads the thread-start node, walks thread-step backwards, and resolves + * content text from each step's content node. + * + * `bundleHash` is set from the workflow ref in the thread-start payload. + * `threadId` is set to `""` — callers should overwrite when known. + */ +export function buildJsonCasThreadContext( + store: Store, + _typeHashes: WorkflowSchemaHashes, + startHash: Hash, + headStepHash: Hash | null, +): ThreadContext { + const startNode = store.get(startHash); + if (startNode === null) { + throw new Error(`buildJsonCasThreadContext: missing thread-start node at ${startHash}`); + } + const startPayload = startNode.payload as ThreadStartPayload; + + const rawSteps: ThreadStepPayload[] = []; + let cursor: Hash | null = headStepHash; + while (cursor !== null) { + const stepNode = store.get(cursor); + if (stepNode === null) { + throw new Error(`buildJsonCasThreadContext: missing thread-step node at ${cursor}`); + } + const payload = stepNode.payload as ThreadStepPayload; + rawSteps.push(payload); + cursor = payload.previous; + } + rawSteps.reverse(); + + const steps = rawSteps.map((sp) => ({ + role: sp.role, + meta: sp.meta, + contentHash: sp.content, + refs: [] as string[], + timestamp: 0, + })); + + return { + threadId: "", + depth: startPayload.depth, + bundleHash: startPayload.workflow, + start: { + role: START, + content: startPayload.input, + meta: {}, + timestamp: 0, + parentState: startPayload.parentThread, + }, + steps, + }; +} + +/** + * Read the text payload from a content node. + */ +export function readContentText(store: Store, contentHash: Hash): string | null { + const node = store.get(contentHash); + if (node === null) { + return null; + } + const payload = node.payload as ContentPayload; + return payload.text; +} diff --git a/packages/workflow-execute/src/engine/json-cas-engine.ts b/packages/workflow-execute/src/engine/json-cas-engine.ts new file mode 100644 index 0000000..c898a36 --- /dev/null +++ b/packages/workflow-execute/src/engine/json-cas-engine.ts @@ -0,0 +1,317 @@ +import type { Hash, Store } from "@uncaged/json-cas"; +import type { + ContentPayload, + ThreadEndPayload, + ThreadStartPayload, + ThreadStepPayload, + WorkflowSchemaHashes, +} from "@uncaged/json-cas-workflow"; +import type { HydratedWorkflow } from "@uncaged/workflow-json-def"; +import type { ModeratorRule, WorkflowResult } from "@uncaged/workflow-protocol"; +import { END, evaluateModerator, START } from "@uncaged/workflow-protocol"; +import type { LogFn } from "@uncaged/workflow-util"; + +import type { + AgentBindings, + JsonCasAgentFn, + JsonCasEngineIo, + JsonCasEngineOptions, + JsonCasStepSnapshot, + JsonCasThreadSnapshot, +} from "./json-cas-types.js"; + +// ── Helpers: CAS node writers ───────────────────────────────────────── + +async function writeContent( + store: Store, + typeHashes: WorkflowSchemaHashes, + text: string, +): Promise { + const payload: ContentPayload = { text }; + return store.put(typeHashes.content, payload); +} + +async function writePlaceholderReactSession( + store: Store, + typeHashes: WorkflowSchemaHashes, + role: string, + agentHash: Hash, +): Promise { + return store.put(typeHashes.reactSession, { + agent: agentHash, + role, + turns: [], + totalTokens: 0, + durationMs: 0, + }); +} + +async function writeThreadStart( + store: Store, + typeHashes: WorkflowSchemaHashes, + params: { + workflowHash: Hash; + input: string; + depth: number; + parentThread: Hash | null; + agents: AgentBindings; + }, +): Promise { + const payload: ThreadStartPayload = { + workflow: params.workflowHash, + input: params.input, + depth: params.depth, + parentThread: params.parentThread, + agents: params.agents, + }; + return store.put(typeHashes.threadStart, payload); +} + +async function writeThreadStep( + store: Store, + typeHashes: WorkflowSchemaHashes, + params: { + role: string; + meta: Record; + contentHash: Hash; + reactHash: Hash; + startHash: Hash; + previousHash: Hash | null; + }, +): Promise { + const payload: ThreadStepPayload = { + role: params.role, + meta: params.meta, + content: params.contentHash, + react: params.reactHash, + start: params.startHash, + previous: params.previousHash, + }; + return store.put(typeHashes.threadStep, payload); +} + +async function writeThreadEnd( + store: Store, + typeHashes: WorkflowSchemaHashes, + params: { + returnCode: number; + summary: string; + startHash: Hash; + lastStepHash: Hash; + }, +): Promise { + const payload: ThreadEndPayload = { + returnCode: params.returnCode, + summary: params.summary, + start: params.startHash, + lastStep: params.lastStepHash, + }; + return store.put(typeHashes.threadEnd, payload); +} + +// ── Placeholder agent ───────────────────────────────────────────────── + +async function ensurePlaceholderAgent( + store: Store, + typeHashes: WorkflowSchemaHashes, +): Promise { + return store.put(typeHashes.agent, { + package: "placeholder", + version: "0.0.0", + config: {}, + }); +} + +// ── JSONata moderator adapter ───────────────────────────────────────── + +function snapshotToModeratorContext( + snapshot: JsonCasThreadSnapshot, +): Parameters[1] { + return { + threadId: snapshot.threadId, + depth: snapshot.start.depth, + bundleHash: snapshot.start.workflowHash, + start: { + role: START, + content: snapshot.start.input, + meta: {}, + timestamp: 0, + parentState: null, + }, + steps: snapshot.steps.map((s) => ({ + role: s.role, + meta: s.meta, + contentHash: s.contentHash, + refs: [], + timestamp: 0, + })), + }; +} + +// ── Main engine ─────────────────────────────────────────────────────── + +/** + * Execute a workflow thread using json-cas as the storage layer. + * + * Drives the moderator→agent loop: + * 1. Writes a thread-start node. + * 2. On each round: evaluates the moderator, invokes the agent, writes + * content + thread-step nodes (react is a placeholder for now). + * 3. On END: writes a thread-end node and returns the result. + * + * The `agentFn` callback is invoked for each role step. It receives the + * role name, system prompt, and current thread snapshot, and returns the + * agent's text output plus structured meta. + */ +export async function executeJsonCasThread(params: { + workflowHash: Hash; + input: string; + moderatorRules: readonly ModeratorRule[]; + io: JsonCasEngineIo; + options: JsonCasEngineOptions; + agentFn: JsonCasAgentFn; + logger: LogFn; + /** Hydrated workflow for role system prompts. Null disables prompt forwarding. */ + workflow: HydratedWorkflow | null; +}): Promise { + const { io, options, agentFn, logger, moderatorRules, workflow } = params; + const { store, typeHashes, threadId } = io; + + const placeholderAgentHash = await ensurePlaceholderAgent(store, typeHashes); + + const startHash = await writeThreadStart(store, typeHashes, { + workflowHash: params.workflowHash, + input: params.input, + depth: options.depth, + parentThread: options.parentThread, + agents: options.agents, + }); + + logger("X3RK7QWN", `json-cas thread ${threadId} started`); + + let previousStepHash: Hash | null = null; + let headStepHash: Hash | null = null; + const stepSnapshots: JsonCasStepSnapshot[] = []; + + while (true) { + if (options.signal.aborted) { + return abortThread(store, typeHashes, startHash, headStepHash, logger, threadId); + } + + const snapshot: JsonCasThreadSnapshot = { + threadId, + start: { + input: params.input, + depth: options.depth, + workflowHash: params.workflowHash, + }, + steps: stepSnapshots, + }; + + const modCtx = snapshotToModeratorContext(snapshot); + const nextRole = await evaluateModerator(moderatorRules, modCtx); + + if (nextRole === END) { + logger("Y5TN8RVK", `json-cas thread ${threadId} moderator returned END`); + + if (headStepHash === null) { + const dummyContentHash = await writeContent(store, typeHashes, "no-op"); + const dummyReactHash = await writePlaceholderReactSession( + store, + typeHashes, + END, + placeholderAgentHash, + ); + headStepHash = await writeThreadStep(store, typeHashes, { + role: END, + meta: {}, + contentHash: dummyContentHash, + reactHash: dummyReactHash, + startHash, + previousHash: null, + }); + } + + const endHash = await writeThreadEnd(store, typeHashes, { + returnCode: 0, + summary: "completed: moderator returned END", + startHash, + lastStepHash: headStepHash, + }); + + return { returnCode: 0, summary: "completed: moderator returned END", rootHash: endHash }; + } + + const roleSystemPrompt = + workflow !== null && workflow.roles[nextRole] !== undefined + ? workflow.roles[nextRole].systemPrompt + : ""; + + const agentResult = await agentFn(nextRole, roleSystemPrompt, snapshot); + + const contentHash = await writeContent(store, typeHashes, agentResult.text); + + const agentHash = options.agents[nextRole] ?? placeholderAgentHash; + const reactHash = await writePlaceholderReactSession(store, typeHashes, nextRole, agentHash); + + const stepHash = await writeThreadStep(store, typeHashes, { + role: nextRole, + meta: agentResult.meta, + contentHash, + reactHash, + startHash, + previousHash: previousStepHash, + }); + + previousStepHash = stepHash; + headStepHash = stepHash; + stepSnapshots.push({ + role: nextRole, + meta: agentResult.meta, + contentHash, + }); + + logger("Z7WP4NHK", `json-cas thread ${threadId} wrote role ${nextRole}`); + } +} + +async function abortThread( + store: Store, + typeHashes: WorkflowSchemaHashes, + startHash: Hash, + headStepHash: Hash | null, + logger: LogFn, + threadId: string, +): Promise { + logger("A8QK3VNR", `json-cas thread ${threadId} aborted`); + + const placeholderAgentHash = await ensurePlaceholderAgent(store, typeHashes); + + let lastStep = headStepHash; + if (lastStep === null) { + const dummyContentHash = await writeContent(store, typeHashes, "thread aborted"); + const dummyReactHash = await writePlaceholderReactSession( + store, + typeHashes, + END, + placeholderAgentHash, + ); + lastStep = await writeThreadStep(store, typeHashes, { + role: END, + meta: {}, + contentHash: dummyContentHash, + reactHash: dummyReactHash, + startHash, + previousHash: null, + }); + } + + const endHash = await writeThreadEnd(store, typeHashes, { + returnCode: 130, + summary: "thread aborted", + startHash, + lastStepHash: lastStep, + }); + + return { returnCode: 130, summary: "thread aborted", rootHash: endHash }; +} diff --git a/packages/workflow-execute/src/engine/json-cas-types.ts b/packages/workflow-execute/src/engine/json-cas-types.ts new file mode 100644 index 0000000..20706af --- /dev/null +++ b/packages/workflow-execute/src/engine/json-cas-types.ts @@ -0,0 +1,71 @@ +import type { Hash, Store } from "@uncaged/json-cas"; +import type { WorkflowSchemaHashes } from "@uncaged/json-cas-workflow"; + +import type { Result } from "@uncaged/workflow-util"; + +// ── Engine IO ───────────────────────────────────────────────────────── + +export type JsonCasEngineIo = { + threadId: string; + store: Store; + typeHashes: WorkflowSchemaHashes; +}; + +// ── Agent binding ───────────────────────────────────────────────────── + +/** + * Maps each role name to a CAS hash referencing an agent node. + * Phase 4 uses a simple role→hash mapping; full agent resolution comes later. + */ +export type AgentBindings = Record; + +// ── Engine options ──────────────────────────────────────────────────── + +export type JsonCasEngineOptions = { + depth: number; + parentThread: Hash | null; + signal: AbortSignal; + agents: AgentBindings; +}; + +// ── Agent function (mock-friendly) ──────────────────────────────────── + +/** + * Invoked for each role step. Returns the agent's raw text output and + * structured meta. The engine stores the text in a content node and the + * meta inside the thread-step node. + */ +export type JsonCasAgentFn = ( + role: string, + systemPrompt: string, + context: JsonCasThreadSnapshot, +) => Promise<{ text: string; meta: Record }>; + +// ── Thread snapshot (read-only view for agents & moderator) ─────────── + +export type JsonCasStartSnapshot = { + input: string; + depth: number; + workflowHash: Hash; +}; + +export type JsonCasStepSnapshot = { + role: string; + meta: Record; + contentHash: Hash; +}; + +export type JsonCasThreadSnapshot = { + threadId: string; + start: JsonCasStartSnapshot; + steps: readonly JsonCasStepSnapshot[]; +}; + +// ── Thread pause gate (re-use from existing types) ──────────────────── + +export type JsonCasThreadPauseGate = { + awaitAfterYield: () => Promise; + pause: () => Result; + resume: () => Result; + isPaused: () => boolean; +}; diff --git a/packages/workflow-execute/src/index.ts b/packages/workflow-execute/src/index.ts index 92b477f..dcdaae5 100644 --- a/packages/workflow-execute/src/index.ts +++ b/packages/workflow-execute/src/index.ts @@ -4,6 +4,18 @@ export { walkStateFramesNewestFirst, } from "./engine/fork-thread.js"; export { garbageCollectCas } from "./engine/gc.js"; +export { buildJsonCasThreadContext, buildJsonCasThreadSnapshot, readContentText } from "./engine/json-cas-context.js"; +export { executeJsonCasThread } from "./engine/json-cas-engine.js"; +export type { + AgentBindings, + JsonCasAgentFn, + JsonCasEngineIo, + JsonCasEngineOptions, + JsonCasStartSnapshot, + JsonCasStepSnapshot, + JsonCasThreadPauseGate, + JsonCasThreadSnapshot, +} from "./engine/json-cas-types.js"; export type { ThreadHistoryEntry, ThreadIndex, diff --git a/packages/workflow-execute/tsconfig.json b/packages/workflow-execute/tsconfig.json index 2b204f7..0b8dc57 100644 --- a/packages/workflow-execute/tsconfig.json +++ b/packages/workflow-execute/tsconfig.json @@ -11,6 +11,7 @@ { "path": "../workflow-util" }, { "path": "../workflow-cas" }, { "path": "../workflow-reactor" }, - { "path": "../workflow-register" } + { "path": "../workflow-register" }, + { "path": "../workflow-json-def" } ] }