Merge pull request 'feat: generate LLM summary in __end__ node via ReAct loop' (#190) from feat/187-end-node-llm-summary into main

This commit was merged in pull request #190.
This commit is contained in:
2026-05-11 10:31:24 +00:00
4 changed files with 161 additions and 63 deletions
@@ -0,0 +1,79 @@
import type { CasStore } from "@uncaged/workflow-cas";
import type { ThreadReactorFn } from "@uncaged/workflow-reactor";
import { createLlmFn, createThreadReactor } from "@uncaged/workflow-reactor";
import type { LlmProvider } from "@uncaged/workflow-runtime";
import { extractFunctionToolFromZodSchema } from "./extract/index.js";
/**
 * Per-invocation thread context for reactors built by `createCasReactor`
 * (it is the type argument of the returned `ThreadReactorFn`).
 */
export type CasReactorThread = {
/** Content-addressed store associated with the thread. */
cas: CasStore;
};
/** JSON-schema description of the single `hash` argument accepted by `cas_get`. */
const CAS_GET_PARAMETERS = {
  type: "object",
  properties: {
    hash: { type: "string", description: "The CAS hash to retrieve" },
  },
  required: ["hash"],
};

/**
 * Function-tool definition for `cas_get`, registered as the only static tool
 * of the reactor built in this module.
 */
const CAS_GET_TOOL_DEFINITION = {
  type: "function" as const,
  function: {
    name: "cas_get",
    description:
      "Read a Merkle DAG node from content-addressed storage by its hash. Returns YAML-formatted node with type, payload, and refs or children fields (content nodes use refs).",
    parameters: CAS_GET_PARAMETERS,
  },
};
/**
 * Type guard for plain key/value objects.
 *
 * @param value - Any value, typically the result of `JSON.parse`.
 * @returns `true` only for non-null, non-array values whose `typeof` is `"object"`.
 */
function isRecord(value: unknown): value is Record<string, unknown> {
  // `typeof null` is "object" and arrays are objects too, so rule both out first.
  if (value === null || Array.isArray(value)) {
    return false;
  }
  return typeof value === "object";
}
/** Caller-tunable options for `createCasReactor`. */
export type CasReactorOpts = {
/** Forwarded unchanged as the `maxRounds` setting of the underlying thread reactor. */
maxRounds: number;
/**
 * Builds the system prompt; it receives the name of the structured-output tool
 * so the prompt can tell the model which tool to call.
 */
systemPromptForStructuredTool: (structuredToolName: string) => string;
};
/** Outcome of decoding the raw JSON argument string of a `cas_get` call. */
type CasGetArguments = { ok: true; hash: string } | { ok: false; message: string };

/** Decodes `cas_get` arguments, yielding either the hash or the message to send back to the model. */
function parseCasGetArguments(rawArguments: string): CasGetArguments {
  let decoded: unknown;
  try {
    decoded = JSON.parse(rawArguments) as unknown;
  } catch {
    return { ok: false, message: "cas_get arguments were not valid JSON." };
  }
  if (isRecord(decoded) && typeof decoded.hash === "string") {
    return { ok: true, hash: decoded.hash };
  }
  return { ok: false, message: 'cas_get requires {"hash": "<cas-hash>"}.' };
}

/**
 * Builds a thread reactor that exposes `cas_get` as its only static tool and
 * derives the structured-output tool from the zod schema supplied per call.
 *
 * @param provider - LLM provider wrapped with `createLlmFn`.
 * @param cas - Store that `cas_get` tool calls read from.
 * @param opts - Round limit and system-prompt factory, both forwarded to the reactor.
 * @returns The reactor function produced by `createThreadReactor`.
 */
export function createCasReactor(
  provider: LlmProvider,
  cas: CasStore,
  opts: CasReactorOpts,
): ThreadReactorFn<CasReactorThread> {
  return createThreadReactor<CasReactorThread>({
    llm: createLlmFn(provider),
    maxRounds: opts.maxRounds,
    staticTools: [CAS_GET_TOOL_DEFINITION],
    structuredToolFromSchema: (schema) => {
      const { name, description, parameters } = extractFunctionToolFromZodSchema(schema);
      return {
        name,
        tool: {
          type: "function" as const,
          function: { name, description, parameters },
        },
      };
    },
    systemPromptForStructuredTool: opts.systemPromptForStructuredTool,
    // Reads go through the `cas` captured above; the thread argument is not consulted.
    toolHandler: async (call, _thread) => {
      const toolName = call.function.name;
      if (toolName !== "cas_get") {
        return `Unknown tool: ${toolName}`;
      }

      const parsed = parseCasGetArguments(call.function.arguments);
      if (!parsed.ok) {
        // Malformed arguments are reported to the model as the tool result.
        return parsed.message;
      }

      const blob = await cas.get(parsed.hash);
      if (blob === null) {
        // A missing node is surfaced as the literal text "null".
        return "null";
      }
      return blob;
    },
  });
}
+23 -2
View File
@@ -26,6 +26,7 @@ import { END, START } from "@uncaged/workflow-runtime";
import { err, type LogFn, ok, type Result } from "@uncaged/workflow-util";
import { createExtract } from "../extract/index.js";
import { createSummarizer, type SummarizeFn } from "./summarizer.js";
import { runSupervisor } from "./supervisor.js";
import {
appendThreadHistoryEntry,
@@ -53,6 +54,7 @@ async function resolveEngineRegistryRuntime(
Result<
{
extract: ReturnType<typeof createExtract>;
summarize: SummarizeFn;
workflowConfig: WorkflowConfig;
},
string
@@ -76,7 +78,11 @@ async function resolveEngineRegistryRuntime(
apiKey: ex.apiKey,
model: ex.model,
};
return ok({ extract: createExtract(llmProvider, { cas }), workflowConfig: cfg });
return ok({
extract: createExtract(llmProvider, { cas }),
summarize: createSummarizer(llmProvider, cas),
workflowConfig: cfg,
});
}
async function appendStateForStep(params: {
@@ -250,6 +256,7 @@ async function driveWorkflowGenerator(params: {
bundleDir: string;
startHash: string;
chain: ChainState;
summarize: SummarizeFn;
}): Promise<WorkflowResult> {
const {
fn,
@@ -262,6 +269,7 @@ async function driveWorkflowGenerator(params: {
cas,
bundleDir,
startHash,
summarize,
} = params;
let chain: ChainState = params.chain;
const gen = fn(thread, runtime);
@@ -270,6 +278,10 @@ async function driveWorkflowGenerator(params: {
role: s.role,
summary: JSON.stringify(s.meta),
}));
const summarizerSteps: { role: string; contentHash: string }[] = thread.steps.map((s) => ({
role: s.role,
contentHash: s.contentHash,
}));
while (true) {
if (executeOptions.signal.aborted) {
@@ -288,13 +300,20 @@ async function driveWorkflowGenerator(params: {
if (iterResult.done) {
logger("F3HN8QKP", `thread ${threadId} generator finished`);
const rawCompletion = iterResult.value;
const llmSummary = await summarize({
prompt: thread.start.content,
recentSteps: summarizerSteps,
fallback: rawCompletion.summary,
logger,
});
return await finalizeThread({
cas,
bundleDir,
threadId,
startHash,
chain,
completion: iterResult.value,
completion: { ...rawCompletion, summary: llmSummary },
});
}
@@ -320,6 +339,7 @@ async function driveWorkflowGenerator(params: {
role: step.role,
summary: JSON.stringify(step.meta),
});
summarizerSteps.push({ role: step.role, contentHash: step.contentHash });
await Promise.race([
executeOptions.awaitAfterEachYield(),
@@ -499,5 +519,6 @@ export async function executeThread(
bundleDir,
startHash,
chain,
summarize: registryRuntime.value.summarize,
});
}
@@ -0,0 +1,56 @@
import type { CasStore } from "@uncaged/workflow-cas";
import type { LlmProvider } from "@uncaged/workflow-runtime";
import type { LogFn } from "@uncaged/workflow-util";
import * as z from "zod/v4";
import { createCasReactor } from "../cas-reactor.js";
/**
 * Max ReAct rounds: 3 cas_get reads + 1 structured output = 4 rounds is sufficient.
 * Passed as `maxRounds` when the summarizer's reactor is created.
 */
const SUMMARIZER_MAX_REACT_ROUNDS = 4;
/** Only pass the last N steps; each step is just a role+contentHash reference (~60 chars), not full content. */
const SUMMARIZER_RECENT_STEP_LIMIT = 20;
/**
 * Structured output requested from the summarizer: an object with a single string `summary`.
 * NOTE(review): the `title`/`description` metadata presumably become the structured tool's
 * name and description via `extractFunctionToolFromZodSchema` — confirm there.
 */
const summarySchema = z.object({ summary: z.string() }).meta({
title: "workflow_summary",
description: "A concise summary of the completed workflow's results and outcome.",
});
/**
 * Renders the user message given to the summarizer.
 *
 * Only the last `SUMMARIZER_RECENT_STEP_LIMIT` steps are listed, numbered from 1
 * in oldest-first order; an empty step list is rendered as "(none)".
 *
 * @param args.prompt - The original task text.
 * @param args.recentSteps - Step references (role + content hash), oldest first.
 * @returns The complete prompt text.
 */
function buildSummarizerInput(args: {
  prompt: string;
  recentSteps: readonly { role: string; contentHash: string }[];
}): string {
  const { prompt, recentSteps } = args;

  const stepLines: string[] = [];
  recentSteps.slice(-SUMMARIZER_RECENT_STEP_LIMIT).forEach((step, index) => {
    stepLines.push(`${index + 1}. [${step.role}] contentHash: ${step.contentHash}`);
  });
  const stepsBlock = stepLines.length === 0 ? "(none)" : stepLines.join("\n");

  return [
    "Original task:",
    prompt,
    "",
    "Completed steps (oldest first):",
    stepsBlock,
    "",
    "Use cas_get to read step content if needed. Summarize the workflow outcome concisely.",
  ].join("\n");
}
/**
 * Produces the final human-readable summary for a finished workflow thread.
 * Resolves to the LLM-written summary, or to `fallback` when summarization fails.
 */
export type SummarizeFn = (args: {
/** Original task text; placed at the top of the summarizer prompt. */
prompt: string;
/** Step references, oldest first; only the most recent ones are included in the prompt. */
recentSteps: readonly { role: string; contentHash: string }[];
/** Summary to return when the LLM summary cannot be produced. */
fallback: string;
/** Receives a log code and message for both the success and the failure outcome. */
logger: LogFn;
}) => Promise<string>;
/**
 * Creates the end-of-workflow summarizer backed by a CAS-aware ReAct loop.
 *
 * The returned function is best-effort and never rejects because of the
 * summarization itself: if the reactor reports an error, throws, or yields a
 * blank summary, the failure is logged and `args.fallback` is returned so the
 * caller can still finalize the thread.
 *
 * @param provider - LLM provider used by the reactor.
 * @param cas - Store the model may read from through `cas_get`.
 * @returns A `SummarizeFn` resolving to the LLM summary or the fallback.
 */
export function createSummarizer(provider: LlmProvider, cas: CasStore): SummarizeFn {
  const reactor = createCasReactor(provider, cas, {
    maxRounds: SUMMARIZER_MAX_REACT_ROUNDS,
    systemPromptForStructuredTool: (structuredToolName) =>
      `You summarize completed workflow threads. You have access to cas_get to read step content by hash. After reviewing the steps, call the ${structuredToolName} tool with a concise summary of the workflow outcome and results. Or reply with only a JSON object such as {"summary":"..."}.`,
  });

  return async (args) => {
    let summary: string | undefined;
    let failure = "";

    try {
      const result = await reactor({
        thread: { cas },
        input: buildSummarizerInput(args),
        schema: summarySchema,
      });
      if (!result.ok) {
        failure = `${result.error}`;
      } else if (result.value.summary.trim() === "") {
        // A blank summary would erase the workflow's own summary; treat it as a failure.
        failure = "model returned an empty summary";
      } else {
        summary = result.value.summary;
      }
    } catch (e: unknown) {
      // A thrown error (e.g. provider or storage failure) must not block thread finalization.
      failure = e instanceof Error ? e.message : String(e);
    }

    // Logging stays outside the try block so a logger error is not mistaken for an LLM failure.
    if (summary === undefined) {
      args.logger("P2WX7KNR", `summarizer failed: ${failure}`);
      return args.fallback;
    }
    args.logger("Q5MT3VBF", "summarizer produced workflow summary");
    return summary;
  };
}
@@ -1,8 +1,8 @@
import { type CasStore, getContentMerklePayload } from "@uncaged/workflow-cas";
import { createLlmFn, createThreadReactor } from "@uncaged/workflow-reactor";
import type { ExtractFn, ExtractResult, LlmProvider } from "@uncaged/workflow-runtime";
import type * as z from "zod/v4";
import { extractFunctionToolFromZodSchema } from "./llm-extract.js";
import { createCasReactor } from "../cas-reactor.js";
export type ExtractDeps = {
cas: CasStore;
@@ -10,30 +10,6 @@ export type ExtractDeps = {
const MAX_REACT_ROUNDS = 10;
const CAS_GET_TOOL_DEFINITION = {
type: "function" as const,
function: {
name: "cas_get",
description:
"Read a Merkle DAG node from content-addressed storage by its hash. Returns YAML-formatted node with type, payload, and refs or children fields (content nodes use refs).",
parameters: {
type: "object",
properties: {
hash: { type: "string", description: "The CAS hash to retrieve" },
},
required: ["hash"],
},
},
};
type ExtractThreadContext = {
cas: CasStore;
};
function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === "object" && value !== null && !Array.isArray(value);
}
/**
* Create an ExtractFn backed by an LLM provider.
*
@@ -42,44 +18,10 @@ function isRecord(value: unknown): value is Record<string, unknown> {
* assistant reply as a short-circuit, which covers the legacy "single" extraction path.
*/
export function createExtract(provider: LlmProvider, deps: ExtractDeps): ExtractFn {
const llm = createLlmFn(provider);
const reactor = createThreadReactor<ExtractThreadContext>({
llm,
const reactor = createCasReactor(provider, deps.cas, {
maxRounds: MAX_REACT_ROUNDS,
staticTools: [CAS_GET_TOOL_DEFINITION],
structuredToolFromSchema: (schema) => {
const t = extractFunctionToolFromZodSchema(schema);
return {
name: t.name,
tool: {
type: "function" as const,
function: {
name: t.name,
description: t.description,
parameters: t.parameters,
},
},
};
},
systemPromptForStructuredTool: (structuredToolName) =>
`You extract structured metadata from content. The content is from a CAS node. Use cas_get to read referenced nodes if needed. When ready, call the ${structuredToolName} tool with JSON matching the schema. You may instead reply with only a JSON object (no prose) when no tools are needed.`,
toolHandler: async (call, thread) => {
if (call.function.name !== "cas_get") {
return `Unexpected tool routed to handler: ${call.function.name}`;
}
let hash: string;
try {
const ta = JSON.parse(call.function.arguments) as unknown;
if (!isRecord(ta) || typeof ta.hash !== "string") {
return 'cas_get requires a JSON object with a string "hash" field.';
}
hash = ta.hash;
} catch {
return 'cas_get arguments were not valid JSON. Provide {"hash": "<cas-hash>"}.';
}
const blob = await thread.cas.get(hash);
return blob === null ? "null" : blob;
},
});
return async <T extends Record<string, unknown>>(