diff --git a/packages/workflow-cas/package.json b/packages/workflow-cas/package.json new file mode 100644 index 0000000..f1ef5fa --- /dev/null +++ b/packages/workflow-cas/package.json @@ -0,0 +1,20 @@ +{ + "name": "@uncaged/workflow-cas", + "version": "0.1.0", + "type": "module", + "exports": { + ".": { + "types": "./src/index.ts", + "default": "./src/index.ts" + } + }, + "dependencies": { + "@uncaged/workflow-protocol": "workspace:*", + "@uncaged/workflow-util": "workspace:*", + "xxhashjs": "^0.2.2", + "yaml": "^2.7.1" + }, + "devDependencies": { + "@types/bun": "latest" + } +} diff --git a/packages/workflow-cas/src/cas.ts b/packages/workflow-cas/src/cas.ts new file mode 100644 index 0000000..b6a1cce --- /dev/null +++ b/packages/workflow-cas/src/cas.ts @@ -0,0 +1,76 @@ +import { mkdir, readdir, readFile, rename, unlink, writeFile } from "node:fs/promises"; +import { join } from "node:path"; + +import { hashString } from "./hash.js"; +import { createContentMerkleNode, parseMerkleNode, serializeMerkleNode } from "./merkle.js"; +import type { CasStore } from "./types.js"; + +/** Raw strings become content merkle YAML; already-valid merkle documents pass through. 
/**
 * Creates a filesystem-backed content-addressed store rooted at `casDir`.
 *
 * Each blob is stored as `<casDir>/<hash>.txt`, where `<hash>` is `hashString`
 * applied to the text that is actually written (the output of
 * `normalizeCasPutContent`, not necessarily the caller's raw input).
 *
 * NOTE(review): `hash` arguments are joined into a path unvalidated; confirm
 * callers never pass hashes that originate from untrusted input.
 *
 * @param casDir Directory holding the blobs; created on first `put`.
 * @returns A store exposing `put`, `get`, `delete` and `list`.
 */
export function createCasStore(casDir: string): CasStore {
  async function ensureDir(): Promise<void> {
    await mkdir(casDir, { recursive: true });
  }

  function filePath(hash: string): string {
    return join(casDir, `${hash}.txt`);
  }

  /** True when `e` is a filesystem error whose `code` is `ENOENT`. */
  function isNotFound(e: unknown): boolean {
    return typeof e === "object" && e !== null && "code" in e && e.code === "ENOENT";
  }

  return {
    /**
     * Normalizes `content`, writes it under its hash and returns that hash.
     * The blob is written to a temporary sibling first and then renamed into
     * place, so readers never observe a half-written `<hash>.txt`.
     */
    async put(content: string): Promise<string> {
      const toStore = normalizeCasPutContent(content);
      const hash = hashString(toStore);
      await ensureDir();
      const target = filePath(hash);
      // pid + timestamp + random suffix: two puts of the same content in the
      // same millisecond must not share a temp path, otherwise the second
      // rename fails once the first has moved the file away.
      const unique = `${process.pid}.${Date.now()}.${Math.random().toString(36).slice(2)}`;
      const tmp = `${target}.tmp.${unique}`;
      try {
        await writeFile(tmp, toStore, "utf8");
        await rename(tmp, target);
      } catch (e) {
        // Best-effort cleanup so failed puts do not leave temp files behind;
        // the original failure is what the caller needs to see.
        await unlink(tmp).catch(() => undefined);
        throw e;
      }
      return hash;
    },

    /** Returns the stored text for `hash`, or `null` when no such blob exists. */
    async get(hash: string): Promise<string | null> {
      try {
        return await readFile(filePath(hash), "utf8");
      } catch (e) {
        if (isNotFound(e)) {
          return null;
        }
        throw e;
      }
    },

    /** Removes the blob for `hash`; a missing blob is not an error. */
    async delete(hash: string): Promise<void> {
      try {
        await unlink(filePath(hash));
      } catch (e) {
        if (isNotFound(e)) {
          return;
        }
        throw e;
      }
    },

    /**
     * Lists the hashes of all stored blobs (file names ending in `.txt`, with
     * the extension removed). A missing store directory yields an empty list.
     */
    async list(): Promise<string[]> {
      try {
        const entries = await readdir(casDir);
        return entries.filter((name) => name.endsWith(".txt")).map((name) => name.slice(0, -4));
      } catch (e) {
        if (isNotFound(e)) {
          return [];
        }
        throw e;
      }
    },
  };
}
/** True for non-null, non-array objects (the only shape accepted as a YAML mapping here). */
function isPlainRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === "object" && value !== null && !Array.isArray(value);
}

