From b509d1715e3f0262d3107369edb9aecc4275d872 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B0=8F=E6=A9=98?= Date: Mon, 11 May 2026 10:25:31 +0000 Subject: [PATCH] refactor: extract shared CAS reactor pattern into cas-reactor.ts MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Deduplicate CAS_GET_TOOL_DEFINITION, isRecord, toolHandler, and structuredToolFromSchema between summarizer.ts and extract-fn.ts. Both now use createCasReactor(provider, cas, opts) and only provide their own systemPrompt. 小橘 --- packages/workflow-execute/src/cas-reactor.ts | 79 +++++++++++++++++++ .../workflow-execute/src/engine/summarizer.ts | 62 +-------------- .../src/extract/extract-fn.ts | 64 +-------------- 3 files changed, 84 insertions(+), 121 deletions(-) create mode 100644 packages/workflow-execute/src/cas-reactor.ts diff --git a/packages/workflow-execute/src/cas-reactor.ts b/packages/workflow-execute/src/cas-reactor.ts new file mode 100644 index 0000000..a1a69a3 --- /dev/null +++ b/packages/workflow-execute/src/cas-reactor.ts @@ -0,0 +1,79 @@ +import type { CasStore } from "@uncaged/workflow-cas"; +import type { ThreadReactorFn } from "@uncaged/workflow-reactor"; +import { createLlmFn, createThreadReactor } from "@uncaged/workflow-reactor"; +import type { LlmProvider } from "@uncaged/workflow-runtime"; + +import { extractFunctionToolFromZodSchema } from "./extract/index.js"; + +export type CasReactorThread = { + cas: CasStore; +}; + +const CAS_GET_TOOL_DEFINITION = { + type: "function" as const, + function: { + name: "cas_get", + description: + "Read a Merkle DAG node from content-addressed storage by its hash. Returns YAML-formatted node with type, payload, and refs or children fields (content nodes use refs).", + parameters: { + type: "object", + properties: { + hash: { type: "string", description: "The CAS hash to retrieve" }, + }, + required: ["hash"], + }, + }, +}; + +function isRecord(value: unknown): value is Record { + return typeof value === "object" && value !== null && !Array.isArray(value); +} + +export type CasReactorOpts = { + maxRounds: number; + systemPromptForStructuredTool: (structuredToolName: string) => string; +}; + +export function createCasReactor( + provider: LlmProvider, + cas: CasStore, + opts: CasReactorOpts, +): ThreadReactorFn { + return createThreadReactor({ + llm: createLlmFn(provider), + maxRounds: opts.maxRounds, + staticTools: [CAS_GET_TOOL_DEFINITION], + structuredToolFromSchema: (schema) => { + const t = extractFunctionToolFromZodSchema(schema); + return { + name: t.name, + tool: { + type: "function" as const, + function: { + name: t.name, + description: t.description, + parameters: t.parameters, + }, + }, + }; + }, + systemPromptForStructuredTool: opts.systemPromptForStructuredTool, + toolHandler: async (call, _thread) => { + if (call.function.name !== "cas_get") { + return `Unknown tool: ${call.function.name}`; + } + let hash: string; + try { + const ta = JSON.parse(call.function.arguments) as unknown; + if (!isRecord(ta) || typeof ta.hash !== "string") { + return 'cas_get requires {"hash": ""}.'; + } + hash = ta.hash; + } catch { + return "cas_get arguments were not valid JSON."; + } + const blob = await cas.get(hash); + return blob === null ? "null" : blob; + }, + }); +} diff --git a/packages/workflow-execute/src/engine/summarizer.ts b/packages/workflow-execute/src/engine/summarizer.ts index d5dca16..56503dc 100644 --- a/packages/workflow-execute/src/engine/summarizer.ts +++ b/packages/workflow-execute/src/engine/summarizer.ts @@ -1,10 +1,9 @@ import type { CasStore } from "@uncaged/workflow-cas"; -import { createLlmFn, createThreadReactor } from "@uncaged/workflow-reactor"; import type { LlmProvider } from "@uncaged/workflow-runtime"; import type { LogFn } from "@uncaged/workflow-util"; import * as z from "zod/v4"; -import { extractFunctionToolFromZodSchema } from "../extract/index.js"; +import { createCasReactor } from "../cas-reactor.js"; const SUMMARIZER_MAX_REACT_ROUNDS = 4; const SUMMARIZER_RECENT_STEP_LIMIT = 20; @@ -14,30 +13,6 @@ const summarySchema = z.object({ summary: z.string() }).meta({ description: "A concise summary of the completed workflow's results and outcome.", }); -type SummarizerThreadContext = { - cas: CasStore; -}; - -const CAS_GET_TOOL_DEFINITION = { - type: "function" as const, - function: { - name: "cas_get", - description: - "Read a Merkle DAG node from content-addressed storage by its hash. Returns YAML-formatted node with type, payload, and refs.", - parameters: { - type: "object", - properties: { - hash: { type: "string", description: "The CAS hash to retrieve" }, - }, - required: ["hash"], - }, - }, -}; - -function isRecord(value: unknown): value is Record { - return typeof value === "object" && value !== null && !Array.isArray(value); -} - function buildSummarizerInput(args: { prompt: string; recentSteps: readonly { role: string; contentHash: string }[]; @@ -57,43 +32,10 @@ export type SummarizeFn = (args: { }) => Promise; export function createSummarizer(provider: LlmProvider, cas: CasStore): SummarizeFn { - const reactor = createThreadReactor({ - llm: createLlmFn(provider), + const reactor = createCasReactor(provider, cas, { maxRounds: SUMMARIZER_MAX_REACT_ROUNDS, - staticTools: [CAS_GET_TOOL_DEFINITION], - structuredToolFromSchema: (schema) => { - const t = extractFunctionToolFromZodSchema(schema); - return { - name: t.name, - tool: { - type: "function" as const, - function: { - name: t.name, - description: t.description, - parameters: t.parameters, - }, - }, - }; - }, systemPromptForStructuredTool: (structuredToolName) => `You summarize completed workflow threads. You have access to cas_get to read step content by hash. After reviewing the steps, call the ${structuredToolName} tool with a concise summary of the workflow outcome and results. Or reply with only a JSON object such as {"summary":"..."}.`, - toolHandler: async (call, thread) => { - if (call.function.name !== "cas_get") { - return `Unknown tool: ${call.function.name}`; - } - let hash: string; - try { - const ta = JSON.parse(call.function.arguments) as unknown; - if (!isRecord(ta) || typeof ta.hash !== "string") { - return 'cas_get requires {"hash": ""}.'; - } - hash = ta.hash; - } catch { - return "cas_get arguments were not valid JSON."; - } - const blob = await thread.cas.get(hash); - return blob === null ? "null" : blob; - }, }); return async (args) => { diff --git a/packages/workflow-execute/src/extract/extract-fn.ts b/packages/workflow-execute/src/extract/extract-fn.ts index cbd919f..b577b3b 100644 --- a/packages/workflow-execute/src/extract/extract-fn.ts +++ b/packages/workflow-execute/src/extract/extract-fn.ts @@ -1,8 +1,8 @@ import { type CasStore, getContentMerklePayload } from "@uncaged/workflow-cas"; -import { createLlmFn, createThreadReactor } from "@uncaged/workflow-reactor"; import type { ExtractFn, ExtractResult, LlmProvider } from "@uncaged/workflow-runtime"; import type * as z from "zod/v4"; -import { extractFunctionToolFromZodSchema } from "./llm-extract.js"; + +import { createCasReactor } from "../cas-reactor.js"; export type ExtractDeps = { cas: CasStore; @@ -10,30 +10,6 @@ export type ExtractDeps = { const MAX_REACT_ROUNDS = 10; -const CAS_GET_TOOL_DEFINITION = { - type: "function" as const, - function: { - name: "cas_get", - description: - "Read a Merkle DAG node from content-addressed storage by its hash. Returns YAML-formatted node with type, payload, and refs or children fields (content nodes use refs).", - parameters: { - type: "object", - properties: { - hash: { type: "string", description: "The CAS hash to retrieve" }, - }, - required: ["hash"], - }, - }, -}; - -type ExtractThreadContext = { - cas: CasStore; -}; - -function isRecord(value: unknown): value is Record { - return typeof value === "object" && value !== null && !Array.isArray(value); -} - /** * Create an ExtractFn backed by an LLM provider. * @@ -42,44 +18,10 @@ function isRecord(value: unknown): value is Record { * assistant reply as a short-circuit, which covers the legacy "single" extraction path. */ export function createExtract(provider: LlmProvider, deps: ExtractDeps): ExtractFn { - const llm = createLlmFn(provider); - const reactor = createThreadReactor({ - llm, + const reactor = createCasReactor(provider, deps.cas, { maxRounds: MAX_REACT_ROUNDS, - staticTools: [CAS_GET_TOOL_DEFINITION], - structuredToolFromSchema: (schema) => { - const t = extractFunctionToolFromZodSchema(schema); - return { - name: t.name, - tool: { - type: "function" as const, - function: { - name: t.name, - description: t.description, - parameters: t.parameters, - }, - }, - }; - }, systemPromptForStructuredTool: (structuredToolName) => `You extract structured metadata from content. The content is from a CAS node. Use cas_get to read referenced nodes if needed. When ready, call the ${structuredToolName} tool with JSON matching the schema. You may instead reply with only a JSON object (no prose) when no tools are needed.`, - toolHandler: async (call, thread) => { - if (call.function.name !== "cas_get") { - return `Unexpected tool routed to handler: ${call.function.name}`; - } - let hash: string; - try { - const ta = JSON.parse(call.function.arguments) as unknown; - if (!isRecord(ta) || typeof ta.hash !== "string") { - return 'cas_get requires a JSON object with a string "hash" field.'; - } - hash = ta.hash; - } catch { - return 'cas_get arguments were not valid JSON. Provide {"hash": ""}.'; - } - const blob = await thread.cas.get(hash); - return blob === null ? "null" : blob; - }, }); return async >(