From b93f6e736fb47cb0ea9575e099265d0ade4d306a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B0=8F=E6=A9=98?= Date: Mon, 11 May 2026 10:11:31 +0000 Subject: [PATCH 1/3] feat: generate LLM summary in __end__ node via ReAct loop MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Instead of hardcoding 'completed: moderator returned END', the engine now calls a summarizer (ReAct loop with cas_get tool) to produce a meaningful summary of the workflow outcome before writing the __end__ node. - New summarizer.ts following supervisor.ts pattern - Uses extract scene LLM provider (falls back to raw completion.summary on failure) - Tracks step contentHashes for summarizer context - Schema: z.object({ summary: z.string() }) Refs #187, #188 小橘 --- .../workflow-execute/src/engine/engine.ts | 25 +++- .../workflow-execute/src/engine/summarizer.ts | 112 ++++++++++++++++++ 2 files changed, 135 insertions(+), 2 deletions(-) create mode 100644 packages/workflow-execute/src/engine/summarizer.ts diff --git a/packages/workflow-execute/src/engine/engine.ts b/packages/workflow-execute/src/engine/engine.ts index c2222a9..6198e78 100644 --- a/packages/workflow-execute/src/engine/engine.ts +++ b/packages/workflow-execute/src/engine/engine.ts @@ -26,6 +26,7 @@ import { END, START } from "@uncaged/workflow-runtime"; import { err, type LogFn, ok, type Result } from "@uncaged/workflow-util"; import { createExtract } from "../extract/index.js"; +import { createSummarizer, type SummarizeFn } from "./summarizer.js"; import { runSupervisor } from "./supervisor.js"; import { appendThreadHistoryEntry, @@ -53,6 +54,7 @@ async function resolveEngineRegistryRuntime( Result< { extract: ReturnType; + summarize: SummarizeFn; workflowConfig: WorkflowConfig; }, string @@ -76,7 +78,11 @@ async function resolveEngineRegistryRuntime( apiKey: ex.apiKey, model: ex.model, }; - return ok({ extract: createExtract(llmProvider, { cas }), workflowConfig: cfg }); + return ok({ + extract: createExtract(llmProvider, { cas }), + summarize: createSummarizer(llmProvider, cas), + workflowConfig: cfg, + }); } async function appendStateForStep(params: { @@ -250,6 +256,7 @@ async function driveWorkflowGenerator(params: { bundleDir: string; startHash: string; chain: ChainState; + summarize: SummarizeFn; }): Promise { const { fn, @@ -262,6 +269,7 @@ async function driveWorkflowGenerator(params: { cas, bundleDir, startHash, + summarize, } = params; let chain: ChainState = params.chain; const gen = fn(thread, runtime); @@ -270,6 +278,10 @@ async function driveWorkflowGenerator(params: { role: s.role, summary: JSON.stringify(s.meta), })); + const summarizerSteps: { role: string; contentHash: string }[] = thread.steps.map((s) => ({ + role: s.role, + contentHash: s.contentHash, + })); while (true) { if (executeOptions.signal.aborted) { @@ -288,13 +300,20 @@ async function driveWorkflowGenerator(params: { if (iterResult.done) { logger("F3HN8QKP", `thread ${threadId} generator finished`); + const rawCompletion = iterResult.value; + const llmSummary = await summarize({ + prompt: thread.start.content, + recentSteps: summarizerSteps, + fallback: rawCompletion.summary, + logger, + }); return await finalizeThread({ cas, bundleDir, threadId, startHash, chain, - completion: iterResult.value, + completion: { ...rawCompletion, summary: llmSummary }, }); } @@ -320,6 +339,7 @@ async function driveWorkflowGenerator(params: { role: step.role, summary: JSON.stringify(step.meta), }); + summarizerSteps.push({ role: step.role, contentHash: step.contentHash }); await Promise.race([ executeOptions.awaitAfterEachYield(), @@ -499,5 +519,6 @@ export async function executeThread( bundleDir, startHash, chain, + summarize: registryRuntime.value.summarize, }); } diff --git a/packages/workflow-execute/src/engine/summarizer.ts b/packages/workflow-execute/src/engine/summarizer.ts new file mode 100644 index 0000000..d5dca16 --- /dev/null +++ b/packages/workflow-execute/src/engine/summarizer.ts @@ -0,0 +1,112 @@ +import type { CasStore } from "@uncaged/workflow-cas"; +import { createLlmFn, createThreadReactor } from "@uncaged/workflow-reactor"; +import type { LlmProvider } from "@uncaged/workflow-runtime"; +import type { LogFn } from "@uncaged/workflow-util"; +import * as z from "zod/v4"; + +import { extractFunctionToolFromZodSchema } from "../extract/index.js"; + +const SUMMARIZER_MAX_REACT_ROUNDS = 4; +const SUMMARIZER_RECENT_STEP_LIMIT = 20; + +const summarySchema = z.object({ summary: z.string() }).meta({ + title: "workflow_summary", + description: "A concise summary of the completed workflow's results and outcome.", +}); + +type SummarizerThreadContext = { + cas: CasStore; +}; + +const CAS_GET_TOOL_DEFINITION = { + type: "function" as const, + function: { + name: "cas_get", + description: + "Read a Merkle DAG node from content-addressed storage by its hash. Returns YAML-formatted node with type, payload, and refs.", + parameters: { + type: "object", + properties: { + hash: { type: "string", description: "The CAS hash to retrieve" }, + }, + required: ["hash"], + }, + }, +}; + +function isRecord(value: unknown): value is Record { + return typeof value === "object" && value !== null && !Array.isArray(value); +} + +function buildSummarizerInput(args: { + prompt: string; + recentSteps: readonly { role: string; contentHash: string }[]; +}): string { + const recent = args.recentSteps.slice(-SUMMARIZER_RECENT_STEP_LIMIT); + const stepsBlock = recent + .map((s, i) => `${i + 1}. [${s.role}] contentHash: ${s.contentHash}`) + .join("\n"); + return `Original task:\n${args.prompt}\n\nCompleted steps (oldest first):\n${stepsBlock === "" ? "(none)" : stepsBlock}\n\nUse cas_get to read step content if needed. Summarize the workflow outcome concisely.`; +} + +export type SummarizeFn = (args: { + prompt: string; + recentSteps: readonly { role: string; contentHash: string }[]; + fallback: string; + logger: LogFn; +}) => Promise; + +export function createSummarizer(provider: LlmProvider, cas: CasStore): SummarizeFn { + const reactor = createThreadReactor({ + llm: createLlmFn(provider), + maxRounds: SUMMARIZER_MAX_REACT_ROUNDS, + staticTools: [CAS_GET_TOOL_DEFINITION], + structuredToolFromSchema: (schema) => { + const t = extractFunctionToolFromZodSchema(schema); + return { + name: t.name, + tool: { + type: "function" as const, + function: { + name: t.name, + description: t.description, + parameters: t.parameters, + }, + }, + }; + }, + systemPromptForStructuredTool: (structuredToolName) => + `You summarize completed workflow threads. You have access to cas_get to read step content by hash. After reviewing the steps, call the ${structuredToolName} tool with a concise summary of the workflow outcome and results. Or reply with only a JSON object such as {"summary":"..."}.`, + toolHandler: async (call, thread) => { + if (call.function.name !== "cas_get") { + return `Unknown tool: ${call.function.name}`; + } + let hash: string; + try { + const ta = JSON.parse(call.function.arguments) as unknown; + if (!isRecord(ta) || typeof ta.hash !== "string") { + return 'cas_get requires {"hash": ""}.'; + } + hash = ta.hash; + } catch { + return "cas_get arguments were not valid JSON."; + } + const blob = await thread.cas.get(hash); + return blob === null ? "null" : blob; + }, + }); + + return async (args) => { + const result = await reactor({ + thread: { cas }, + input: buildSummarizerInput(args), + schema: summarySchema, + }); + if (!result.ok) { + args.logger("P2WX7KNR", `summarizer failed: ${result.error}`); + return args.fallback; + } + args.logger("Q5MT3VBF", "summarizer produced workflow summary"); + return result.value.summary; + }; +} From b509d1715e3f0262d3107369edb9aecc4275d872 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B0=8F=E6=A9=98?= Date: Mon, 11 May 2026 10:25:31 +0000 Subject: [PATCH 2/3] refactor: extract shared CAS reactor pattern into cas-reactor.ts MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Deduplicate CAS_GET_TOOL_DEFINITION, isRecord, toolHandler, and structuredToolFromSchema between summarizer.ts and extract-fn.ts. Both now use createCasReactor(provider, cas, opts) and only provide their own systemPrompt. 小橘 --- packages/workflow-execute/src/cas-reactor.ts | 79 +++++++++++++++++++ .../workflow-execute/src/engine/summarizer.ts | 62 +-------------- .../src/extract/extract-fn.ts | 64 +-------------- 3 files changed, 84 insertions(+), 121 deletions(-) create mode 100644 packages/workflow-execute/src/cas-reactor.ts diff --git a/packages/workflow-execute/src/cas-reactor.ts b/packages/workflow-execute/src/cas-reactor.ts new file mode 100644 index 0000000..a1a69a3 --- /dev/null +++ b/packages/workflow-execute/src/cas-reactor.ts @@ -0,0 +1,79 @@ +import type { CasStore } from "@uncaged/workflow-cas"; +import type { ThreadReactorFn } from "@uncaged/workflow-reactor"; +import { createLlmFn, createThreadReactor } from "@uncaged/workflow-reactor"; +import type { LlmProvider } from "@uncaged/workflow-runtime"; + +import { extractFunctionToolFromZodSchema } from "./extract/index.js"; + +export type CasReactorThread = { + cas: CasStore; +}; + +const CAS_GET_TOOL_DEFINITION = { + type: "function" as const, + function: { + name: "cas_get", + description: + "Read a Merkle DAG node from content-addressed storage by its hash. Returns YAML-formatted node with type, payload, and refs or children fields (content nodes use refs).", + parameters: { + type: "object", + properties: { + hash: { type: "string", description: "The CAS hash to retrieve" }, + }, + required: ["hash"], + }, + }, +}; + +function isRecord(value: unknown): value is Record { + return typeof value === "object" && value !== null && !Array.isArray(value); +} + +export type CasReactorOpts = { + maxRounds: number; + systemPromptForStructuredTool: (structuredToolName: string) => string; +}; + +export function createCasReactor( + provider: LlmProvider, + cas: CasStore, + opts: CasReactorOpts, +): ThreadReactorFn { + return createThreadReactor({ + llm: createLlmFn(provider), + maxRounds: opts.maxRounds, + staticTools: [CAS_GET_TOOL_DEFINITION], + structuredToolFromSchema: (schema) => { + const t = extractFunctionToolFromZodSchema(schema); + return { + name: t.name, + tool: { + type: "function" as const, + function: { + name: t.name, + description: t.description, + parameters: t.parameters, + }, + }, + }; + }, + systemPromptForStructuredTool: opts.systemPromptForStructuredTool, + toolHandler: async (call, _thread) => { + if (call.function.name !== "cas_get") { + return `Unknown tool: ${call.function.name}`; + } + let hash: string; + try { + const ta = JSON.parse(call.function.arguments) as unknown; + if (!isRecord(ta) || typeof ta.hash !== "string") { + return 'cas_get requires {"hash": ""}.'; + } + hash = ta.hash; + } catch { + return "cas_get arguments were not valid JSON."; + } + const blob = await cas.get(hash); + return blob === null ? "null" : blob; + }, + }); +} diff --git a/packages/workflow-execute/src/engine/summarizer.ts b/packages/workflow-execute/src/engine/summarizer.ts index d5dca16..56503dc 100644 --- a/packages/workflow-execute/src/engine/summarizer.ts +++ b/packages/workflow-execute/src/engine/summarizer.ts @@ -1,10 +1,9 @@ import type { CasStore } from "@uncaged/workflow-cas"; -import { createLlmFn, createThreadReactor } from "@uncaged/workflow-reactor"; import type { LlmProvider } from "@uncaged/workflow-runtime"; import type { LogFn } from "@uncaged/workflow-util"; import * as z from "zod/v4"; -import { extractFunctionToolFromZodSchema } from "../extract/index.js"; +import { createCasReactor } from "../cas-reactor.js"; const SUMMARIZER_MAX_REACT_ROUNDS = 4; const SUMMARIZER_RECENT_STEP_LIMIT = 20; @@ -14,30 +13,6 @@ const summarySchema = z.object({ summary: z.string() }).meta({ description: "A concise summary of the completed workflow's results and outcome.", }); -type SummarizerThreadContext = { - cas: CasStore; -}; - -const CAS_GET_TOOL_DEFINITION = { - type: "function" as const, - function: { - name: "cas_get", - description: - "Read a Merkle DAG node from content-addressed storage by its hash. Returns YAML-formatted node with type, payload, and refs.", - parameters: { - type: "object", - properties: { - hash: { type: "string", description: "The CAS hash to retrieve" }, - }, - required: ["hash"], - }, - }, -}; - -function isRecord(value: unknown): value is Record { - return typeof value === "object" && value !== null && !Array.isArray(value); -} - function buildSummarizerInput(args: { prompt: string; recentSteps: readonly { role: string; contentHash: string }[]; @@ -57,43 +32,10 @@ export type SummarizeFn = (args: { }) => Promise; export function createSummarizer(provider: LlmProvider, cas: CasStore): SummarizeFn { - const reactor = createThreadReactor({ - llm: createLlmFn(provider), + const reactor = createCasReactor(provider, cas, { maxRounds: SUMMARIZER_MAX_REACT_ROUNDS, - staticTools: [CAS_GET_TOOL_DEFINITION], - structuredToolFromSchema: (schema) => { - const t = extractFunctionToolFromZodSchema(schema); - return { - name: t.name, - tool: { - type: "function" as const, - function: { - name: t.name, - description: t.description, - parameters: t.parameters, - }, - }, - }; - }, systemPromptForStructuredTool: (structuredToolName) => `You summarize completed workflow threads. You have access to cas_get to read step content by hash. After reviewing the steps, call the ${structuredToolName} tool with a concise summary of the workflow outcome and results. Or reply with only a JSON object such as {"summary":"..."}.`, - toolHandler: async (call, thread) => { - if (call.function.name !== "cas_get") { - return `Unknown tool: ${call.function.name}`; - } - let hash: string; - try { - const ta = JSON.parse(call.function.arguments) as unknown; - if (!isRecord(ta) || typeof ta.hash !== "string") { - return 'cas_get requires {"hash": ""}.'; - } - hash = ta.hash; - } catch { - return "cas_get arguments were not valid JSON."; - } - const blob = await thread.cas.get(hash); - return blob === null ? "null" : blob; - }, }); return async (args) => { diff --git a/packages/workflow-execute/src/extract/extract-fn.ts b/packages/workflow-execute/src/extract/extract-fn.ts index cbd919f..b577b3b 100644 --- a/packages/workflow-execute/src/extract/extract-fn.ts +++ b/packages/workflow-execute/src/extract/extract-fn.ts @@ -1,8 +1,8 @@ import { type CasStore, getContentMerklePayload } from "@uncaged/workflow-cas"; -import { createLlmFn, createThreadReactor } from "@uncaged/workflow-reactor"; import type { ExtractFn, ExtractResult, LlmProvider } from "@uncaged/workflow-runtime"; import type * as z from "zod/v4"; -import { extractFunctionToolFromZodSchema } from "./llm-extract.js"; + +import { createCasReactor } from "../cas-reactor.js"; export type ExtractDeps = { cas: CasStore; @@ -10,30 +10,6 @@ export type ExtractDeps = { const MAX_REACT_ROUNDS = 10; -const CAS_GET_TOOL_DEFINITION = { - type: "function" as const, - function: { - name: "cas_get", - description: - "Read a Merkle DAG node from content-addressed storage by its hash. Returns YAML-formatted node with type, payload, and refs or children fields (content nodes use refs).", - parameters: { - type: "object", - properties: { - hash: { type: "string", description: "The CAS hash to retrieve" }, - }, - required: ["hash"], - }, - }, -}; - -type ExtractThreadContext = { - cas: CasStore; -}; - -function isRecord(value: unknown): value is Record { - return typeof value === "object" && value !== null && !Array.isArray(value); -} - /** * Create an ExtractFn backed by an LLM provider. * @@ -42,44 +18,10 @@ function isRecord(value: unknown): value is Record { * assistant reply as a short-circuit, which covers the legacy "single" extraction path. */ export function createExtract(provider: LlmProvider, deps: ExtractDeps): ExtractFn { - const llm = createLlmFn(provider); - const reactor = createThreadReactor({ - llm, + const reactor = createCasReactor(provider, deps.cas, { maxRounds: MAX_REACT_ROUNDS, - staticTools: [CAS_GET_TOOL_DEFINITION], - structuredToolFromSchema: (schema) => { - const t = extractFunctionToolFromZodSchema(schema); - return { - name: t.name, - tool: { - type: "function" as const, - function: { - name: t.name, - description: t.description, - parameters: t.parameters, - }, - }, - }; - }, systemPromptForStructuredTool: (structuredToolName) => `You extract structured metadata from content. The content is from a CAS node. Use cas_get to read referenced nodes if needed. When ready, call the ${structuredToolName} tool with JSON matching the schema. You may instead reply with only a JSON object (no prose) when no tools are needed.`, - toolHandler: async (call, thread) => { - if (call.function.name !== "cas_get") { - return `Unexpected tool routed to handler: ${call.function.name}`; - } - let hash: string; - try { - const ta = JSON.parse(call.function.arguments) as unknown; - if (!isRecord(ta) || typeof ta.hash !== "string") { - return 'cas_get requires a JSON object with a string "hash" field.'; - } - hash = ta.hash; - } catch { - return 'cas_get arguments were not valid JSON. Provide {"hash": ""}.'; - } - const blob = await thread.cas.get(hash); - return blob === null ? "null" : blob; - }, }); return async >( From ff3e19fd22c641c32780bcd6a0ee681c6634af26 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B0=8F=E6=A9=98?= Date: Mon, 11 May 2026 10:28:54 +0000 Subject: [PATCH 3/3] docs: add comments explaining summarizer constants MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 小橘 --- packages/workflow-execute/src/engine/summarizer.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/packages/workflow-execute/src/engine/summarizer.ts b/packages/workflow-execute/src/engine/summarizer.ts index 56503dc..9b4a32c 100644 --- a/packages/workflow-execute/src/engine/summarizer.ts +++ b/packages/workflow-execute/src/engine/summarizer.ts @@ -5,7 +5,9 @@ import * as z from "zod/v4"; import { createCasReactor } from "../cas-reactor.js"; +/** Max ReAct rounds: 3 cas_get reads + 1 structured output = 4 rounds is sufficient. */ const SUMMARIZER_MAX_REACT_ROUNDS = 4; +/** Only pass the last N steps; each step is just a role+contentHash reference (~60 chars), not full content. */ const SUMMARIZER_RECENT_STEP_LIMIT = 20; const summarySchema = z.object({ summary: z.string() }).meta({