/**
 * Parses the YAML text of a Merkle node and validates its shape.
 *
 * Accepted documents are mappings with:
 * - `type`: one of `"content"`, `"step"`, `"thread"`;
 * - `payload`: a string or a mapping (arrays are rejected);
 * - `children`: an array whose items are all strings.
 *
 * @param yamlText YAML document to parse.
 * @returns The validated node.
 * @throws Error with a `merkle: …` message when the document does not have
 *   the shape above. Errors raised by the YAML parser itself propagate unchanged.
 */
export function parseMerkleNode(yamlText: string): MerkleNode {
  const raw: unknown = parse(yamlText);
  // Arrays are objects at runtime; exclude them so a YAML sequence at the root
  // is reported as a bad root rather than as a missing `type`.
  if (!isPlainRecord(raw)) {
    throw new Error("merkle: YAML root must be an object");
  }
  const { type, payload, children } = raw;
  if (type !== "content" && type !== "step" && type !== "thread") {
    throw new Error("merkle: invalid or missing type");
  }
  // A sequence payload would otherwise be returned typed as a record.
  if (typeof payload !== "string" && !isPlainRecord(payload)) {
    throw new Error("merkle: payload must be a string or object");
  }
  if (!Array.isArray(children)) {
    throw new Error("merkle: children must be an array");
  }
  const childHashes: string[] = [];
  for (const child of children as unknown[]) {
    if (typeof child !== "string") {
      throw new Error("merkle: child hash must be a string");
    }
    childHashes.push(child);
  }
  return { type, payload, children: childHashes };
}
/**
 * Loads the blob stored under `hash` and returns the payload of a `content`
 * Merkle node.
 *
 * @param store CAS store to read from.
 * @param hash Hash of the blob to load.
 * @returns The payload string, or `null` when the blob does not exist or the
 *   node is not a `content` node with a string payload.
 * @throws Whatever `parseMerkleNode` throws when the blob is not a valid
 *   Merkle node document.
 */
export async function getContentMerklePayload(
  store: CasStore,
  hash: string,
): Promise<string | null> {
  const yamlText = await store.get(hash);
  if (yamlText === null) {
    return null;
  }
  const node = parseMerkleNode(yamlText);
  const isStringContent = node.type === "content" && typeof node.payload === "string";
  return isStringContent ? (node.payload as string) : null;
}
"./src/index.ts" + } + }, + "dependencies": { + "@uncaged/workflow-protocol": "workspace:*" + }, + "peerDependencies": { + "zod": "^4.0.0" + }, + "devDependencies": { + "zod": "^4.0.0", + "typescript": "^5.8.3" + } +} diff --git a/packages/workflow-reactor/src/index.ts b/packages/workflow-reactor/src/index.ts new file mode 100644 index 0000000..80f2ab2 --- /dev/null +++ b/packages/workflow-reactor/src/index.ts @@ -0,0 +1,12 @@ +export { createLlmFn } from "./llm-fn.js"; +export { createThreadReactor } from "./thread-reactor.js"; +export type { + ChatMessage, + LlmFn, + StructuredToolSpec, + ThreadReactorConfig, + ThreadReactorFn, + ThreadReactorInvokeArgs, + ToolCall, + ToolDefinition, +} from "./types.js"; diff --git a/packages/workflow-reactor/src/llm-fn.ts b/packages/workflow-reactor/src/llm-fn.ts new file mode 100644 index 0000000..26f406d --- /dev/null +++ b/packages/workflow-reactor/src/llm-fn.ts @@ -0,0 +1,48 @@ +import type { LlmProvider } from "@uncaged/workflow-protocol"; + +import { err, ok } from "@uncaged/workflow-protocol"; + +import type { ChatMessage, LlmFn, ToolDefinition } from "./types.js"; + +function chatCompletionsUrl(baseUrl: string): string { + const trimmed = baseUrl.replace(/\/+$/, ""); + return `${trimmed}/chat/completions`; +} + +/** + * Wraps provider credentials into an {@link LlmFn}: single POST to chat/completions, + * returns raw JSON body text or a {@link Result} error. Callers parse assistant messages. 
/**
 * Best-effort JSON extraction from an assistant reply.
 *
 * If the trimmed text contains a Markdown code fence that opens at the start
 * of a line and closes at the end of a line, the fence body is parsed.
 * An optional `json` language tag is accepted in any letter case.
 * Otherwise the whole trimmed text is parsed.
 *
 * @param content Raw assistant message content.
 * @returns The parsed value, or `null` when the text is not valid JSON. A
 *   literal JSON `null` is therefore indistinguishable from a parse failure.
 */
