From b93f6e736fb47cb0ea9575e099265d0ade4d306a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B0=8F=E6=A9=98?= Date: Mon, 11 May 2026 10:11:31 +0000 Subject: [PATCH] feat: generate LLM summary in __end__ node via ReAct loop MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Instead of hardcoding 'completed: moderator returned END', the engine now calls a summarizer (ReAct loop with cas_get tool) to produce a meaningful summary of the workflow outcome before writing the __end__ node. - New summarizer.ts following supervisor.ts pattern - Uses extract scene LLM provider (falls back to raw completion.summary on failure) - Tracks step contentHashes for summarizer context - Schema: z.object({ summary: z.string() }) Refs #187, #188 小橘 --- .../workflow-execute/src/engine/engine.ts | 25 +++- .../workflow-execute/src/engine/summarizer.ts | 112 ++++++++++++++++++ 2 files changed, 135 insertions(+), 2 deletions(-) create mode 100644 packages/workflow-execute/src/engine/summarizer.ts diff --git a/packages/workflow-execute/src/engine/engine.ts b/packages/workflow-execute/src/engine/engine.ts index c2222a9..6198e78 100644 --- a/packages/workflow-execute/src/engine/engine.ts +++ b/packages/workflow-execute/src/engine/engine.ts @@ -26,6 +26,7 @@ import { END, START } from "@uncaged/workflow-runtime"; import { err, type LogFn, ok, type Result } from "@uncaged/workflow-util"; import { createExtract } from "../extract/index.js"; +import { createSummarizer, type SummarizeFn } from "./summarizer.js"; import { runSupervisor } from "./supervisor.js"; import { appendThreadHistoryEntry, @@ -53,6 +54,7 @@ async function resolveEngineRegistryRuntime( Result< { extract: ReturnType; + summarize: SummarizeFn; workflowConfig: WorkflowConfig; }, string @@ -76,7 +78,11 @@ async function resolveEngineRegistryRuntime( apiKey: ex.apiKey, model: ex.model, }; - return ok({ extract: createExtract(llmProvider, { cas }), workflowConfig: cfg }); + return ok({ + extract: createExtract(llmProvider, { cas }), + summarize: createSummarizer(llmProvider, cas), + workflowConfig: cfg, + }); } async function appendStateForStep(params: { @@ -250,6 +256,7 @@ async function driveWorkflowGenerator(params: { bundleDir: string; startHash: string; chain: ChainState; + summarize: SummarizeFn; }): Promise { const { fn, @@ -262,6 +269,7 @@ async function driveWorkflowGenerator(params: { cas, bundleDir, startHash, + summarize, } = params; let chain: ChainState = params.chain; const gen = fn(thread, runtime); @@ -270,6 +278,10 @@ async function driveWorkflowGenerator(params: { role: s.role, summary: JSON.stringify(s.meta), })); + const summarizerSteps: { role: string; contentHash: string }[] = thread.steps.map((s) => ({ + role: s.role, + contentHash: s.contentHash, + })); while (true) { if (executeOptions.signal.aborted) { @@ -288,13 +300,20 @@ async function driveWorkflowGenerator(params: { if (iterResult.done) { logger("F3HN8QKP", `thread ${threadId} generator finished`); + const rawCompletion = iterResult.value; + const llmSummary = await summarize({ + prompt: thread.start.content, + recentSteps: summarizerSteps, + fallback: rawCompletion.summary, + logger, + }); return await finalizeThread({ cas, bundleDir, threadId, startHash, chain, - completion: iterResult.value, + completion: { ...rawCompletion, summary: llmSummary }, }); } @@ -320,6 +339,7 @@ async function driveWorkflowGenerator(params: { role: step.role, summary: JSON.stringify(step.meta), }); + summarizerSteps.push({ role: step.role, contentHash: step.contentHash }); await Promise.race([ executeOptions.awaitAfterEachYield(), @@ -499,5 +519,6 @@ export async function executeThread( bundleDir, startHash, chain, + summarize: registryRuntime.value.summarize, }); } diff --git a/packages/workflow-execute/src/engine/summarizer.ts b/packages/workflow-execute/src/engine/summarizer.ts new file mode 100644 index 0000000..d5dca16 --- /dev/null +++ b/packages/workflow-execute/src/engine/summarizer.ts @@ -0,0 +1,112 @@ +import type { CasStore } from "@uncaged/workflow-cas"; +import { createLlmFn, createThreadReactor } from "@uncaged/workflow-reactor"; +import type { LlmProvider } from "@uncaged/workflow-runtime"; +import type { LogFn } from "@uncaged/workflow-util"; +import * as z from "zod/v4"; + +import { extractFunctionToolFromZodSchema } from "../extract/index.js"; + +const SUMMARIZER_MAX_REACT_ROUNDS = 4; +const SUMMARIZER_RECENT_STEP_LIMIT = 20; + +const summarySchema = z.object({ summary: z.string() }).meta({ + title: "workflow_summary", + description: "A concise summary of the completed workflow's results and outcome.", +}); + +type SummarizerThreadContext = { + cas: CasStore; +}; + +const CAS_GET_TOOL_DEFINITION = { + type: "function" as const, + function: { + name: "cas_get", + description: + "Read a Merkle DAG node from content-addressed storage by its hash. Returns YAML-formatted node with type, payload, and refs.", + parameters: { + type: "object", + properties: { + hash: { type: "string", description: "The CAS hash to retrieve" }, + }, + required: ["hash"], + }, + }, +}; + +function isRecord(value: unknown): value is Record { + return typeof value === "object" && value !== null && !Array.isArray(value); +} + +function buildSummarizerInput(args: { + prompt: string; + recentSteps: readonly { role: string; contentHash: string }[]; +}): string { + const recent = args.recentSteps.slice(-SUMMARIZER_RECENT_STEP_LIMIT); + const stepsBlock = recent + .map((s, i) => `${i + 1}. [${s.role}] contentHash: ${s.contentHash}`) + .join("\n"); + return `Original task:\n${args.prompt}\n\nCompleted steps (oldest first):\n${stepsBlock === "" ? "(none)" : stepsBlock}\n\nUse cas_get to read step content if needed. Summarize the workflow outcome concisely.`; +} + +export type SummarizeFn = (args: { + prompt: string; + recentSteps: readonly { role: string; contentHash: string }[]; + fallback: string; + logger: LogFn; +}) => Promise; + +export function createSummarizer(provider: LlmProvider, cas: CasStore): SummarizeFn { + const reactor = createThreadReactor({ + llm: createLlmFn(provider), + maxRounds: SUMMARIZER_MAX_REACT_ROUNDS, + staticTools: [CAS_GET_TOOL_DEFINITION], + structuredToolFromSchema: (schema) => { + const t = extractFunctionToolFromZodSchema(schema); + return { + name: t.name, + tool: { + type: "function" as const, + function: { + name: t.name, + description: t.description, + parameters: t.parameters, + }, + }, + }; + }, + systemPromptForStructuredTool: (structuredToolName) => + `You summarize completed workflow threads. You have access to cas_get to read step content by hash. After reviewing the steps, call the ${structuredToolName} tool with a concise summary of the workflow outcome and results. Or reply with only a JSON object such as {"summary":"..."}.`, + toolHandler: async (call, thread) => { + if (call.function.name !== "cas_get") { + return `Unknown tool: ${call.function.name}`; + } + let hash: string; + try { + const ta = JSON.parse(call.function.arguments) as unknown; + if (!isRecord(ta) || typeof ta.hash !== "string") { + return 'cas_get requires {"hash": ""}.'; + } + hash = ta.hash; + } catch { + return "cas_get arguments were not valid JSON."; + } + const blob = await thread.cas.get(hash); + return blob === null ? "null" : blob; + }, + }); + + return async (args) => { + const result = await reactor({ + thread: { cas }, + input: buildSummarizerInput(args), + schema: summarySchema, + }); + if (!result.ok) { + args.logger("P2WX7KNR", `summarizer failed: ${result.error}`); + return args.fallback; + } + args.logger("Q5MT3VBF", "summarizer produced workflow summary"); + return result.value.summary; + }; +}