Merge pull request 'fix(uwf-hermes): read turn data from session file instead of ACP stream' (#520) from fix/519-read-session-file into main

This commit is contained in:
2026-05-25 14:24:41 +00:00
7 changed files with 199 additions and 230 deletions
@@ -453,7 +453,78 @@ describe("step read", () => {
expect(markdown).not.toContain("## Turn"); expect(markdown).not.toContain("## Turn");
}); });
test("test 6: turn content with special characters", async () => { test("test 6: displays role and tool calls in turn body", async () => {
const casDir = join(tmpDir, "cas");
await mkdir(casDir, { recursive: true });
const store = createFsStore(casDir);
const schemas = await registerUwfSchemas(store);
const detailSchemas = await registerDetailSchemas(store);
const workflowHash = await store.put(schemas.workflow, {
name: "test-wf",
description: "desc",
roles: {
worker: {
description: "Worker",
goal: "You are a worker agent.",
capabilities: [],
procedure: "Do the work.",
output: "Summarize the work.",
meta: "placeholder00" as CasRef,
},
},
conditions: {},
graph: {},
});
const startHash = await store.put(schemas.startNode, {
workflow: workflowHash,
prompt: "Test task",
});
const outputHash = await store.put(schemas.workflow, {
name: "out",
description: "",
roles: {},
conditions: {},
graph: {},
});
const turnHash = await store.put(detailSchemas.turn, {
index: 0,
role: "assistant",
content: "",
toolCalls: [{ name: "terminal", args: '{"command":"echo hi"}' }],
reasoning: null,
});
const detailHash = await store.put(detailSchemas.detail, {
sessionId: "session-1",
model: "test-model",
duration: 1000,
turnCount: 1,
turns: [turnHash],
});
const stepHash = await store.put(schemas.stepNode, {
start: startHash,
prev: null,
role: "worker",
output: outputHash,
detail: detailHash,
agent: "uwf-hermes",
startedAtMs: 1000000000000,
completedAtMs: 1000000005000,
});
const markdown = await cmdStepRead(tmpDir, stepHash, 4000);
expect(markdown).toContain("**Turn role:** assistant");
expect(markdown).toContain("**terminal**");
expect(markdown).toContain('{"command":"echo hi"}');
});
test("test 7: turn content with special characters", async () => {
const casDir = join(tmpDir, "cas"); const casDir = join(tmpDir, "cas");
await mkdir(casDir, { recursive: true }); await mkdir(casDir, { recursive: true });
const store = createFsStore(casDir); const store = createFsStore(casDir);
+79 -16
View File
@@ -19,9 +19,16 @@ import {
walkChain, walkChain,
} from "./shared.js"; } from "./shared.js";
/**
 * One tool invocation recorded on a turn, reduced to the fields shown in the
 * step markdown.
 *
 * `name` is rendered in bold and `args` is rendered as an inline code span;
 * an empty `args` string means there is nothing to show after the name.
 * NOTE(review): `args` looks like a pre-serialized JSON string (the tests store
 * `'{"command":"echo hi"}'`) — confirm against the turn schema.
 */
type TurnToolCall = {
name: string;
args: string;
};
type TurnData = { type TurnData = {
index: number; index: number;
role: string;
content: string; content: string;
toolCalls: TurnToolCall[] | null;
}; };
/** /**
@@ -128,8 +135,74 @@ function loadStepDetail(store: BootstrapCapableStore, detailRef: CasRef): Record
return detailNode.payload as Record<string, unknown>; return detailNode.payload as Record<string, unknown>;
} }
/**
 * Normalize the untyped `toolCalls` value of a turn payload.
 *
 * Entries that are not objects, or whose `name` is not a string, are dropped.
 * A non-string `args` is replaced by the empty string.
 *
 * @param raw - Value read from the turn payload; may be anything.
 * @returns The usable tool calls, or `null` when `raw` is not an array or
 *   yields no usable entry.
 */
function parseTurnToolCalls(raw: unknown): TurnToolCall[] | null {
  if (!Array.isArray(raw)) {
    return null;
  }
  const usable = raw.flatMap((item: unknown): TurnToolCall[] => {
    if (item === null || typeof item !== "object") {
      return [];
    }
    const { name, args } = item as Record<string, unknown>;
    if (typeof name !== "string") {
      return [];
    }
    return [{ name, args: typeof args === "string" ? args : "" }];
  });
  return usable.length === 0 ? null : usable;
}
/**
 * Wrap tool-call arguments in a Markdown inline code span.
 *
 * The fence is one backtick longer than the longest backtick run inside
 * `args`, so embedded backticks (common in shell commands) cannot close the
 * span early. When `args` starts or ends with a backtick, a single space of
 * padding is added on both sides so the fence stays distinguishable; Markdown
 * strips that padding when rendering.
 *
 * @param args - Serialized tool arguments.
 * @returns The code span, or the empty string when `args` is empty.
 */
function formatToolCallArgs(args: string): string {
  if (args === "") {
    return "";
  }
  let longestRun = 0;
  for (const run of args.match(/`+/g) ?? []) {
    longestRun = Math.max(longestRun, run.length);
  }
  const fence = "`".repeat(longestRun + 1);
  const pad = args.startsWith("`") || args.endsWith("`") ? " " : "";
  return `${fence}${pad}${args}${pad}${fence}`;
}

/**
 * Render the Markdown body of a single turn.
 *
 * Layout: a `**Turn role:**` line, then one bullet per tool call (bold name
 * immediately followed by its arguments as inline code), then — separated by
 * a blank line — the turn's text content when it is non-empty.
 *
 * @param turn - Turn to render.
 * @returns The body text, without the `## Turn N` heading.
 */
function formatTurnBody(turn: TurnData): string {
  // The role line is always present, so later sections can rely on it.
  const lines: string[] = [`**Turn role:** ${turn.role}`];
  for (const call of turn.toolCalls ?? []) {
    lines.push(`- **${call.name}**${formatToolCallArgs(call.args)}`);
  }
  if (turn.content !== "") {
    // Blank line keeps the content from being absorbed into the preceding list/paragraph.
    lines.push("", turn.content);
  }
  return lines.join("\n");
}
/**
 * Resolve one entry of a detail node's `turns` list into display data.
 *
 * @param store - Store the turn node is fetched from.
 * @param turnRef - Candidate reference; anything other than a string is rejected.
 * @param fallbackIndex - Index used when the payload has no numeric `index`.
 * @returns The turn's display fields, or `null` when the reference is not a
 *   string, the store returns `null` for it, or the turn has neither text
 *   content nor usable tool calls.
 */
function parseSingleTurn(
  store: BootstrapCapableStore,
  turnRef: unknown,
  fallbackIndex: number,
): TurnData | null {
  const node = typeof turnRef === "string" ? store.get(turnRef as CasRef) : null;
  if (node === null) {
    return null;
  }

  const payload = node.payload as Record<string, unknown>;
  const text = typeof payload.content === "string" ? payload.content : "";
  const calls = parseTurnToolCalls(payload.toolCalls);

  // A turn with nothing to display is omitted entirely.
  const hasSomethingToShow = text !== "" || calls !== null;
  if (!hasSomethingToShow) {
    return null;
  }

  const index = typeof payload.index === "number" ? payload.index : fallbackIndex;
  // Missing or non-string roles are shown as "assistant".
  const role = typeof payload.role === "string" ? payload.role : "assistant";
  return { index, role, content: text, toolCalls: calls };
}
/** /**
* Load all turn nodes from CAS store and extract content * Load all turn nodes from CAS store and extract display fields
*/ */
function loadTurnData(store: BootstrapCapableStore, turns: unknown): TurnData[] { function loadTurnData(store: BootstrapCapableStore, turns: unknown): TurnData[] {
if (!Array.isArray(turns) || turns.length === 0) { if (!Array.isArray(turns) || turns.length === 0) {
@@ -138,19 +211,9 @@ function loadTurnData(store: BootstrapCapableStore, turns: unknown): TurnData[]
const turnData: TurnData[] = []; const turnData: TurnData[] = [];
for (const turnRef of turns) { for (const turnRef of turns) {
if (typeof turnRef !== "string") { const parsed = parseSingleTurn(store, turnRef, turnData.length);
continue; if (parsed !== null) {
} turnData.push(parsed);
const turnNode = store.get(turnRef as CasRef);
if (turnNode === null) {
continue;
}
const turn = turnNode.payload as Record<string, unknown>;
if (typeof turn.content === "string") {
turnData.push({
index: typeof turn.index === "number" ? turn.index : turnData.length,
content: turn.content,
});
} }
} }
return turnData; return turnData;
@@ -168,7 +231,7 @@ function selectTurnsForQuota(turnData: TurnData[], availableQuota: number): Turn
if (turn === undefined) continue; if (turn === undefined) continue;
const turnHeader = `## Turn ${turn.index + 1}\n\n`; const turnHeader = `## Turn ${turn.index + 1}\n\n`;
const turnBlock = turnHeader + turn.content; const turnBlock = turnHeader + formatTurnBody(turn);
const separatorCost = selectedTurns.length > 0 ? 2 : 0; const separatorCost = selectedTurns.length > 0 ? 2 : 0;
const addCost = turnBlock.length + separatorCost; const addCost = turnBlock.length + separatorCost;
@@ -213,7 +276,7 @@ function formatStepMarkdown(
parts.push(""); parts.push("");
parts.push(`## Turn ${turn.index + 1}`); parts.push(`## Turn ${turn.index + 1}`);
parts.push(""); parts.push("");
parts.push(turn.content); parts.push(formatTurnBody(turn));
} }
return parts.join("\n"); return parts.join("\n");
+9
View File
@@ -18,6 +18,15 @@ bun add -g @uncaged/workflow-agent-hermes
Requires the `hermes` CLI on `PATH`. Requires the `hermes` CLI on `PATH`.
Hermes must write session JSON snapshots so `uwf-hermes` can load structured tool calls from disk. Add this to `~/.hermes/config.yaml`:
```yaml
sessions:
write_json_snapshots: true
```
Session files are stored at `~/.hermes/sessions/session_{sessionId}.json`.
## CLI Usage ## CLI Usage
Invoked by `uwf thread step` (not typically run directly): Invoked by `uwf thread step` (not typically run directly):
@@ -2,7 +2,7 @@ import { afterEach, beforeEach, describe, expect, it } from "bun:test";
import { HermesAcpClient } from "../src/acp-client.js"; import { HermesAcpClient } from "../src/acp-client.js";
describe("handleSessionUpdate — helper extraction", () => { describe("handleSessionUpdate — text extraction", () => {
let client: HermesAcpClient; let client: HermesAcpClient;
beforeEach(() => { beforeEach(() => {
@@ -14,80 +14,41 @@ describe("handleSessionUpdate — helper extraction", () => {
}); });
it("agent_message_chunk accumulates text in messageChunks", () => { it("agent_message_chunk accumulates text in messageChunks", () => {
(client as any).handleSessionUpdate({ (
client as unknown as { handleSessionUpdate: (u: Record<string, unknown>) => void }
).handleSessionUpdate({
sessionUpdate: "agent_message_chunk", sessionUpdate: "agent_message_chunk",
content: { type: "text", text: "hello" }, content: { type: "text", text: "hello" },
}); });
(client as any).handleSessionUpdate({ (
client as unknown as { handleSessionUpdate: (u: Record<string, unknown>) => void }
).handleSessionUpdate({
sessionUpdate: "agent_message_chunk", sessionUpdate: "agent_message_chunk",
content: { type: "text", text: " world" }, content: { type: "text", text: " world" },
}); });
expect((client as any).messageChunks).toEqual(["hello", " world"]); expect((client as unknown as { messageChunks: string[] }).messageChunks).toEqual([
"hello",
" world",
]);
}); });
it("agent_thought_chunk accumulates reasoning in reasoningChunks", () => { it("non-text chunks and other update types are ignored", () => {
(client as any).handleSessionUpdate({ (
sessionUpdate: "agent_thought_chunk", client as unknown as { handleSessionUpdate: (u: Record<string, unknown>) => void }
content: { type: "text", text: "thinking" }, ).handleSessionUpdate({
sessionUpdate: "agent_message_chunk",
content: { type: "image", text: "ignored" },
}); });
expect((client as any).reasoningChunks).toEqual(["thinking"]); (
}); client as unknown as { handleSessionUpdate: (u: Record<string, unknown>) => void }
).handleSessionUpdate({
it("tool_call registers a pending tool and flushes message chunks", () => {
(client as any).messageChunks = ["pre-tool text"];
(client as any).handleSessionUpdate({
sessionUpdate: "tool_call", sessionUpdate: "tool_call",
title: "Bash", title: "Bash",
rawInput: { command: "ls" },
toolCallId: "tc-1", toolCallId: "tc-1",
}); });
expect((client as any).pendingTools.get("tc-1")).toEqual({ (
name: "Bash", client as unknown as { handleSessionUpdate: (u: Record<string, unknown>) => void }
args: JSON.stringify({ command: "ls" }), ).handleSessionUpdate({ sessionUpdate: "unknown_type", data: {} });
}); expect((client as unknown as { messageChunks: string[] }).messageChunks).toHaveLength(0);
expect((client as any).messageChunks).toEqual([]);
expect((client as any).messages).toHaveLength(1);
expect((client as any).messages[0].role).toBe("assistant");
});
it("tool_call_update completed pushes tool_call and tool messages", () => {
(client as any).pendingTools.set("tc-2", { name: "Read", args: '{"path":"/foo"}' });
(client as any).handleSessionUpdate({
sessionUpdate: "tool_call_update",
status: "completed",
toolCallId: "tc-2",
rawOutput: "file contents",
});
const msgs = (client as any).messages as Array<{
role: string;
tool_calls: unknown;
content: string | null;
}>;
expect(msgs).toHaveLength(2);
expect(msgs[0].role).toBe("assistant");
expect(msgs[0].tool_calls).toEqual([
{ function: { name: "Read", arguments: '{"path":"/foo"}' } },
]);
expect(msgs[1].role).toBe("tool");
expect(msgs[1].content).toBe("file contents");
expect((client as any).pendingTools.has("tc-2")).toBe(false);
});
it("tool_call_update with non-string rawOutput JSON-stringifies it", () => {
(client as any).pendingTools.set("tc-3", { name: "Fetch", args: "" });
(client as any).handleSessionUpdate({
sessionUpdate: "tool_call_update",
status: "completed",
toolCallId: "tc-3",
rawOutput: { html: "<p>page</p>" },
});
const msgs = (client as any).messages as Array<{ role: string; content: string | null }>;
expect(msgs[1].content).toBe(JSON.stringify({ html: "<p>page</p>" }));
});
it("unknown updateType is a no-op", () => {
(client as any).handleSessionUpdate({ sessionUpdate: "unknown_type", data: {} });
expect((client as any).messages).toHaveLength(0);
expect((client as any).messageChunks).toHaveLength(0);
}); });
}); });
@@ -53,23 +53,4 @@ describe("HermesAcpClient", () => {
}, },
{ timeout: 2 * 60 * 1000 }, { timeout: 2 * 60 * 1000 },
); );
// TODO(#435): flaky — depends on live LLM; mock or move to integration suite
it.skip(
"prompt() collects structured messages including tool calls",
async () => {
await client.connect(process.cwd());
const result = await client.prompt("Run this command: echo TOOL_DETAIL_TEST");
expect(result.messages.length).toBeGreaterThan(0);
const toolMessages = result.messages.filter((m) => m.role === "tool");
expect(toolMessages.length).toBeGreaterThan(0);
const toolContent = toolMessages[0]?.content ?? "";
expect(toolContent).toContain("TOOL_DETAIL_TEST");
const assistantWithTools = result.messages.filter(
(m) => m.role === "assistant" && m.tool_calls !== null,
);
expect(assistantWithTools.length).toBeGreaterThan(0);
},
{ timeout: 2 * 60 * 1000 },
);
}); });
@@ -2,8 +2,6 @@ import type { ChildProcess } from "node:child_process";
import { spawn } from "node:child_process"; import { spawn } from "node:child_process";
import { createInterface } from "node:readline"; import { createInterface } from "node:readline";
import type { HermesSessionMessage } from "./types.js";
const HERMES_COMMAND = "hermes"; const HERMES_COMMAND = "hermes";
const PROTOCOL_VERSION = 1; const PROTOCOL_VERSION = 1;
@@ -19,16 +17,9 @@ type PendingRequest = {
reject: (reason: Error) => void; reject: (reason: Error) => void;
}; };
/** Tracks in-flight tool calls so we can build complete messages when they finish. */
type PendingToolCall = {
name: string;
args: string;
};
export type AcpPromptResult = { export type AcpPromptResult = {
text: string; text: string;
sessionId: string; sessionId: string;
messages: HermesSessionMessage[];
}; };
export class HermesAcpClient { export class HermesAcpClient {
@@ -38,11 +29,8 @@ export class HermesAcpClient {
private stderrBuffer = ""; private stderrBuffer = "";
private pending = new Map<number, PendingRequest>(); private pending = new Map<number, PendingRequest>();
// Message collection state /** Accumulated assistant text chunks from agent_message_chunk updates. */
private messageChunks: string[] = []; private messageChunks: string[] = [];
private reasoningChunks: string[] = [];
private pendingTools = new Map<string, PendingToolCall>();
messages: HermesSessionMessage[] = [];
/** Spawn hermes acp, initialize, create session */ /** Spawn hermes acp, initialize, create session */
async connect(cwd: string): Promise<string> { async connect(cwd: string): Promise<string> {
@@ -84,14 +72,13 @@ export class HermesAcpClient {
return sessionId; return sessionId;
} }
/** Send prompt and collect full response text + structured messages. */ /** Send prompt and collect final assistant text from ACP stream chunks. */
async prompt(text: string): Promise<AcpPromptResult> { async prompt(text: string): Promise<AcpPromptResult> {
if (this.sessionId === null) { if (this.sessionId === null) {
throw new Error("Not connected — call connect() first"); throw new Error("Not connected — call connect() first");
} }
this.messageChunks = []; this.messageChunks = [];
this.reasoningChunks = [];
const response = await this.sendRequest("session/prompt", { const response = await this.sendRequest("session/prompt", {
sessionId: this.sessionId, sessionId: this.sessionId,
@@ -104,28 +91,9 @@ export class HermesAcpClient {
); );
} }
// Flush any trailing assistant text that wasn't followed by a tool call.
this.flushAssistantMessage();
// Extract the final assistant text from collected messages.
let finalText = "";
for (let i = this.messages.length - 1; i >= 0; i--) {
const msg = this.messages[i];
if (
msg !== undefined &&
msg.role === "assistant" &&
msg.content !== null &&
msg.content.trim() !== ""
) {
finalText = msg.content;
break;
}
}
return { return {
text: finalText, text: this.messageChunks.join(""),
sessionId: this.sessionId, sessionId: this.sessionId,
messages: this.messages,
}; };
} }
@@ -242,94 +210,16 @@ export class HermesAcpClient {
} }
} }
// ---- Session update → structured messages ----
private handleSessionUpdate(update: Record<string, unknown>): void { private handleSessionUpdate(update: Record<string, unknown>): void {
switch (update.sessionUpdate as string) { if (update.sessionUpdate !== "agent_message_chunk") {
case "agent_message_chunk": return;
this.handleAgentMessageChunk(update);
break;
case "agent_thought_chunk":
this.handleAgentThoughtChunk(update);
break;
case "tool_call":
this.handleToolCall(update);
break;
case "tool_call_update":
this.handleToolCallUpdate(update);
break;
default:
break;
} }
}
private handleAgentMessageChunk(update: Record<string, unknown>): void {
const content = update.content as { type?: string; text?: string } | undefined; const content = update.content as { type?: string; text?: string } | undefined;
if (content?.type === "text" && typeof content.text === "string") { if (content?.type === "text" && typeof content.text === "string") {
this.messageChunks.push(content.text); this.messageChunks.push(content.text);
} }
} }
private handleAgentThoughtChunk(update: Record<string, unknown>): void {
const content = update.content as { type?: string; text?: string } | undefined;
if (content?.type === "text" && typeof content.text === "string") {
this.reasoningChunks.push(content.text);
}
}
private handleToolCall(update: Record<string, unknown>): void {
const title = (update.title as string) ?? "";
const rawInput = update.rawInput;
const args = rawInput !== undefined && rawInput !== null ? JSON.stringify(rawInput) : "";
const toolCallId = update.toolCallId as string;
this.pendingTools.set(toolCallId, { name: title, args });
this.flushAssistantMessage();
}
private handleToolCallUpdate(update: Record<string, unknown>): void {
const status = update.status as string | undefined;
if (status !== "completed" && status !== "failed") return;
const toolCallId = update.toolCallId as string;
const pending = this.pendingTools.get(toolCallId);
const toolName = pending?.name ?? toolCallId;
const rawOutput = update.rawOutput;
const outputStr =
rawOutput !== undefined && rawOutput !== null
? typeof rawOutput === "string"
? rawOutput
: JSON.stringify(rawOutput)
: "";
this.messages.push({
role: "assistant",
content: null,
reasoning: null,
tool_calls: [{ function: { name: toolName, arguments: pending?.args ?? "" } }],
});
this.messages.push({
role: "tool",
content: outputStr,
reasoning: null,
tool_calls: null,
});
this.pendingTools.delete(toolCallId);
}
/** Flush any accumulated text/reasoning into an assistant message. */
private flushAssistantMessage(): void {
const text = this.messageChunks.join("");
const reasoning = this.reasoningChunks.join("");
if (text !== "" || reasoning !== "") {
this.messages.push({
role: "assistant",
content: text || null,
reasoning: reasoning || null,
tool_calls: null,
});
}
this.messageChunks = [];
this.reasoningChunks = [];
}
private rejectAll(err: Error): void { private rejectAll(err: Error): void {
for (const handler of this.pending.values()) { for (const handler of this.pending.values()) {
handler.reject(err); handler.reject(err);
+10 -16
View File
@@ -10,7 +10,7 @@ import {
import { HermesAcpClient } from "./acp-client.js"; import { HermesAcpClient } from "./acp-client.js";
import { getCachedSessionId, isResumeDisabled, setCachedSessionId } from "./session-cache.js"; import { getCachedSessionId, isResumeDisabled, setCachedSessionId } from "./session-cache.js";
import { storeHermesSessionDetail } from "./session-detail.js"; import { loadHermesSession, storeHermesSessionDetail } from "./session-detail.js";
const log = createLogger({ sink: { kind: "stderr" } }); const log = createLogger({ sink: { kind: "stderr" } });
@@ -49,17 +49,11 @@ export function buildHermesPrompt(ctx: AgentContext): string {
return parts.join("\n"); return parts.join("\n");
} }
async function storePromptResult( async function storePromptResult(store: Store, sessionId: string): Promise<{ detailHash: string }> {
store: Store, const session = await loadHermesSession(sessionId);
sessionId: string, if (session === null) {
messages: Awaited<ReturnType<HermesAcpClient["prompt"]>>["messages"], throw new Error(`Hermes session file not found: ${sessionId}`);
): Promise<{ detailHash: string }> { }
const session = {
session_id: sessionId,
model: "",
session_start: new Date().toISOString(),
messages,
};
return storeHermesSessionDetail(store, session); return storeHermesSessionDetail(store, session);
} }
@@ -116,8 +110,8 @@ export function createHermesAgent(): () => Promise<void> {
async function runPrompt(ctx: AgentContext, useContinuation: boolean): Promise<AgentRunResult> { async function runPrompt(ctx: AgentContext, useContinuation: boolean): Promise<AgentRunResult> {
const effectiveCtx = useContinuation ? ctx : { ...ctx, isFirstVisit: true }; const effectiveCtx = useContinuation ? ctx : { ...ctx, isFirstVisit: true };
const fullPrompt = buildHermesPrompt(effectiveCtx); const fullPrompt = buildHermesPrompt(effectiveCtx);
const { text, sessionId, messages } = await client.prompt(fullPrompt); const { text, sessionId } = await client.prompt(fullPrompt);
const { detailHash } = await storePromptResult(ctx.store, sessionId, messages); const { detailHash } = await storePromptResult(ctx.store, sessionId);
if (!isResumeDisabled()) { if (!isResumeDisabled()) {
await setCachedSessionId(ctx.threadId, ctx.role, sessionId); await setCachedSessionId(ctx.threadId, ctx.role, sessionId);
@@ -152,8 +146,8 @@ export function createHermesAgent(): () => Promise<void> {
): Promise<AgentRunResult> { ): Promise<AgentRunResult> {
// Client is already connected from runHermes — same ACP session, // Client is already connected from runHermes — same ACP session,
// so the agent sees the full conversation history (crucial for retries). // so the agent sees the full conversation history (crucial for retries).
const { text, sessionId, messages } = await client.prompt(message); const { text, sessionId } = await client.prompt(message);
const { detailHash } = await storePromptResult(store, sessionId, messages); const { detailHash } = await storePromptResult(store, sessionId);
return { output: text, detailHash, sessionId }; return { output: text, detailHash, sessionId };
} }