function tryParseJsonContent(content: string): unknown | null {
  const trimmed = content.trim();
  // `m`: fence may sit between prose lines; `i`: tolerate "```JSON".
  const fenceMatch = /^```(?:json)?\s*([\s\S]*?)```$/im.exec(trimmed);
  // Read the capture group defensively instead of assuming index 1 exists.
  const fenced = fenceMatch?.[1];
  const payload = fenced !== undefined ? fenced.trim() : trimmed;
  try {
    return JSON.parse(payload) as unknown;
  } catch {
    return null;
  }
}
/**
 * Validates the raw `tool_calls` array of an assistant message and converts it
 * to typed {@link ToolCall} values.
 *
 * Each entry must be an object with a string `id`, `type === "function"`, and
 * a `function` object carrying string `name` and string `arguments`.
 *
 * @param toolCallsRaw Untrusted entries taken from the response JSON.
 * @returns `ok` with the calls in their original order, or `err` with
 *   `invalid_tool_call`, `invalid_tool_call_shape` or
 *   `invalid_tool_call_function` for the first malformed entry.
 */
function normalizeToolCalls(toolCallsRaw: unknown[]): Result<ToolCall[], string> {
  const toolCalls: ToolCall[] = [];
  for (const rawCall of toolCallsRaw) {
    if (!isRecord(rawCall)) {
      return err("invalid_tool_call");
    }
    const { id, type: callType, function: fn } = rawCall;
    if (typeof id !== "string" || callType !== "function" || !isRecord(fn)) {
      return err("invalid_tool_call_shape");
    }
    const { name, arguments: argumentsStr } = fn;
    if (typeof name !== "string" || typeof argumentsStr !== "string") {
      return err("invalid_tool_call_function");
    }
    // Rebuild the object so only the validated fields are carried forward.
    toolCalls.push({ id, type: "function", function: { name, arguments: argumentsStr } });
  }
  return ok(toolCalls);
}
/**
 * Collects the function names of the given tool definitions.
 *
 * @param tools Tool definitions; only `function.name` is read.
 * @returns A set of the distinct names (duplicates collapse).
 */
function toolNamesFromDefinitions(
  tools: readonly { function: { name: string } }[],
): Set<string> {
  return new Set(tools.map((tool) => tool.function.name));
}
/**
 * Handles one tool call from the assistant and appends exactly one `tool`
 * message answering it to `messages`.
 *
 * - Unknown tool name: an "Unknown tool" message is appended.
 * - Structured tool (`spec.name`): delegated to `appendStructuredToolResult`.
 * - Any other declared tool: `toolHandler` is invoked; if it throws, the
 *   failure text is sent back to the model instead of propagating.
 *
 * @param tc Tool call to handle.
 * @param spec Structured tool for this invocation.
 * @param knownNames Names of every declared tool, including the structured one.
 * @param schema Schema handed to `appendStructuredToolResult` for structured calls.
 * @param thread Caller context passed through to `toolHandler`.
 * @param toolHandler Executes non-structured tools and returns the tool output text.
 * @param messages Conversation so far; mutated in place.
 * @returns Whatever `appendStructuredToolResult` returns for a structured call,
 *   otherwise `null`.
 */
