feat: generate LLM summary in __end__ node via ReAct loop

Instead of hardcoding 'completed: moderator returned END', the engine now
calls a summarizer (ReAct loop with cas_get tool) to produce a meaningful
summary of the workflow outcome before writing the __end__ node.

- New summarizer.ts following supervisor.ts pattern
- Uses extract scene LLM provider (falls back to raw completion.summary on failure)
- Tracks step contentHashes for summarizer context
- Schema: z.object({ summary: z.string() })

Refs #187, #188

小橘 <xiaoju@shazhou.work>
This commit is contained in:
2026-05-11 10:11:31 +00:00
parent ec13c19505
commit b93f6e736f
2 changed files with 135 additions and 2 deletions
+23 -2
View File
@@ -26,6 +26,7 @@ import { END, START } from "@uncaged/workflow-runtime";
import { err, type LogFn, ok, type Result } from "@uncaged/workflow-util";
import { createExtract } from "../extract/index.js";
import { createSummarizer, type SummarizeFn } from "./summarizer.js";
import { runSupervisor } from "./supervisor.js";
import {
appendThreadHistoryEntry,
@@ -53,6 +54,7 @@ async function resolveEngineRegistryRuntime(
Result<
{
extract: ReturnType<typeof createExtract>;
summarize: SummarizeFn;
workflowConfig: WorkflowConfig;
},
string
@@ -76,7 +78,11 @@ async function resolveEngineRegistryRuntime(
apiKey: ex.apiKey,
model: ex.model,
};
return ok({ extract: createExtract(llmProvider, { cas }), workflowConfig: cfg });
return ok({
extract: createExtract(llmProvider, { cas }),
summarize: createSummarizer(llmProvider, cas),
workflowConfig: cfg,
});
}
async function appendStateForStep(params: {
@@ -250,6 +256,7 @@ async function driveWorkflowGenerator(params: {
bundleDir: string;
startHash: string;
chain: ChainState;
summarize: SummarizeFn;
}): Promise<WorkflowResult> {
const {
fn,
@@ -262,6 +269,7 @@ async function driveWorkflowGenerator(params: {
cas,
bundleDir,
startHash,
summarize,
} = params;
let chain: ChainState = params.chain;
const gen = fn(thread, runtime);
@@ -270,6 +278,10 @@ async function driveWorkflowGenerator(params: {
role: s.role,
summary: JSON.stringify(s.meta),
}));
const summarizerSteps: { role: string; contentHash: string }[] = thread.steps.map((s) => ({
role: s.role,
contentHash: s.contentHash,
}));
while (true) {
if (executeOptions.signal.aborted) {
@@ -288,13 +300,20 @@ async function driveWorkflowGenerator(params: {
if (iterResult.done) {
logger("F3HN8QKP", `thread ${threadId} generator finished`);
const rawCompletion = iterResult.value;
const llmSummary = await summarize({
prompt: thread.start.content,
recentSteps: summarizerSteps,
fallback: rawCompletion.summary,
logger,
});
return await finalizeThread({
cas,
bundleDir,
threadId,
startHash,
chain,
completion: iterResult.value,
completion: { ...rawCompletion, summary: llmSummary },
});
}
@@ -320,6 +339,7 @@ async function driveWorkflowGenerator(params: {
role: step.role,
summary: JSON.stringify(step.meta),
});
summarizerSteps.push({ role: step.role, contentHash: step.contentHash });
await Promise.race([
executeOptions.awaitAfterEachYield(),
@@ -499,5 +519,6 @@ export async function executeThread(
bundleDir,
startHash,
chain,
summarize: registryRuntime.value.summarize,
});
}
@@ -0,0 +1,112 @@
import type { CasStore } from "@uncaged/workflow-cas";
import { createLlmFn, createThreadReactor } from "@uncaged/workflow-reactor";
import type { LlmProvider } from "@uncaged/workflow-runtime";
import type { LogFn } from "@uncaged/workflow-util";
import * as z from "zod/v4";
import { extractFunctionToolFromZodSchema } from "../extract/index.js";
const SUMMARIZER_MAX_REACT_ROUNDS = 4;
const SUMMARIZER_RECENT_STEP_LIMIT = 20;
const summarySchema = z.object({ summary: z.string() }).meta({
title: "workflow_summary",
description: "A concise summary of the completed workflow's results and outcome.",
});
type SummarizerThreadContext = {
cas: CasStore;
};
const CAS_GET_TOOL_DEFINITION = {
type: "function" as const,
function: {
name: "cas_get",
description:
"Read a Merkle DAG node from content-addressed storage by its hash. Returns YAML-formatted node with type, payload, and refs.",
parameters: {
type: "object",
properties: {
hash: { type: "string", description: "The CAS hash to retrieve" },
},
required: ["hash"],
},
},
};
function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === "object" && value !== null && !Array.isArray(value);
}
function buildSummarizerInput(args: {
prompt: string;
recentSteps: readonly { role: string; contentHash: string }[];
}): string {
const recent = args.recentSteps.slice(-SUMMARIZER_RECENT_STEP_LIMIT);
const stepsBlock = recent
.map((s, i) => `${i + 1}. [${s.role}] contentHash: ${s.contentHash}`)
.join("\n");
return `Original task:\n${args.prompt}\n\nCompleted steps (oldest first):\n${stepsBlock === "" ? "(none)" : stepsBlock}\n\nUse cas_get to read step content if needed. Summarize the workflow outcome concisely.`;
}
export type SummarizeFn = (args: {
prompt: string;
recentSteps: readonly { role: string; contentHash: string }[];
fallback: string;
logger: LogFn;
}) => Promise<string>;
export function createSummarizer(provider: LlmProvider, cas: CasStore): SummarizeFn {
const reactor = createThreadReactor<SummarizerThreadContext>({
llm: createLlmFn(provider),
maxRounds: SUMMARIZER_MAX_REACT_ROUNDS,
staticTools: [CAS_GET_TOOL_DEFINITION],
structuredToolFromSchema: (schema) => {
const t = extractFunctionToolFromZodSchema(schema);
return {
name: t.name,
tool: {
type: "function" as const,
function: {
name: t.name,
description: t.description,
parameters: t.parameters,
},
},
};
},
systemPromptForStructuredTool: (structuredToolName) =>
`You summarize completed workflow threads. You have access to cas_get to read step content by hash. After reviewing the steps, call the ${structuredToolName} tool with a concise summary of the workflow outcome and results. Or reply with only a JSON object such as {"summary":"..."}.`,
toolHandler: async (call, thread) => {
if (call.function.name !== "cas_get") {
return `Unknown tool: ${call.function.name}`;
}
let hash: string;
try {
const ta = JSON.parse(call.function.arguments) as unknown;
if (!isRecord(ta) || typeof ta.hash !== "string") {
return 'cas_get requires {"hash": "<cas-hash>"}.';
}
hash = ta.hash;
} catch {
return "cas_get arguments were not valid JSON.";
}
const blob = await thread.cas.get(hash);
return blob === null ? "null" : blob;
},
});
return async (args) => {
const result = await reactor({
thread: { cas },
input: buildSummarizerInput(args),
schema: summarySchema,
});
if (!result.ok) {
args.logger("P2WX7KNR", `summarizer failed: ${result.error}`);
return args.fallback;
}
args.logger("Q5MT3VBF", "summarizer produced workflow summary");
return result.value.summary;
};
}