refactor: extract shared CAS reactor pattern into cas-reactor.ts

Deduplicate CAS_GET_TOOL_DEFINITION, isRecord, toolHandler, and
structuredToolFromSchema between summarizer.ts and extract-fn.ts.

Both now use createCasReactor(provider, cas, opts) and only provide
their own systemPrompt.

小橘 <xiaoju@shazhou.work>
This commit is contained in:
2026-05-11 10:25:31 +00:00
parent b93f6e736f
commit b509d1715e
3 changed files with 84 additions and 121 deletions
@@ -0,0 +1,79 @@
import type { CasStore } from "@uncaged/workflow-cas";
import type { ThreadReactorFn } from "@uncaged/workflow-reactor";
import { createLlmFn, createThreadReactor } from "@uncaged/workflow-reactor";
import type { LlmProvider } from "@uncaged/workflow-runtime";
import { extractFunctionToolFromZodSchema } from "./extract/index.js";
export type CasReactorThread = {
cas: CasStore;
};
const CAS_GET_TOOL_DEFINITION = {
type: "function" as const,
function: {
name: "cas_get",
description:
"Read a Merkle DAG node from content-addressed storage by its hash. Returns YAML-formatted node with type, payload, and refs or children fields (content nodes use refs).",
parameters: {
type: "object",
properties: {
hash: { type: "string", description: "The CAS hash to retrieve" },
},
required: ["hash"],
},
},
};
function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === "object" && value !== null && !Array.isArray(value);
}
export type CasReactorOpts = {
maxRounds: number;
systemPromptForStructuredTool: (structuredToolName: string) => string;
};
export function createCasReactor(
provider: LlmProvider,
cas: CasStore,
opts: CasReactorOpts,
): ThreadReactorFn<CasReactorThread> {
return createThreadReactor<CasReactorThread>({
llm: createLlmFn(provider),
maxRounds: opts.maxRounds,
staticTools: [CAS_GET_TOOL_DEFINITION],
structuredToolFromSchema: (schema) => {
const t = extractFunctionToolFromZodSchema(schema);
return {
name: t.name,
tool: {
type: "function" as const,
function: {
name: t.name,
description: t.description,
parameters: t.parameters,
},
},
};
},
systemPromptForStructuredTool: opts.systemPromptForStructuredTool,
toolHandler: async (call, _thread) => {
if (call.function.name !== "cas_get") {
return `Unknown tool: ${call.function.name}`;
}
let hash: string;
try {
const ta = JSON.parse(call.function.arguments) as unknown;
if (!isRecord(ta) || typeof ta.hash !== "string") {
return 'cas_get requires {"hash": "<cas-hash>"}.';
}
hash = ta.hash;
} catch {
return "cas_get arguments were not valid JSON.";
}
const blob = await cas.get(hash);
return blob === null ? "null" : blob;
},
});
}
@@ -1,10 +1,9 @@
import type { CasStore } from "@uncaged/workflow-cas";
import { createLlmFn, createThreadReactor } from "@uncaged/workflow-reactor";
import type { LlmProvider } from "@uncaged/workflow-runtime";
import type { LogFn } from "@uncaged/workflow-util";
import * as z from "zod/v4";
import { extractFunctionToolFromZodSchema } from "../extract/index.js";
import { createCasReactor } from "../cas-reactor.js";
const SUMMARIZER_MAX_REACT_ROUNDS = 4;
const SUMMARIZER_RECENT_STEP_LIMIT = 20;
@@ -14,30 +13,6 @@ const summarySchema = z.object({ summary: z.string() }).meta({
description: "A concise summary of the completed workflow's results and outcome.",
});
type SummarizerThreadContext = {
cas: CasStore;
};
const CAS_GET_TOOL_DEFINITION = {
type: "function" as const,
function: {
name: "cas_get",
description:
"Read a Merkle DAG node from content-addressed storage by its hash. Returns YAML-formatted node with type, payload, and refs.",
parameters: {
type: "object",
properties: {
hash: { type: "string", description: "The CAS hash to retrieve" },
},
required: ["hash"],
},
},
};
function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === "object" && value !== null && !Array.isArray(value);
}
function buildSummarizerInput(args: {
prompt: string;
recentSteps: readonly { role: string; contentHash: string }[];
@@ -57,43 +32,10 @@ export type SummarizeFn = (args: {
}) => Promise<string>;
export function createSummarizer(provider: LlmProvider, cas: CasStore): SummarizeFn {
const reactor = createThreadReactor<SummarizerThreadContext>({
llm: createLlmFn(provider),
const reactor = createCasReactor(provider, cas, {
maxRounds: SUMMARIZER_MAX_REACT_ROUNDS,
staticTools: [CAS_GET_TOOL_DEFINITION],
structuredToolFromSchema: (schema) => {
const t = extractFunctionToolFromZodSchema(schema);
return {
name: t.name,
tool: {
type: "function" as const,
function: {
name: t.name,
description: t.description,
parameters: t.parameters,
},
},
};
},
systemPromptForStructuredTool: (structuredToolName) =>
`You summarize completed workflow threads. You have access to cas_get to read step content by hash. After reviewing the steps, call the ${structuredToolName} tool with a concise summary of the workflow outcome and results. Or reply with only a JSON object such as {"summary":"..."}.`,
toolHandler: async (call, thread) => {
if (call.function.name !== "cas_get") {
return `Unknown tool: ${call.function.name}`;
}
let hash: string;
try {
const ta = JSON.parse(call.function.arguments) as unknown;
if (!isRecord(ta) || typeof ta.hash !== "string") {
return 'cas_get requires {"hash": "<cas-hash>"}.';
}
hash = ta.hash;
} catch {
return "cas_get arguments were not valid JSON.";
}
const blob = await thread.cas.get(hash);
return blob === null ? "null" : blob;
},
});
return async (args) => {
@@ -1,8 +1,8 @@
import { type CasStore, getContentMerklePayload } from "@uncaged/workflow-cas";
import { createLlmFn, createThreadReactor } from "@uncaged/workflow-reactor";
import type { ExtractFn, ExtractResult, LlmProvider } from "@uncaged/workflow-runtime";
import type * as z from "zod/v4";
import { extractFunctionToolFromZodSchema } from "./llm-extract.js";
import { createCasReactor } from "../cas-reactor.js";
export type ExtractDeps = {
cas: CasStore;
@@ -10,30 +10,6 @@ export type ExtractDeps = {
const MAX_REACT_ROUNDS = 10;
const CAS_GET_TOOL_DEFINITION = {
type: "function" as const,
function: {
name: "cas_get",
description:
"Read a Merkle DAG node from content-addressed storage by its hash. Returns YAML-formatted node with type, payload, and refs or children fields (content nodes use refs).",
parameters: {
type: "object",
properties: {
hash: { type: "string", description: "The CAS hash to retrieve" },
},
required: ["hash"],
},
},
};
type ExtractThreadContext = {
cas: CasStore;
};
function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === "object" && value !== null && !Array.isArray(value);
}
/**
* Create an ExtractFn backed by an LLM provider.
*
@@ -42,44 +18,10 @@ function isRecord(value: unknown): value is Record<string, unknown> {
* assistant reply as a short-circuit, which covers the legacy "single" extraction path.
*/
export function createExtract(provider: LlmProvider, deps: ExtractDeps): ExtractFn {
const llm = createLlmFn(provider);
const reactor = createThreadReactor<ExtractThreadContext>({
llm,
const reactor = createCasReactor(provider, deps.cas, {
maxRounds: MAX_REACT_ROUNDS,
staticTools: [CAS_GET_TOOL_DEFINITION],
structuredToolFromSchema: (schema) => {
const t = extractFunctionToolFromZodSchema(schema);
return {
name: t.name,
tool: {
type: "function" as const,
function: {
name: t.name,
description: t.description,
parameters: t.parameters,
},
},
};
},
systemPromptForStructuredTool: (structuredToolName) =>
`You extract structured metadata from content. The content is from a CAS node. Use cas_get to read referenced nodes if needed. When ready, call the ${structuredToolName} tool with JSON matching the schema. You may instead reply with only a JSON object (no prose) when no tools are needed.`,
toolHandler: async (call, thread) => {
if (call.function.name !== "cas_get") {
return `Unexpected tool routed to handler: ${call.function.name}`;
}
let hash: string;
try {
const ta = JSON.parse(call.function.arguments) as unknown;
if (!isRecord(ta) || typeof ta.hash !== "string") {
return 'cas_get requires a JSON object with a string "hash" field.';
}
hash = ta.hash;
} catch {
return 'cas_get arguments were not valid JSON. Provide {"hash": "<cas-hash>"}.';
}
const blob = await thread.cas.get(hash);
return blob === null ? "null" : blob;
},
});
return async <T extends Record<string, unknown>>(