feat(cas,reactor): create @uncaged/workflow-cas and @uncaged/workflow-reactor

Phase 4: CAS module extracted with Merkle types, hash functions,
and fs-backed store. Imports CasStore type from protocol.

Phase 5: Reactor (ReAct loop) extracted as independent package.
Only depends on protocol — no cas or engine dependency.

Ref: #143, closes #147, closes #148
This commit is contained in:
2026-05-09 11:11:33 +08:00
parent 39d2a61686
commit 1a1e8b3398
13 changed files with 743 additions and 0 deletions
+20
View File
@@ -0,0 +1,20 @@
{
"name": "@uncaged/workflow-cas",
"version": "0.1.0",
"type": "module",
"exports": {
".": {
"types": "./src/index.ts",
"default": "./src/index.ts"
}
},
"dependencies": {
"@uncaged/workflow-protocol": "workspace:*",
"@uncaged/workflow-util": "workspace:*",
"xxhashjs": "^0.2.2",
"yaml": "^2.7.1"
},
"devDependencies": {
"@types/bun": "latest"
}
}
+76
View File
@@ -0,0 +1,76 @@
import { mkdir, readdir, readFile, rename, unlink, writeFile } from "node:fs/promises";
import { join } from "node:path";
import { hashString } from "./hash.js";
import { createContentMerkleNode, parseMerkleNode, serializeMerkleNode } from "./merkle.js";
import type { CasStore } from "./types.js";
/** Raw strings become content merkle YAML; already-valid merkle documents pass through. */
function normalizeCasPutContent(content: string): string {
try {
parseMerkleNode(content);
return content;
} catch {
return serializeMerkleNode(createContentMerkleNode(content));
}
}
export function createCasStore(casDir: string): CasStore {
async function ensureDir(): Promise<void> {
await mkdir(casDir, { recursive: true });
}
function filePath(hash: string): string {
return join(casDir, `${hash}.txt`);
}
return {
async put(content: string): Promise<string> {
const toStore = normalizeCasPutContent(content);
const hash = hashString(toStore);
await ensureDir();
const target = filePath(hash);
const tmp = `${target}.tmp.${Date.now()}`;
await writeFile(tmp, toStore, "utf8");
await rename(tmp, target);
return hash;
},
async get(hash: string): Promise<string | null> {
try {
return await readFile(filePath(hash), "utf8");
} catch (e) {
const errObj = e as NodeJS.ErrnoException;
if (errObj.code === "ENOENT") {
return null;
}
throw e;
}
},
async delete(hash: string): Promise<void> {
try {
await unlink(filePath(hash));
} catch (e) {
const errObj = e as NodeJS.ErrnoException;
if (errObj.code === "ENOENT") {
return;
}
throw e;
}
},
async list(): Promise<string[]> {
try {
const entries = await readdir(casDir);
return entries.filter((name) => name.endsWith(".txt")).map((name) => name.slice(0, -4));
} catch (e) {
const errObj = e as NodeJS.ErrnoException;
if (errObj.code === "ENOENT") {
return [];
}
throw e;
}
},
};
}
+24
View File
@@ -0,0 +1,24 @@
import { Buffer } from "node:buffer";
import XXH from "xxhashjs";
import { encodeUint64AsCrockford } from "@uncaged/workflow-util";
function digestToUint64(digest: { toString(radix?: number): string }): bigint {
const hex = digest.toString(16).padStart(16, "0");
return BigInt(`0x${hex}`);
}
/** XXH64 (seed 0) over bundle bytes, encoded as 13-char Crockford Base32. */
export function hashWorkflowBundleBytes(data: Uint8Array): string {
const buf = Buffer.from(data.buffer, data.byteOffset, data.byteLength);
const digest = XXH.h64(0).update(buf).digest();
return encodeUint64AsCrockford(digestToUint64(digest));
}
/** XXH64 (seed 0) over a UTF-8 string, encoded as 13-char Crockford Base32. */
export function hashString(content: string): string {
const buf = Buffer.from(content, "utf8");
const digest = XXH.h64(0).update(buf).digest();
return encodeUint64AsCrockford(digestToUint64(digest));
}
+18
View File
@@ -0,0 +1,18 @@
export { createCasStore } from "./cas.js";
export { hashString, hashWorkflowBundleBytes } from "./hash.js";
export {
createContentMerkleNode,
getContentMerklePayload,
parseMerkleNode,
putContentMerkleNode,
putStepMerkleNode,
putThreadMerkleNode,
serializeMerkleNode,
} from "./merkle.js";
export type {
CasStore,
MerkleNode,
MerkleNodeType,
StepMerklePayload,
ThreadMerklePayload,
} from "./types.js";
+99
View File
@@ -0,0 +1,99 @@
import { parse, stringify } from "yaml";
import type { CasStore, MerkleNode, StepMerklePayload, ThreadMerklePayload } from "./types.js";
export function serializeMerkleNode(node: MerkleNode): string {
return stringify(
{ type: node.type, payload: node.payload, children: node.children },
{ indent: 2 },
);
}
export function parseMerkleNode(yamlText: string): MerkleNode {
const raw = parse(yamlText) as unknown;
if (raw === null || typeof raw !== "object") {
throw new Error("merkle: YAML root must be an object");
}
const rec = raw as Record<string, unknown>;
const type = rec.type;
const payload = rec.payload;
const children = rec.children;
if (type !== "content" && type !== "step" && type !== "thread") {
throw new Error("merkle: invalid or missing type");
}
if (typeof payload !== "string" && (payload === null || typeof payload !== "object")) {
throw new Error("merkle: payload must be a string or object");
}
if (!Array.isArray(children)) {
throw new Error("merkle: children must be an array");
}
const childHashes: string[] = [];
for (const c of children) {
if (typeof c !== "string") {
throw new Error("merkle: child hash must be a string");
}
childHashes.push(c);
}
return {
type,
payload: typeof payload === "string" ? payload : (payload as Record<string, unknown>),
children: childHashes,
};
}
export function createContentMerkleNode(payload: string): MerkleNode {
return { type: "content", payload, children: [] };
}
/** Serializes a step Merkle node (role + meta + content child) and stores it in CAS. */
export async function putStepMerkleNode(
store: CasStore,
payload: StepMerklePayload,
contentHash: string,
): Promise<string> {
const node: MerkleNode = {
type: "step",
payload: { role: payload.role, meta: payload.meta },
children: [contentHash],
};
return store.put(serializeMerkleNode(node));
}
/** Serializes the thread root Merkle node and stores it in CAS. */
export async function putThreadMerkleNode(
store: CasStore,
payload: ThreadMerklePayload,
stepHashes: readonly string[],
): Promise<string> {
const node: MerkleNode = {
type: "thread",
payload: {
workflow: payload.workflow,
threadId: payload.threadId,
result: payload.result,
},
children: [...stepHashes],
};
return store.put(serializeMerkleNode(node));
}
/** Stores agent/content text via CAS; {@link createCasStore} wraps raw strings as merkle content nodes. */
export async function putContentMerkleNode(store: CasStore, content: string): Promise<string> {
return store.put(content);
}
/** Loads a CAS blob and returns the payload string for a `content` Merkle node. */
export async function getContentMerklePayload(
store: CasStore,
hash: string,
): Promise<string | null> {
const yamlText = await store.get(hash);
if (yamlText === null) {
return null;
}
const node = parseMerkleNode(yamlText);
if (node.type !== "content" || typeof node.payload !== "string") {
return null;
}
return node.payload;
}
+23
View File
@@ -0,0 +1,23 @@
export type { CasStore } from "@uncaged/workflow-protocol";
export type MerkleNodeType = "content" | "step" | "thread";
export type MerkleNode = {
type: MerkleNodeType;
payload: string | Record<string, unknown>;
children: string[];
};
export type StepMerklePayload = {
role: string;
meta: Record<string, unknown>;
};
export type ThreadMerklePayload = {
workflow: string;
threadId: string;
result: {
returnCode: number;
summary: string;
};
};
+12
View File
@@ -0,0 +1,12 @@
{
"extends": "../../tsconfig.json",
"compilerOptions": {
"rootDir": "src",
"outDir": "dist"
},
"include": ["src"],
"references": [
{ "path": "../workflow-protocol" },
{ "path": "../workflow-util" }
]
}
+21
View File
@@ -0,0 +1,21 @@
{
"name": "@uncaged/workflow-reactor",
"version": "0.1.0",
"type": "module",
"exports": {
".": {
"types": "./dist/index.d.ts",
"import": "./src/index.ts"
}
},
"dependencies": {
"@uncaged/workflow-protocol": "workspace:*"
},
"peerDependencies": {
"zod": "^4.0.0"
},
"devDependencies": {
"zod": "^4.0.0",
"typescript": "^5.8.3"
}
}
+12
View File
@@ -0,0 +1,12 @@
export { createLlmFn } from "./llm-fn.js";
export { createThreadReactor } from "./thread-reactor.js";
export type {
ChatMessage,
LlmFn,
StructuredToolSpec,
ThreadReactorConfig,
ThreadReactorFn,
ThreadReactorInvokeArgs,
ToolCall,
ToolDefinition,
} from "./types.js";
+48
View File
@@ -0,0 +1,48 @@
import type { LlmProvider } from "@uncaged/workflow-protocol";
import { err, ok } from "@uncaged/workflow-protocol";
import type { ChatMessage, LlmFn, ToolDefinition } from "./types.js";
function chatCompletionsUrl(baseUrl: string): string {
const trimmed = baseUrl.replace(/\/+$/, "");
return `${trimmed}/chat/completions`;
}
/**
* Wraps provider credentials into an {@link LlmFn}: single POST to chat/completions,
* returns raw JSON body text or a {@link Result} error. Callers parse assistant messages.
*/
export function createLlmFn(provider: LlmProvider): LlmFn {
return async ({
messages,
tools,
}: {
messages: ChatMessage[];
tools: readonly ToolDefinition[];
}) => {
try {
const response = await fetch(chatCompletionsUrl(provider.baseUrl), {
method: "POST",
headers: {
Authorization: `Bearer ${provider.apiKey}`,
"Content-Type": "application/json",
},
body: JSON.stringify({
model: provider.model,
messages,
tools,
tool_choice: "auto",
}),
});
const responseText = await response.text();
if (!response.ok) {
return err(`http_error:${String(response.status)}:${responseText.slice(0, 4000)}`);
}
return ok(responseText);
} catch (cause) {
const message = cause instanceof Error ? cause.message : String(cause);
return err(`network_error:${message}`);
}
};
}
@@ -0,0 +1,317 @@
import type * as z from "zod/v4";
import { err, ok, type Result } from "@uncaged/workflow-protocol";
import type {
ChatMessage,
StructuredToolSpec,
ThreadReactorConfig,
ThreadReactorFn,
ToolCall,
ToolDefinition,
} from "./types.js";
function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === "object" && value !== null && !Array.isArray(value);
}
function tryParseJsonContent(content: string): unknown | null {
const trimmed = content.trim();
const fenceMatch = /^```(?:json)?\s*([\s\S]*?)```$/m.exec(trimmed);
const payload = fenceMatch !== null ? fenceMatch[1].trim() : trimmed;
try {
return JSON.parse(payload) as unknown;
} catch {
return null;
}
}
function firstAssistantMessage(responseText: string): Result<Record<string, unknown>, string> {
let parsed: unknown;
try {
parsed = JSON.parse(responseText) as unknown;
} catch (cause) {
const message = cause instanceof Error ? cause.message : String(cause);
return err(`invalid_response_json:${message}`);
}
if (!isRecord(parsed)) {
return err("invalid_response_top_level");
}
const choices = parsed.choices;
if (!Array.isArray(choices) || choices.length === 0) {
return err("no_choices_in_response");
}
const firstChoice = choices[0];
if (!isRecord(firstChoice)) {
return err("invalid_choice");
}
const messageObj = firstChoice.message;
if (!isRecord(messageObj)) {
return err("invalid_message");
}
return ok(messageObj);
}
function normalizeToolCalls(toolCallsRaw: unknown[]): Result<ToolCall[], string> {
const toolCalls: ToolCall[] = [];
for (const tc of toolCallsRaw) {
if (!isRecord(tc)) {
return err("invalid_tool_call");
}
const id = tc.id;
const tcType = tc.type;
const fn = tc.function;
if (typeof id !== "string" || tcType !== "function" || !isRecord(fn)) {
return err("invalid_tool_call_shape");
}
const name = fn.name;
const argumentsStr = fn.arguments;
if (typeof name !== "string" || typeof argumentsStr !== "string") {
return err("invalid_tool_call_function");
}
toolCalls.push({ id, type: "function", function: { name, arguments: argumentsStr } });
}
return ok(toolCalls);
}
type AssistantTurn<T> =
| { kind: "plain_json"; value: T }
| { kind: "tool_calls"; calls: ToolCall[]; assistantContent: string | null };
type AssistantTurnOrCorrection<T> =
| AssistantTurn<T>
| { kind: "plain_json_invalid"; rawContent: string; correction: string };
function classifyAssistantTurn<T>(
messageObj: Record<string, unknown>,
schema: z.ZodType<T>,
structuredToolName: string,
): Result<AssistantTurnOrCorrection<T>, string> {
const toolCallsRaw = messageObj.tool_calls;
if (!Array.isArray(toolCallsRaw) || toolCallsRaw.length === 0) {
const content = messageObj.content;
if (typeof content !== "string") {
return err("no_tool_calls_and_no_string_content");
}
const jsonParsed = tryParseJsonContent(content);
if (jsonParsed === null) {
return ok({
kind: "plain_json_invalid",
rawContent: content,
correction: `Your previous reply was not valid JSON and contained no tool calls. Reply with a single JSON object that matches the schema, or call the ${structuredToolName} tool with the structured arguments.`,
});
}
const validated = schema.safeParse(jsonParsed);
if (!validated.success) {
return ok({
kind: "plain_json_invalid",
rawContent: content,
correction: `Your previous JSON reply did not satisfy the schema: ${validated.error.message}. Reply again with a JSON object that matches the schema, or call the ${structuredToolName} tool with the structured arguments.`,
});
}
return ok({ kind: "plain_json", value: validated.data });
}
const callsResult = normalizeToolCalls(toolCallsRaw);
if (!callsResult.ok) {
return err(callsResult.error);
}
const assistantContent = messageObj.content;
return ok({
kind: "tool_calls",
calls: callsResult.value,
assistantContent: typeof assistantContent === "string" ? assistantContent : null,
});
}
function toolNamesFromDefinitions(tools: readonly { function: { name: string } }[]): Set<string> {
return new Set(tools.map((t) => t.function.name));
}
function appendStructuredToolResult<T>(
tc: ToolCall,
schema: z.ZodType<T>,
messages: ChatMessage[],
): T | null {
let parsedArgs: unknown;
try {
parsedArgs = JSON.parse(tc.function.arguments) as unknown;
} catch {
messages.push({
role: "tool",
tool_call_id: tc.id,
content:
"Tool arguments were not valid JSON. Provide valid JSON object arguments matching the schema.",
});
return null;
}
const validated = schema.safeParse(parsedArgs);
if (!validated.success) {
messages.push({
role: "tool",
tool_call_id: tc.id,
content: `Schema validation failed: ${validated.error.message}. Fix the arguments and call the tool again with a JSON object that matches the schema.`,
});
return null;
}
messages.push({
role: "tool",
tool_call_id: tc.id,
content: '{"ok":true}',
});
return validated.data;
}
async function dispatchToolCall<T, TThread>(
tc: ToolCall,
spec: StructuredToolSpec,
knownNames: Set<string>,
schema: z.ZodType<T>,
thread: TThread,
toolHandler: ThreadReactorConfig<TThread>["toolHandler"],
messages: ChatMessage[],
): Promise<T | null> {
if (!knownNames.has(tc.function.name)) {
messages.push({
role: "tool",
tool_call_id: tc.id,
content: `Unknown tool: ${tc.function.name}. Use one of the declared tools only.`,
});
return null;
}
if (tc.function.name === spec.name) {
return appendStructuredToolResult(tc, schema, messages);
}
let toolContent: string;
try {
toolContent = await toolHandler(tc, thread);
} catch (cause) {
const message = cause instanceof Error ? cause.message : String(cause);
toolContent = `Tool execution failed: ${message}`;
}
messages.push({
role: "tool",
tool_call_id: tc.id,
content: toolContent,
});
return null;
}
async function resolveToolCallRound<T, TThread>(
turn: Extract<AssistantTurn<T>, { kind: "tool_calls" }>,
spec: StructuredToolSpec,
knownNames: Set<string>,
schema: z.ZodType<T>,
thread: TThread,
toolHandler: ThreadReactorConfig<TThread>["toolHandler"],
messages: ChatMessage[],
): Promise<Result<T, string> | null> {
messages.push({
role: "assistant",
content: turn.assistantContent,
tool_calls: turn.calls,
});
let extractedRound: T | null = null;
for (const tc of turn.calls) {
const extracted = await dispatchToolCall(
tc,
spec,
knownNames,
schema,
thread,
toolHandler,
messages,
);
if (extracted !== null) {
extractedRound = extracted;
}
}
if (extractedRound !== null) {
return ok(extractedRound);
}
return null;
}
async function runOneReactRound<T, TThread>(
config: ThreadReactorConfig<TThread>,
args: { thread: TThread; schema: z.ZodType<T> },
tools: readonly ToolDefinition[],
knownNames: Set<string>,
spec: StructuredToolSpec,
messages: ChatMessage[],
): Promise<Result<T, string> | null> {
const bodyResult = await config.llm({ messages, tools });
if (!bodyResult.ok) {
return bodyResult;
}
const msgResult = firstAssistantMessage(bodyResult.value);
if (!msgResult.ok) {
return msgResult;
}
const classified = classifyAssistantTurn(msgResult.value, args.schema, spec.name);
if (!classified.ok) {
return classified;
}
const turn = classified.value;
if (turn.kind === "plain_json") {
return ok(turn.value);
}
if (turn.kind === "plain_json_invalid") {
messages.push({ role: "assistant", content: turn.rawContent });
messages.push({ role: "user", content: turn.correction });
return null;
}
return resolveToolCallRound(
turn,
spec,
knownNames,
args.schema,
args.thread,
config.toolHandler,
messages,
);
}
/**
* Generic ReAct loop: LLM round-trips with tools until structured output validates,
* plain JSON matches schema, or {@link ThreadReactorConfig.maxRounds} is exceeded.
*/
export function createThreadReactor<TThread>(
config: ThreadReactorConfig<TThread>,
): ThreadReactorFn<TThread> {
return async <T>(args: {
thread: TThread;
input: string;
schema: z.ZodType<T>;
}): Promise<Result<T, string>> => {
const spec = config.structuredToolFromSchema(args.schema);
const tools = [...config.staticTools, spec.tool];
const knownNames = toolNamesFromDefinitions(tools);
const systemPrompt = config.systemPromptForStructuredTool(spec.name);
const messages: ChatMessage[] = [
{ role: "system", content: systemPrompt },
{ role: "user", content: args.input },
];
for (let round = 0; round < config.maxRounds; round++) {
const step = await runOneReactRound(
config,
{ thread: args.thread, schema: args.schema },
tools,
knownNames,
spec,
messages,
);
if (step !== null) {
return step;
}
}
return err("max_react_rounds_exceeded");
};
}
+62
View File
@@ -0,0 +1,62 @@
import type * as z from "zod/v4";
import type { Result } from "@uncaged/workflow-protocol";
export type ToolCall = {
id: string;
type: "function";
function: { name: string; arguments: string };
};
export type ToolDefinition = {
type: "function";
function: {
name: string;
description: string;
parameters: Record<string, unknown>;
};
};
export type ChatMessage =
| { role: "system"; content: string }
| { role: "user"; content: string }
| {
role: "assistant";
content: string | null;
tool_calls: ToolCall[];
}
| { role: "assistant"; content: string }
| { role: "tool"; tool_call_id: string; content: string };
export type LlmFn = (input: {
messages: ChatMessage[];
tools: readonly ToolDefinition[];
}) => Promise<Result<string, string>>;
/** Structured tool derived from the per-invocation Zod schema (e.g. extract tool). */
export type StructuredToolSpec = {
name: string;
tool: ToolDefinition;
};
export type ThreadReactorConfig<TThread> = {
llm: LlmFn;
/** Static tools (e.g. cas_get); structured tool is appended per invocation. */
staticTools: readonly ToolDefinition[];
/** Builds the schema-shaped tool and its OpenAI name for this invocation. */
structuredToolFromSchema: (schema: z.ZodType<unknown>) => StructuredToolSpec;
/** System prompt for this run; include the structured tool name for cache stability per schema. */
systemPromptForStructuredTool: (structuredToolName: string) => string;
toolHandler: (call: ToolCall, thread: TThread) => Promise<string>;
maxRounds: number;
};
export type ThreadReactorInvokeArgs<TThread, T> = {
thread: TThread;
input: string;
schema: z.ZodType<T>;
};
export type ThreadReactorFn<TThread> = <T>(
args: ThreadReactorInvokeArgs<TThread, T>,
) => Promise<Result<T, string>>;
+11
View File
@@ -0,0 +1,11 @@
{
"extends": "../../tsconfig.json",
"compilerOptions": {
"rootDir": "src",
"outDir": "dist"
},
"include": ["src"],
"references": [
{ "path": "../workflow-protocol" }
]
}