async function dispatchToolCall<T, TThread>(
  tc: ToolCall,
  spec: StructuredToolSpec,
  knownNames: Set<string>,
  schema: z.ZodType<T>,
  thread: TThread,
  toolHandler: ThreadReactorConfig<TThread>["toolHandler"],
  messages: ChatMessage[],
): Promise<T | null> {
  const toolName = tc.function.name;
  if (!knownNames.has(toolName)) {
    messages.push({
      role: "tool",
      tool_call_id: tc.id,
      content: `Unknown tool: ${toolName}. Use one of the declared tools only.`,
    });
    return null;
  }
  if (toolName === spec.name) {
    return appendStructuredToolResult(tc, schema, messages);
  }
  let toolContent: string;
  try {
    toolContent = await toolHandler(tc, thread);
  } catch (cause) {
    // Report the failure to the model so it can react, rather than aborting the loop.
    const message = cause instanceof Error ? cause.message : String(cause);
    toolContent = `Tool execution failed: ${message}`;
  }
  messages.push({
    role: "tool",
    tool_call_id: tc.id,
    content: toolContent,
  });
  return null;
}
Set, + spec: StructuredToolSpec, + messages: ChatMessage[], +): Promise | null> { + const bodyResult = await config.llm({ messages, tools }); + if (!bodyResult.ok) { + return bodyResult; + } + + const msgResult = firstAssistantMessage(bodyResult.value); + if (!msgResult.ok) { + return msgResult; + } + + const classified = classifyAssistantTurn(msgResult.value, args.schema, spec.name); + if (!classified.ok) { + return classified; + } + + const turn = classified.value; + if (turn.kind === "plain_json") { + return ok(turn.value); + } + + if (turn.kind === "plain_json_invalid") { + messages.push({ role: "assistant", content: turn.rawContent }); + messages.push({ role: "user", content: turn.correction }); + return null; + } + + return resolveToolCallRound( + turn, + spec, + knownNames, + args.schema, + args.thread, + config.toolHandler, + messages, + ); +} + +/** + * Generic ReAct loop: LLM round-trips with tools until structured output validates, + * plain JSON matches schema, or {@link ThreadReactorConfig.maxRounds} is exceeded. 
/**
 * Builds a ReAct-style invoke function from `config`.
 *
 * Each invocation derives the structured tool from `args.schema`, starts a
 * fresh conversation (system prompt + `args.input`), and runs up to
 * `config.maxRounds` rounds of `runOneReactRound`. The first round that yields
 * a non-null result — success or error — ends the invocation.
 *
 * @param config LLM function, static tools, tool handler and round limit.
 * @returns An async function resolving to the round's result, or to
 *   `err("max_react_rounds_exceeded")` when no round produced one.
 */
export function createThreadReactor<TThread>(
  config: ThreadReactorConfig<TThread>,
): ThreadReactorFn<TThread> {
  return async <T>(args: {
    thread: TThread;
    input: string;
    schema: z.ZodType<T>;
  }): Promise<Result<T, string>> => {
    const spec = config.structuredToolFromSchema(args.schema);
    // The structured tool is appended after the static ones for every invocation.
    const tools = [...config.staticTools, spec.tool];
    const knownNames = toolNamesFromDefinitions(tools);
    const systemPrompt = config.systemPromptForStructuredTool(spec.name);

    // Rounds append to this array, so later rounds see the earlier exchanges.
    const messages: ChatMessage[] = [
      { role: "system", content: systemPrompt },
      { role: "user", content: args.input },
    ];

    for (let round = 0; round < config.maxRounds; round++) {
      const step = await runOneReactRound(
        config,
        { thread: args.thread, schema: args.schema },
        tools,
        knownNames,
        spec,
        messages,
      );
      if (step !== null) {
        return step;
      }
    }

    return err("max_react_rounds_exceeded");
  };
}
*/ +export type StructuredToolSpec = { + name: string; + tool: ToolDefinition; +}; + +export type ThreadReactorConfig = { + llm: LlmFn; + /** Static tools (e.g. cas_get); structured tool is appended per invocation. */ + staticTools: readonly ToolDefinition[]; + /** Builds the schema-shaped tool and its OpenAI name for this invocation. */ + structuredToolFromSchema: (schema: z.ZodType) => StructuredToolSpec; + /** System prompt for this run; include the structured tool name for cache stability per schema. */ + systemPromptForStructuredTool: (structuredToolName: string) => string; + toolHandler: (call: ToolCall, thread: TThread) => Promise; + maxRounds: number; +}; + +export type ThreadReactorInvokeArgs = { + thread: TThread; + input: string; + schema: z.ZodType; +}; + +export type ThreadReactorFn = ( + args: ThreadReactorInvokeArgs, +) => Promise>; diff --git a/packages/workflow-reactor/tsconfig.json b/packages/workflow-reactor/tsconfig.json new file mode 100644 index 0000000..778d958 --- /dev/null +++ b/packages/workflow-reactor/tsconfig.json @@ -0,0 +1,11 @@ +{ + "extends": "../../tsconfig.json", + "compilerOptions": { + "rootDir": "src", + "outDir": "dist" + }, + "include": ["src"], + "references": [ + { "path": "../workflow-protocol" } + ] +}