Compare commits

..

2 Commits

Author SHA1 Message Date
xiaoju e01c08dacb feat: Phase 4 — json-cas engine (new engine alongside old)
- json-cas-engine.ts: new engine using json-cas Store + typed nodes
- json-cas-context.ts: build ThreadContext from thread-step chain
- json-cas-types.ts: engine types (JsonCasEngineIo, JsonCasAgentFn, etc.)
- thread-start/step/end/content nodes in json-cas format
- JSONata moderator via evaluateModerator
- react placeholder (Phase 5 will fill in)
- 21 tests passing, biome clean

Closes #299
小橘 <xiaoju@shazhou.work>
2026-05-18 02:37:05 +00:00
xiaoju f9d3d38008 Merge pull request 'feat: Phase 3 — workflow JSON definitions in CAS' (#298) from feat/294-phase3-workflow-json into main 2026-05-18 02:27:55 +00:00
8 changed files with 1416 additions and 1 deletions
@@ -0,0 +1,869 @@
import { describe, expect, test } from "bun:test";
import { createMemoryStore, type Store, walk } from "@uncaged/json-cas";
import {
registerWorkflowSchemas,
type WorkflowSchemaHashes,
type ContentPayload,
type ThreadEndPayload,
type ThreadStartPayload,
type ThreadStepPayload,
} from "@uncaged/json-cas-workflow";
import { registerWorkflow, type WorkflowInput } from "@uncaged/workflow-json-def";
import {
buildJsonCasThreadContext,
buildJsonCasThreadSnapshot,
readContentText,
} from "../src/engine/json-cas-context.js";
import { executeJsonCasThread } from "../src/engine/json-cas-engine.js";
import type {
JsonCasAgentFn,
JsonCasEngineIo,
JsonCasEngineOptions,
} from "../src/engine/json-cas-types.js";
// ── Test fixtures ─────────────────────────────────────────────────────
const START = "__start__";
const END = "__end__";
const SIMPLE_WORKFLOW: WorkflowInput = {
name: "test-simple",
description: "A simple two-role workflow for testing",
roles: {
planner: {
description: "Plans the work",
systemPrompt: "You are a planner.",
extractPrompt: "Extract planner output.",
schema: {
type: "object",
required: ["plan"],
properties: { plan: { type: "string" } },
},
},
coder: {
description: "Implements the plan",
systemPrompt: "You are a coder.",
extractPrompt: "Extract coder output.",
schema: {
type: "object",
required: ["code"],
properties: { code: { type: "string" } },
},
},
},
moderator: [
{ from: START, to: "planner", when: null },
{ from: "planner", to: "coder", when: null },
{ from: "coder", to: END, when: null },
],
};
const SINGLE_ROLE_WORKFLOW: WorkflowInput = {
name: "test-single",
description: "A single-role workflow",
roles: {
worker: {
description: "Does all the work",
systemPrompt: "You are a worker.",
extractPrompt: "Extract worker output.",
schema: {
type: "object",
required: ["result"],
properties: { result: { type: "string" } },
},
},
},
moderator: [
{ from: START, to: "worker", when: null },
{ from: "worker", to: END, when: null },
],
};
const CONDITIONAL_WORKFLOW: WorkflowInput = {
name: "test-conditional",
description: "A workflow with JSONata conditions",
roles: {
checker: {
description: "Checks the input",
systemPrompt: "You are a checker.",
extractPrompt: "Extract checker output.",
schema: {
type: "object",
required: ["status"],
properties: { status: { type: "string" } },
},
},
fixer: {
description: "Fixes issues",
systemPrompt: "You are a fixer.",
extractPrompt: "Extract fixer output.",
schema: {
type: "object",
required: ["fix"],
properties: { fix: { type: "string" } },
},
},
},
moderator: [
{ from: START, to: "checker", when: null },
{ from: "checker", to: END, when: "steps[-1].meta.status = 'ok'" },
{ from: "checker", to: "fixer", when: null },
{ from: "fixer", to: "checker", when: null },
],
};
function noLogger(): (tag: string, content: string) => void {
return () => {};
}
async function setupStore(): Promise<{
store: Store;
typeHashes: WorkflowSchemaHashes;
}> {
const store = createMemoryStore();
const typeHashes = await registerWorkflowSchemas(store);
return { store, typeHashes };
}
async function setupWorkflow(
store: Store,
typeHashes: WorkflowSchemaHashes,
workflowDef: WorkflowInput,
) {
const workflowHash = await registerWorkflow(store, typeHashes, workflowDef);
return { workflowHash };
}
function makeOptions(overrides: Partial<JsonCasEngineOptions> = {}): JsonCasEngineOptions {
return {
depth: 0,
parentThread: null,
signal: new AbortController().signal,
agents: {},
...overrides,
};
}
function makeIo(
store: Store,
typeHashes: WorkflowSchemaHashes,
threadId: string,
): JsonCasEngineIo {
return { threadId, store, typeHashes };
}
/**
* A mock agent that returns a canned text and meta for each role.
*/
function createMockAgent(
responses: Record<string, { text: string; meta: Record<string, unknown> }>,
): JsonCasAgentFn {
return async (role, _systemPrompt, _snapshot) => {
const resp = responses[role];
if (resp === undefined) {
throw new Error(`mock agent: no response configured for role "${role}"`);
}
return resp;
};
}
// ── Tests ─────────────────────────────────────────────────────────────
describe("executeJsonCasThread", () => {
describe("thread lifecycle", () => {
test("simple two-role workflow creates start, two steps, and end nodes", async () => {
const { store, typeHashes } = await setupStore();
const { workflowHash } = await setupWorkflow(store, typeHashes, SIMPLE_WORKFLOW);
const agentFn = createMockAgent({
planner: { text: "I will plan", meta: { plan: "phase-1" } },
coder: { text: "I wrote code", meta: { code: "done" } },
});
const result = await executeJsonCasThread({
workflowHash,
input: "Build a widget",
moderatorRules: SIMPLE_WORKFLOW.moderator,
io: makeIo(store, typeHashes, "THREAD01"),
options: makeOptions(),
agentFn,
logger: noLogger(),
workflow: null,
});
expect(result.returnCode).toBe(0);
expect(result.summary).toContain("END");
expect(result.rootHash).toBeTruthy();
const endNode = store.get(result.rootHash);
expect(endNode).not.toBeNull();
const endPayload = endNode!.payload as ThreadEndPayload;
expect(endPayload.returnCode).toBe(0);
expect(endPayload.start).toBeTruthy();
expect(endPayload.lastStep).toBeTruthy();
});
test("single-role workflow creates correct chain", async () => {
const { store, typeHashes } = await setupStore();
const { workflowHash } = await setupWorkflow(store, typeHashes, SINGLE_ROLE_WORKFLOW);
const agentFn = createMockAgent({
worker: { text: "work done", meta: { result: "success" } },
});
const result = await executeJsonCasThread({
workflowHash,
input: "Do the thing",
moderatorRules: SINGLE_ROLE_WORKFLOW.moderator,
io: makeIo(store, typeHashes, "THREAD02"),
options: makeOptions(),
agentFn,
logger: noLogger(),
workflow: null,
});
expect(result.returnCode).toBe(0);
const endNode = store.get(result.rootHash);
expect(endNode).not.toBeNull();
const endPayload = endNode!.payload as ThreadEndPayload;
const lastStepNode = store.get(endPayload.lastStep);
expect(lastStepNode).not.toBeNull();
const lastStepPayload = lastStepNode!.payload as ThreadStepPayload;
expect(lastStepPayload.role).toBe("worker");
expect(lastStepPayload.previous).toBeNull();
const startNode = store.get(endPayload.start);
expect(startNode).not.toBeNull();
const startPayload = startNode!.payload as ThreadStartPayload;
expect(startPayload.input).toBe("Do the thing");
expect(startPayload.depth).toBe(0);
});
});
describe("CAS node structure", () => {
test("thread-start contains workflow ref, input, depth, agents", async () => {
const { store, typeHashes } = await setupStore();
const { workflowHash } = await setupWorkflow(store, typeHashes, SINGLE_ROLE_WORKFLOW);
const agentFn = createMockAgent({
worker: { text: "ok", meta: { result: "ok" } },
});
const agentHash = await store.put(typeHashes.agent, {
package: "test-agent",
version: "1.0.0",
config: {},
});
const result = await executeJsonCasThread({
workflowHash,
input: "Test input",
moderatorRules: SINGLE_ROLE_WORKFLOW.moderator,
io: makeIo(store, typeHashes, "THREAD03"),
options: makeOptions({ agents: { worker: agentHash }, depth: 2 }),
agentFn,
logger: noLogger(),
workflow: null,
});
const endPayload = store.get(result.rootHash)!.payload as ThreadEndPayload;
const startPayload = store.get(endPayload.start)!.payload as ThreadStartPayload;
expect(startPayload.workflow).toBe(workflowHash);
expect(startPayload.input).toBe("Test input");
expect(startPayload.depth).toBe(2);
expect(startPayload.parentThread).toBeNull();
expect(startPayload.agents).toEqual({ worker: agentHash });
});
test("thread-start records parentThread when provided", async () => {
const { store, typeHashes } = await setupStore();
const { workflowHash } = await setupWorkflow(store, typeHashes, SINGLE_ROLE_WORKFLOW);
const agentFn = createMockAgent({
worker: { text: "nested", meta: { result: "nested" } },
});
const fakeParent = "FAKEPARENT0001";
const result = await executeJsonCasThread({
workflowHash,
input: "nested task",
moderatorRules: SINGLE_ROLE_WORKFLOW.moderator,
io: makeIo(store, typeHashes, "THREAD04"),
options: makeOptions({ parentThread: fakeParent, depth: 1 }),
agentFn,
logger: noLogger(),
workflow: null,
});
const endPayload = store.get(result.rootHash)!.payload as ThreadEndPayload;
const startPayload = store.get(endPayload.start)!.payload as ThreadStartPayload;
expect(startPayload.parentThread).toBe(fakeParent);
expect(startPayload.depth).toBe(1);
});
test("each thread-step has content, react, start, and previous refs", async () => {
const { store, typeHashes } = await setupStore();
const { workflowHash } = await setupWorkflow(store, typeHashes, SIMPLE_WORKFLOW);
const agentFn = createMockAgent({
planner: { text: "plan text", meta: { plan: "p1" } },
coder: { text: "code text", meta: { code: "c1" } },
});
const result = await executeJsonCasThread({
workflowHash,
input: "go",
moderatorRules: SIMPLE_WORKFLOW.moderator,
io: makeIo(store, typeHashes, "THREAD05"),
options: makeOptions(),
agentFn,
logger: noLogger(),
workflow: null,
});
const endPayload = store.get(result.rootHash)!.payload as ThreadEndPayload;
const startHash = endPayload.start;
const step2 = store.get(endPayload.lastStep)!.payload as ThreadStepPayload;
expect(step2.role).toBe("coder");
expect(step2.start).toBe(startHash);
expect(step2.previous).not.toBeNull();
const contentNode2 = store.get(step2.content);
expect(contentNode2).not.toBeNull();
expect((contentNode2!.payload as ContentPayload).text).toBe("code text");
const reactNode2 = store.get(step2.react);
expect(reactNode2).not.toBeNull();
const step1 = store.get(step2.previous!)!.payload as ThreadStepPayload;
expect(step1.role).toBe("planner");
expect(step1.start).toBe(startHash);
expect(step1.previous).toBeNull();
const contentNode1 = store.get(step1.content);
expect(contentNode1).not.toBeNull();
expect((contentNode1!.payload as ContentPayload).text).toBe("plan text");
});
test("thread-end references start and last step", async () => {
const { store, typeHashes } = await setupStore();
const { workflowHash } = await setupWorkflow(store, typeHashes, SIMPLE_WORKFLOW);
const agentFn = createMockAgent({
planner: { text: "plan", meta: { plan: "x" } },
coder: { text: "code", meta: { code: "x" } },
});
const result = await executeJsonCasThread({
workflowHash,
input: "test",
moderatorRules: SIMPLE_WORKFLOW.moderator,
io: makeIo(store, typeHashes, "THREAD06"),
options: makeOptions(),
agentFn,
logger: noLogger(),
workflow: null,
});
const endPayload = store.get(result.rootHash)!.payload as ThreadEndPayload;
expect(endPayload.returnCode).toBe(0);
expect(endPayload.summary).toBeTruthy();
const startNode = store.get(endPayload.start);
expect(startNode).not.toBeNull();
expect((startNode!.payload as ThreadStartPayload).workflow).toBe(workflowHash);
const lastStepNode = store.get(endPayload.lastStep);
expect(lastStepNode).not.toBeNull();
expect((lastStepNode!.payload as ThreadStepPayload).role).toBe("coder");
});
test("content nodes store the agent text verbatim", async () => {
const { store, typeHashes } = await setupStore();
const { workflowHash } = await setupWorkflow(store, typeHashes, SINGLE_ROLE_WORKFLOW);
const longText = "This is a longer text with\nnewlines\nand special chars: <>&\"'";
const agentFn = createMockAgent({
worker: { text: longText, meta: { result: "done" } },
});
const result = await executeJsonCasThread({
workflowHash,
input: "process this",
moderatorRules: SINGLE_ROLE_WORKFLOW.moderator,
io: makeIo(store, typeHashes, "THREAD07"),
options: makeOptions(),
agentFn,
logger: noLogger(),
workflow: null,
});
const endPayload = store.get(result.rootHash)!.payload as ThreadEndPayload;
const stepPayload = store.get(endPayload.lastStep)!.payload as ThreadStepPayload;
const contentPayload = store.get(stepPayload.content)!.payload as ContentPayload;
expect(contentPayload.text).toBe(longText);
});
test("meta is stored in thread-step payload", async () => {
const { store, typeHashes } = await setupStore();
const { workflowHash } = await setupWorkflow(store, typeHashes, SIMPLE_WORKFLOW);
const complexMeta = {
plan: "phase-1",
phases: [{ hash: "abc", title: "first" }],
nested: { deep: true },
};
const agentFn = createMockAgent({
planner: { text: "plan", meta: complexMeta },
coder: { text: "code", meta: { code: "done" } },
});
const result = await executeJsonCasThread({
workflowHash,
input: "go",
moderatorRules: SIMPLE_WORKFLOW.moderator,
io: makeIo(store, typeHashes, "THREAD08"),
options: makeOptions(),
agentFn,
logger: noLogger(),
workflow: null,
});
const endPayload = store.get(result.rootHash)!.payload as ThreadEndPayload;
const step2 = store.get(endPayload.lastStep)!.payload as ThreadStepPayload;
const step1 = store.get(step2.previous!)!.payload as ThreadStepPayload;
expect(step1.meta).toEqual(complexMeta);
expect(step2.meta).toEqual({ code: "done" });
});
});
describe("moderator routing", () => {
test("conditional moderator routes based on agent meta", async () => {
const { store, typeHashes } = await setupStore();
const { workflowHash } = await setupWorkflow(store, typeHashes, CONDITIONAL_WORKFLOW);
let checkerCallCount = 0;
const agentFn: JsonCasAgentFn = async (role, _sp, _snap) => {
if (role === "checker") {
checkerCallCount++;
if (checkerCallCount === 1) {
return { text: "found issue", meta: { status: "bad" } };
}
return { text: "all good now", meta: { status: "ok" } };
}
return { text: "fixed it", meta: { fix: "patched" } };
};
const result = await executeJsonCasThread({
workflowHash,
input: "check and fix",
moderatorRules: CONDITIONAL_WORKFLOW.moderator,
io: makeIo(store, typeHashes, "THREAD09"),
options: makeOptions(),
agentFn,
logger: noLogger(),
workflow: null,
});
expect(result.returnCode).toBe(0);
expect(checkerCallCount).toBe(2);
const endPayload = store.get(result.rootHash)!.payload as ThreadEndPayload;
const lastStep = store.get(endPayload.lastStep)!.payload as ThreadStepPayload;
expect(lastStep.role).toBe("checker");
const step2 = store.get(lastStep.previous!)!.payload as ThreadStepPayload;
expect(step2.role).toBe("fixer");
const step1 = store.get(step2.previous!)!.payload as ThreadStepPayload;
expect(step1.role).toBe("checker");
expect(step1.previous).toBeNull();
});
test("immediate END from moderator still produces a valid thread", async () => {
const { store, typeHashes } = await setupStore();
const immediateEnd: WorkflowInput = {
name: "test-immediate-end",
description: "Ends immediately",
roles: {
worker: {
description: "Never called",
systemPrompt: "N/A",
extractPrompt: "N/A",
schema: { type: "object" },
},
},
moderator: [{ from: START, to: END, when: null }],
};
const { workflowHash } = await setupWorkflow(store, typeHashes, immediateEnd);
const agentFn: JsonCasAgentFn = async () => {
throw new Error("should not be called");
};
const result = await executeJsonCasThread({
workflowHash,
input: "skip",
moderatorRules: immediateEnd.moderator,
io: makeIo(store, typeHashes, "THREAD10"),
options: makeOptions(),
agentFn,
logger: noLogger(),
workflow: null,
});
expect(result.returnCode).toBe(0);
const endNode = store.get(result.rootHash);
expect(endNode).not.toBeNull();
const endPayload = endNode!.payload as ThreadEndPayload;
expect(endPayload.start).toBeTruthy();
expect(endPayload.lastStep).toBeTruthy();
});
});
describe("abort handling", () => {
test("aborted signal produces returnCode 130", async () => {
const { store, typeHashes } = await setupStore();
const { workflowHash } = await setupWorkflow(store, typeHashes, SINGLE_ROLE_WORKFLOW);
const ac = new AbortController();
ac.abort();
const agentFn: JsonCasAgentFn = async () => {
throw new Error("should not be called");
};
const result = await executeJsonCasThread({
workflowHash,
input: "will abort",
moderatorRules: SINGLE_ROLE_WORKFLOW.moderator,
io: makeIo(store, typeHashes, "THREAD11"),
options: makeOptions({ signal: ac.signal }),
agentFn,
logger: noLogger(),
workflow: null,
});
expect(result.returnCode).toBe(130);
expect(result.summary).toContain("abort");
const endPayload = store.get(result.rootHash)!.payload as ThreadEndPayload;
expect(endPayload.returnCode).toBe(130);
});
});
describe("agent receives correct context", () => {
test("agent receives role name, system prompt, and accumulated steps", async () => {
const { store, typeHashes } = await setupStore();
const { workflowHash } = await setupWorkflow(store, typeHashes, SIMPLE_WORKFLOW);
const { loadWorkflow } = await import("@uncaged/workflow-json-def");
const hydrated = loadWorkflow(store, typeHashes, workflowHash);
const receivedCalls: Array<{
role: string;
systemPrompt: string;
stepCount: number;
input: string;
}> = [];
const agentFn: JsonCasAgentFn = async (role, systemPrompt, snapshot) => {
receivedCalls.push({
role,
systemPrompt,
stepCount: snapshot.steps.length,
input: snapshot.start.input,
});
return { text: `output for ${role}`, meta: {} };
};
await executeJsonCasThread({
workflowHash,
input: "my prompt",
moderatorRules: SIMPLE_WORKFLOW.moderator,
io: makeIo(store, typeHashes, "THREAD12"),
options: makeOptions(),
agentFn,
logger: noLogger(),
workflow: hydrated,
});
expect(receivedCalls.length).toBe(2);
expect(receivedCalls[0]!.role).toBe("planner");
expect(receivedCalls[0]!.systemPrompt).toBe("You are a planner.");
expect(receivedCalls[0]!.stepCount).toBe(0);
expect(receivedCalls[0]!.input).toBe("my prompt");
expect(receivedCalls[1]!.role).toBe("coder");
expect(receivedCalls[1]!.systemPrompt).toBe("You are a coder.");
expect(receivedCalls[1]!.stepCount).toBe(1);
});
test("snapshot accumulates step meta from previous rounds", async () => {
const { store, typeHashes } = await setupStore();
const { workflowHash } = await setupWorkflow(store, typeHashes, CONDITIONAL_WORKFLOW);
let round = 0;
const snapshots: Array<{ role: string; steps: readonly { role: string; meta: Record<string, unknown> }[] }> = [];
const agentFn: JsonCasAgentFn = async (role, _sp, snapshot) => {
snapshots.push({ role, steps: [...snapshot.steps] });
round++;
if (role === "checker") {
return round === 1
? { text: "bad", meta: { status: "bad" } }
: { text: "ok", meta: { status: "ok" } };
}
return { text: "fixed", meta: { fix: "yes" } };
};
await executeJsonCasThread({
workflowHash,
input: "go",
moderatorRules: CONDITIONAL_WORKFLOW.moderator,
io: makeIo(store, typeHashes, "THREAD13"),
options: makeOptions(),
agentFn,
logger: noLogger(),
workflow: null,
});
expect(snapshots.length).toBe(3);
expect(snapshots[0]!.steps.length).toBe(0);
expect(snapshots[1]!.steps.length).toBe(1);
expect(snapshots[1]!.steps[0]!.role).toBe("checker");
expect(snapshots[1]!.steps[0]!.meta).toEqual({ status: "bad" });
expect(snapshots[2]!.steps.length).toBe(2);
expect(snapshots[2]!.steps[0]!.role).toBe("checker");
expect(snapshots[2]!.steps[1]!.role).toBe("fixer");
});
});
});
describe("buildJsonCasThreadSnapshot", () => {
test("builds snapshot from start + step chain", async () => {
const { store, typeHashes } = await setupStore();
const { workflowHash } = await setupWorkflow(store, typeHashes, SIMPLE_WORKFLOW);
const agentFn = createMockAgent({
planner: { text: "plan text", meta: { plan: "alpha" } },
coder: { text: "code text", meta: { code: "beta" } },
});
const result = await executeJsonCasThread({
workflowHash,
input: "build it",
moderatorRules: SIMPLE_WORKFLOW.moderator,
io: makeIo(store, typeHashes, "THREAD_SNAP"),
options: makeOptions(),
agentFn,
logger: noLogger(),
workflow: null,
});
const endPayload = store.get(result.rootHash)!.payload as ThreadEndPayload;
const startHash = endPayload.start;
const lastStepHash = endPayload.lastStep;
const snapshot = buildJsonCasThreadSnapshot(
store, typeHashes, startHash, lastStepHash, "THREAD_SNAP",
);
expect(snapshot.threadId).toBe("THREAD_SNAP");
expect(snapshot.start.input).toBe("build it");
expect(snapshot.start.workflowHash).toBe(workflowHash);
expect(snapshot.steps.length).toBe(2);
expect(snapshot.steps[0]!.role).toBe("planner");
expect(snapshot.steps[0]!.meta).toEqual({ plan: "alpha" });
expect(snapshot.steps[1]!.role).toBe("coder");
expect(snapshot.steps[1]!.meta).toEqual({ code: "beta" });
});
test("builds snapshot with null headStepHash (start only)", async () => {
const { store, typeHashes } = await setupStore();
const { workflowHash } = await setupWorkflow(store, typeHashes, SINGLE_ROLE_WORKFLOW);
const startHash = await store.put(typeHashes.threadStart, {
workflow: workflowHash,
input: "just started",
depth: 0,
parentThread: null,
agents: {},
});
const snapshot = buildJsonCasThreadSnapshot(
store, typeHashes, startHash, null, "THREAD_SNAP2",
);
expect(snapshot.threadId).toBe("THREAD_SNAP2");
expect(snapshot.start.input).toBe("just started");
expect(snapshot.steps.length).toBe(0);
});
});
describe("buildJsonCasThreadContext", () => {
test("builds a protocol-compatible ThreadContext", async () => {
const { store, typeHashes } = await setupStore();
const { workflowHash } = await setupWorkflow(store, typeHashes, SIMPLE_WORKFLOW);
const agentFn = createMockAgent({
planner: { text: "plan text", meta: { plan: "ctx-test" } },
coder: { text: "code text", meta: { code: "ctx-done" } },
});
const result = await executeJsonCasThread({
workflowHash,
input: "context test",
moderatorRules: SIMPLE_WORKFLOW.moderator,
io: makeIo(store, typeHashes, "THREAD_CTX"),
options: makeOptions({ depth: 3 }),
agentFn,
logger: noLogger(),
workflow: null,
});
const endPayload = store.get(result.rootHash)!.payload as ThreadEndPayload;
const ctx = buildJsonCasThreadContext(
store, typeHashes, endPayload.start, endPayload.lastStep,
);
expect(ctx.threadId).toBe("");
expect(ctx.depth).toBe(3);
expect(ctx.bundleHash).toBe(workflowHash);
expect(ctx.start.role).toBe("__start__");
expect(ctx.start.content).toBe("context test");
expect(ctx.steps.length).toBe(2);
expect(ctx.steps[0]!.role).toBe("planner");
expect(ctx.steps[0]!.meta).toEqual({ plan: "ctx-test" });
expect(ctx.steps[1]!.role).toBe("coder");
expect(ctx.steps[1]!.meta).toEqual({ code: "ctx-done" });
});
test("context from start-only thread has empty steps", async () => {
const { store, typeHashes } = await setupStore();
const { workflowHash } = await setupWorkflow(store, typeHashes, SINGLE_ROLE_WORKFLOW);
const startHash = await store.put(typeHashes.threadStart, {
workflow: workflowHash,
input: "start only",
depth: 0,
parentThread: null,
agents: {},
});
const ctx = buildJsonCasThreadContext(store, typeHashes, startHash, null);
expect(ctx.start.content).toBe("start only");
expect(ctx.steps.length).toBe(0);
});
});
describe("readContentText", () => {
test("reads text from a content node", async () => {
const { store, typeHashes } = await setupStore();
const hash = await store.put(typeHashes.content, { text: "hello world" });
const text = readContentText(store, hash);
expect(text).toBe("hello world");
});
test("returns null for missing hash", async () => {
const { store } = await setupStore();
const text = readContentText(store, "NONEXISTENT0001");
expect(text).toBeNull();
});
});
describe("CAS graph integrity", () => {
test("all nodes are reachable via walk from thread-end", async () => {
const { store, typeHashes } = await setupStore();
const { workflowHash } = await setupWorkflow(store, typeHashes, SIMPLE_WORKFLOW);
const agentFn = createMockAgent({
planner: { text: "plan", meta: { plan: "x" } },
coder: { text: "code", meta: { code: "y" } },
});
const result = await executeJsonCasThread({
workflowHash,
input: "walk test",
moderatorRules: SIMPLE_WORKFLOW.moderator,
io: makeIo(store, typeHashes, "THREAD_WALK"),
options: makeOptions(),
agentFn,
logger: noLogger(),
workflow: null,
});
const visited = new Set<string>();
walk(store, result.rootHash, (hash) => {
visited.add(hash);
});
expect(visited.has(result.rootHash)).toBe(true);
const endPayload = store.get(result.rootHash)!.payload as ThreadEndPayload;
expect(visited.has(endPayload.start)).toBe(true);
expect(visited.has(endPayload.lastStep)).toBe(true);
const step2 = store.get(endPayload.lastStep)!.payload as ThreadStepPayload;
expect(visited.has(step2.content)).toBe(true);
expect(visited.has(step2.react)).toBe(true);
expect(visited.has(step2.start)).toBe(true);
if (step2.previous !== null) {
expect(visited.has(step2.previous)).toBe(true);
const step1 = store.get(step2.previous)!.payload as ThreadStepPayload;
expect(visited.has(step1.content)).toBe(true);
expect(visited.has(step1.react)).toBe(true);
}
});
test("react session nodes have placeholder structure", async () => {
const { store, typeHashes } = await setupStore();
const { workflowHash } = await setupWorkflow(store, typeHashes, SINGLE_ROLE_WORKFLOW);
const agentFn = createMockAgent({
worker: { text: "w", meta: { result: "r" } },
});
const result = await executeJsonCasThread({
workflowHash,
input: "react check",
moderatorRules: SINGLE_ROLE_WORKFLOW.moderator,
io: makeIo(store, typeHashes, "THREAD_REACT"),
options: makeOptions(),
agentFn,
logger: noLogger(),
workflow: null,
});
const endPayload = store.get(result.rootHash)!.payload as ThreadEndPayload;
const stepPayload = store.get(endPayload.lastStep)!.payload as ThreadStepPayload;
const reactNode = store.get(stepPayload.react);
expect(reactNode).not.toBeNull();
const reactPayload = reactNode!.payload as Record<string, unknown>;
expect(reactPayload.turns).toEqual([]);
expect(reactPayload.totalTokens).toBe(0);
expect(reactPayload.durationMs).toBe(0);
expect(reactPayload.role).toBe("worker");
expect(typeof reactPayload.agent).toBe("string");
});
});
+3
View File
@@ -24,6 +24,9 @@
"@uncaged/workflow-cas": "workspace:^",
"@uncaged/workflow-reactor": "workspace:^",
"@uncaged/workflow-register": "workspace:^",
"@uncaged/json-cas": "file:../../../json-cas/packages/json-cas",
"@uncaged/json-cas-workflow": "file:../../../json-cas/packages/json-cas-workflow",
"@uncaged/workflow-json-def": "workspace:^",
"yaml": "^2.7.1"
},
"peerDependencies": {
@@ -7,6 +7,18 @@ export {
walkStateFramesNewestFirst,
} from "./fork-thread.js";
export { garbageCollectCas } from "./gc.js";
export { buildJsonCasThreadContext, buildJsonCasThreadSnapshot, readContentText } from "./json-cas-context.js";
export { executeJsonCasThread } from "./json-cas-engine.js";
export type {
AgentBindings,
JsonCasAgentFn,
JsonCasEngineIo,
JsonCasEngineOptions,
JsonCasStartSnapshot,
JsonCasStepSnapshot,
JsonCasThreadPauseGate,
JsonCasThreadSnapshot,
} from "./json-cas-types.js";
export { createThreadPauseGate } from "./thread-pause-gate.js";
export type { ThreadHistoryEntry, ThreadIndex, ThreadIndexEntry } from "./threads-index.js";
export {
@@ -0,0 +1,130 @@
import type { Hash, Store } from "@uncaged/json-cas";
import type {
ContentPayload,
ThreadStartPayload,
ThreadStepPayload,
WorkflowSchemaHashes,
} from "@uncaged/json-cas-workflow";
import type { ThreadContext } from "@uncaged/workflow-protocol";
import { START } from "@uncaged/workflow-protocol";
import type { JsonCasStepSnapshot, JsonCasThreadSnapshot } from "./json-cas-types.js";
// ── Snapshot builder (lightweight, for agent & moderator) ─────────────
/**
* Walk the thread-step chain backwards via `previous` refs, then reverse
* to get chronological order. Returns a {@link JsonCasThreadSnapshot}.
*/
export function buildJsonCasThreadSnapshot(
store: Store,
_typeHashes: WorkflowSchemaHashes,
startHash: Hash,
headStepHash: Hash | null,
threadId: string,
): JsonCasThreadSnapshot {
const startNode = store.get(startHash);
if (startNode === null) {
throw new Error(`buildJsonCasThreadSnapshot: missing thread-start node at ${startHash}`);
}
const startPayload = startNode.payload as ThreadStartPayload;
const steps: JsonCasStepSnapshot[] = [];
let cursor: Hash | null = headStepHash;
while (cursor !== null) {
const stepNode = store.get(cursor);
if (stepNode === null) {
throw new Error(`buildJsonCasThreadSnapshot: missing thread-step node at ${cursor}`);
}
const stepPayload = stepNode.payload as ThreadStepPayload;
steps.push({
role: stepPayload.role,
meta: stepPayload.meta,
contentHash: stepPayload.content,
});
cursor = stepPayload.previous;
}
steps.reverse();
return {
threadId,
start: {
input: startPayload.input,
depth: startPayload.depth,
workflowHash: startPayload.workflow,
},
steps,
};
}
// ── ThreadContext builder (protocol-compatible) ───────────────────────
/**
* Build a full {@link ThreadContext} from a json-cas thread chain.
* Reads the thread-start node, walks thread-step backwards, and resolves
* content text from each step's content node.
*
* `bundleHash` is set from the workflow ref in the thread-start payload.
* `threadId` is set to `""` — callers should overwrite when known.
*/
export function buildJsonCasThreadContext(
store: Store,
_typeHashes: WorkflowSchemaHashes,
startHash: Hash,
headStepHash: Hash | null,
): ThreadContext {
const startNode = store.get(startHash);
if (startNode === null) {
throw new Error(`buildJsonCasThreadContext: missing thread-start node at ${startHash}`);
}
const startPayload = startNode.payload as ThreadStartPayload;
const rawSteps: ThreadStepPayload[] = [];
let cursor: Hash | null = headStepHash;
while (cursor !== null) {
const stepNode = store.get(cursor);
if (stepNode === null) {
throw new Error(`buildJsonCasThreadContext: missing thread-step node at ${cursor}`);
}
const payload = stepNode.payload as ThreadStepPayload;
rawSteps.push(payload);
cursor = payload.previous;
}
rawSteps.reverse();
const steps = rawSteps.map((sp) => ({
role: sp.role,
meta: sp.meta,
contentHash: sp.content,
refs: [] as string[],
timestamp: 0,
}));
return {
threadId: "",
depth: startPayload.depth,
bundleHash: startPayload.workflow,
start: {
role: START,
content: startPayload.input,
meta: {},
timestamp: 0,
parentState: startPayload.parentThread,
},
steps,
};
}
/**
* Read the text payload from a content node.
*/
export function readContentText(store: Store, contentHash: Hash): string | null {
const node = store.get(contentHash);
if (node === null) {
return null;
}
const payload = node.payload as ContentPayload;
return payload.text;
}
@@ -0,0 +1,317 @@
import type { Hash, Store } from "@uncaged/json-cas";
import type {
ContentPayload,
ThreadEndPayload,
ThreadStartPayload,
ThreadStepPayload,
WorkflowSchemaHashes,
} from "@uncaged/json-cas-workflow";
import type { HydratedWorkflow } from "@uncaged/workflow-json-def";
import type { ModeratorRule, WorkflowResult } from "@uncaged/workflow-protocol";
import { END, evaluateModerator, START } from "@uncaged/workflow-protocol";
import type { LogFn } from "@uncaged/workflow-util";
import type {
AgentBindings,
JsonCasAgentFn,
JsonCasEngineIo,
JsonCasEngineOptions,
JsonCasStepSnapshot,
JsonCasThreadSnapshot,
} from "./json-cas-types.js";
// ── Helpers: CAS node writers ─────────────────────────────────────────
async function writeContent(
store: Store,
typeHashes: WorkflowSchemaHashes,
text: string,
): Promise<Hash> {
const payload: ContentPayload = { text };
return store.put(typeHashes.content, payload);
}
async function writePlaceholderReactSession(
store: Store,
typeHashes: WorkflowSchemaHashes,
role: string,
agentHash: Hash,
): Promise<Hash> {
return store.put(typeHashes.reactSession, {
agent: agentHash,
role,
turns: [],
totalTokens: 0,
durationMs: 0,
});
}
async function writeThreadStart(
store: Store,
typeHashes: WorkflowSchemaHashes,
params: {
workflowHash: Hash;
input: string;
depth: number;
parentThread: Hash | null;
agents: AgentBindings;
},
): Promise<Hash> {
const payload: ThreadStartPayload = {
workflow: params.workflowHash,
input: params.input,
depth: params.depth,
parentThread: params.parentThread,
agents: params.agents,
};
return store.put(typeHashes.threadStart, payload);
}
async function writeThreadStep(
store: Store,
typeHashes: WorkflowSchemaHashes,
params: {
role: string;
meta: Record<string, unknown>;
contentHash: Hash;
reactHash: Hash;
startHash: Hash;
previousHash: Hash | null;
},
): Promise<Hash> {
const payload: ThreadStepPayload = {
role: params.role,
meta: params.meta,
content: params.contentHash,
react: params.reactHash,
start: params.startHash,
previous: params.previousHash,
};
return store.put(typeHashes.threadStep, payload);
}
async function writeThreadEnd(
store: Store,
typeHashes: WorkflowSchemaHashes,
params: {
returnCode: number;
summary: string;
startHash: Hash;
lastStepHash: Hash;
},
): Promise<Hash> {
const payload: ThreadEndPayload = {
returnCode: params.returnCode,
summary: params.summary,
start: params.startHash,
lastStep: params.lastStepHash,
};
return store.put(typeHashes.threadEnd, payload);
}
// ── Placeholder agent ─────────────────────────────────────────────────
async function ensurePlaceholderAgent(
store: Store,
typeHashes: WorkflowSchemaHashes,
): Promise<Hash> {
return store.put(typeHashes.agent, {
package: "placeholder",
version: "0.0.0",
config: {},
});
}
// ── JSONata moderator adapter ─────────────────────────────────────────
function snapshotToModeratorContext(
snapshot: JsonCasThreadSnapshot,
): Parameters<typeof evaluateModerator>[1] {
return {
threadId: snapshot.threadId,
depth: snapshot.start.depth,
bundleHash: snapshot.start.workflowHash,
start: {
role: START,
content: snapshot.start.input,
meta: {},
timestamp: 0,
parentState: null,
},
steps: snapshot.steps.map((s) => ({
role: s.role,
meta: s.meta,
contentHash: s.contentHash,
refs: [],
timestamp: 0,
})),
};
}
// ── Main engine ───────────────────────────────────────────────────────
/**
* Execute a workflow thread using json-cas as the storage layer.
*
* Drives the moderator→agent loop:
* 1. Writes a thread-start node.
* 2. On each round: evaluates the moderator, invokes the agent, writes
* content + thread-step nodes (react is a placeholder for now).
* 3. On END: writes a thread-end node and returns the result.
*
* The `agentFn` callback is invoked for each role step. It receives the
* role name, system prompt, and current thread snapshot, and returns the
* agent's text output plus structured meta.
*/
export async function executeJsonCasThread(params: {
workflowHash: Hash;
input: string;
moderatorRules: readonly ModeratorRule[];
io: JsonCasEngineIo;
options: JsonCasEngineOptions;
agentFn: JsonCasAgentFn;
logger: LogFn;
/** Hydrated workflow for role system prompts. Null disables prompt forwarding. */
workflow: HydratedWorkflow | null;
}): Promise<WorkflowResult> {
const { io, options, agentFn, logger, moderatorRules, workflow } = params;
const { store, typeHashes, threadId } = io;
const placeholderAgentHash = await ensurePlaceholderAgent(store, typeHashes);
const startHash = await writeThreadStart(store, typeHashes, {
workflowHash: params.workflowHash,
input: params.input,
depth: options.depth,
parentThread: options.parentThread,
agents: options.agents,
});
logger("X3RK7QWN", `json-cas thread ${threadId} started`);
let previousStepHash: Hash | null = null;
let headStepHash: Hash | null = null;
const stepSnapshots: JsonCasStepSnapshot[] = [];
while (true) {
if (options.signal.aborted) {
return abortThread(store, typeHashes, startHash, headStepHash, logger, threadId);
}
const snapshot: JsonCasThreadSnapshot = {
threadId,
start: {
input: params.input,
depth: options.depth,
workflowHash: params.workflowHash,
},
steps: stepSnapshots,
};
const modCtx = snapshotToModeratorContext(snapshot);
const nextRole = await evaluateModerator(moderatorRules, modCtx);
if (nextRole === END) {
logger("Y5TN8RVK", `json-cas thread ${threadId} moderator returned END`);
if (headStepHash === null) {
const dummyContentHash = await writeContent(store, typeHashes, "no-op");
const dummyReactHash = await writePlaceholderReactSession(
store,
typeHashes,
END,
placeholderAgentHash,
);
headStepHash = await writeThreadStep(store, typeHashes, {
role: END,
meta: {},
contentHash: dummyContentHash,
reactHash: dummyReactHash,
startHash,
previousHash: null,
});
}
const endHash = await writeThreadEnd(store, typeHashes, {
returnCode: 0,
summary: "completed: moderator returned END",
startHash,
lastStepHash: headStepHash,
});
return { returnCode: 0, summary: "completed: moderator returned END", rootHash: endHash };
}
const roleSystemPrompt =
workflow !== null && workflow.roles[nextRole] !== undefined
? workflow.roles[nextRole].systemPrompt
: "";
const agentResult = await agentFn(nextRole, roleSystemPrompt, snapshot);
const contentHash = await writeContent(store, typeHashes, agentResult.text);
const agentHash = options.agents[nextRole] ?? placeholderAgentHash;
const reactHash = await writePlaceholderReactSession(store, typeHashes, nextRole, agentHash);
const stepHash = await writeThreadStep(store, typeHashes, {
role: nextRole,
meta: agentResult.meta,
contentHash,
reactHash,
startHash,
previousHash: previousStepHash,
});
previousStepHash = stepHash;
headStepHash = stepHash;
stepSnapshots.push({
role: nextRole,
meta: agentResult.meta,
contentHash,
});
logger("Z7WP4NHK", `json-cas thread ${threadId} wrote role ${nextRole}`);
}
}
async function abortThread(
store: Store,
typeHashes: WorkflowSchemaHashes,
startHash: Hash,
headStepHash: Hash | null,
logger: LogFn,
threadId: string,
): Promise<WorkflowResult> {
logger("A8QK3VNR", `json-cas thread ${threadId} aborted`);
const placeholderAgentHash = await ensurePlaceholderAgent(store, typeHashes);
let lastStep = headStepHash;
if (lastStep === null) {
const dummyContentHash = await writeContent(store, typeHashes, "thread aborted");
const dummyReactHash = await writePlaceholderReactSession(
store,
typeHashes,
END,
placeholderAgentHash,
);
lastStep = await writeThreadStep(store, typeHashes, {
role: END,
meta: {},
contentHash: dummyContentHash,
reactHash: dummyReactHash,
startHash,
previousHash: null,
});
}
const endHash = await writeThreadEnd(store, typeHashes, {
returnCode: 130,
summary: "thread aborted",
startHash,
lastStepHash: lastStep,
});
return { returnCode: 130, summary: "thread aborted", rootHash: endHash };
}
@@ -0,0 +1,71 @@
import type { Hash, Store } from "@uncaged/json-cas";
import type { WorkflowSchemaHashes } from "@uncaged/json-cas-workflow";
import type { Result } from "@uncaged/workflow-util";
// ── Engine IO ─────────────────────────────────────────────────────────
export type JsonCasEngineIo = {
threadId: string;
store: Store;
typeHashes: WorkflowSchemaHashes;
};
// ── Agent binding ─────────────────────────────────────────────────────
/**
* Maps each role name to a CAS hash referencing an agent node.
* Phase 4 uses a simple role→hash mapping; full agent resolution comes later.
*/
export type AgentBindings = Record<string, Hash>;
// ── Engine options ────────────────────────────────────────────────────
export type JsonCasEngineOptions = {
depth: number;
parentThread: Hash | null;
signal: AbortSignal;
agents: AgentBindings;
};
// ── Agent function (mock-friendly) ────────────────────────────────────
/**
* Invoked for each role step. Returns the agent's raw text output and
* structured meta. The engine stores the text in a content node and the
* meta inside the thread-step node.
*/
export type JsonCasAgentFn = (
role: string,
systemPrompt: string,
context: JsonCasThreadSnapshot,
) => Promise<{ text: string; meta: Record<string, unknown> }>;
// ── Thread snapshot (read-only view for agents & moderator) ───────────
export type JsonCasStartSnapshot = {
input: string;
depth: number;
workflowHash: Hash;
};
export type JsonCasStepSnapshot = {
role: string;
meta: Record<string, unknown>;
contentHash: Hash;
};
export type JsonCasThreadSnapshot = {
threadId: string;
start: JsonCasStartSnapshot;
steps: readonly JsonCasStepSnapshot[];
};
// ── Thread pause gate (re-use from existing types) ────────────────────
export type JsonCasThreadPauseGate = {
awaitAfterYield: () => Promise<void>;
pause: () => Result<void, string>;
resume: () => Result<void, string>;
isPaused: () => boolean;
};
+12
View File
@@ -4,6 +4,18 @@ export {
walkStateFramesNewestFirst,
} from "./engine/fork-thread.js";
export { garbageCollectCas } from "./engine/gc.js";
export { buildJsonCasThreadContext, buildJsonCasThreadSnapshot, readContentText } from "./engine/json-cas-context.js";
export { executeJsonCasThread } from "./engine/json-cas-engine.js";
export type {
AgentBindings,
JsonCasAgentFn,
JsonCasEngineIo,
JsonCasEngineOptions,
JsonCasStartSnapshot,
JsonCasStepSnapshot,
JsonCasThreadPauseGate,
JsonCasThreadSnapshot,
} from "./engine/json-cas-types.js";
export type {
ThreadHistoryEntry,
ThreadIndex,
+2 -1
View File
@@ -11,6 +11,7 @@
{ "path": "../workflow-util" },
{ "path": "../workflow-cas" },
{ "path": "../workflow-reactor" },
{ "path": "../workflow-register" }
{ "path": "../workflow-register" },
{ "path": "../workflow-json-def" }
]
}