From 766ec7ddc2c14bccecd5984d5427bc93a2f56071 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B0=8F=E6=A9=98?= Date: Fri, 22 May 2026 12:15:09 +0000 Subject: [PATCH 1/5] feat: add HermesAcpClient for structured ACP communication MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements JSON-RPC client that communicates with `hermes acp` via stdin/stdout. Replaces fragile stdout/stderr parsing with structured protocol: initialize → session/new → session/prompt → collect chunks. Session ID comes directly from protocol response, eliminating the race condition in #380. Phase 1 of RFC #398 --- .../__tests__/acp-client.test.ts | 56 ++++ .../workflow-agent-hermes/src/acp-client.ts | 250 ++++++++++++++++++ packages/workflow-agent-hermes/src/index.ts | 1 + 3 files changed, 307 insertions(+) create mode 100644 packages/workflow-agent-hermes/__tests__/acp-client.test.ts create mode 100644 packages/workflow-agent-hermes/src/acp-client.ts diff --git a/packages/workflow-agent-hermes/__tests__/acp-client.test.ts b/packages/workflow-agent-hermes/__tests__/acp-client.test.ts new file mode 100644 index 0000000..ea3a7a5 --- /dev/null +++ b/packages/workflow-agent-hermes/__tests__/acp-client.test.ts @@ -0,0 +1,56 @@ +import { afterEach, beforeEach, describe, expect, it } from "bun:test"; + +import { HermesAcpClient } from "../src/acp-client.js"; + +const UUID_RE = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i; + +describe("HermesAcpClient", () => { + let client: HermesAcpClient; + + beforeEach(() => { + client = new HermesAcpClient(); + }); + + afterEach(async () => { + await client.close(); + }); + + it( + "connect() returns a UUID sessionId", + async () => { + const sessionId = await client.connect(process.cwd()); + expect(typeof sessionId).toBe("string"); + expect(sessionId).toMatch(UUID_RE); + }, + { timeout: 2 * 60 * 1000 }, + ); + + it( + "prompt() returns a non-empty text response", + async () => { + await client.connect(process.cwd()); + const result = await client.prompt("Reply with exactly the word: PONG"); + expect(typeof result.text).toBe("string"); + expect(result.text.length).toBeGreaterThan(0); + expect(typeof result.sessionId).toBe("string"); + expect(result.sessionId).toMatch(UUID_RE); + }, + { timeout: 2 * 60 * 1000 }, + ); + + it( + "prompt() can be called twice on the same session (resume)", + async () => { + await client.connect(process.cwd()); + + const first = await client.prompt("Say the word ALPHA and nothing else."); + expect(first.text.length).toBeGreaterThan(0); + + const second = await client.prompt("Now say the word BETA and nothing else."); + expect(second.text.length).toBeGreaterThan(0); + + expect(first.sessionId).toBe(second.sessionId); + }, + { timeout: 2 * 60 * 1000 }, + ); +}); diff --git a/packages/workflow-agent-hermes/src/acp-client.ts b/packages/workflow-agent-hermes/src/acp-client.ts new file mode 100644 index 0000000..5a41732 --- /dev/null +++ b/packages/workflow-agent-hermes/src/acp-client.ts @@ -0,0 +1,250 @@ +import type { ChildProcess } from "node:child_process"; +import { spawn } from "node:child_process"; +import { createInterface } from "node:readline"; + +const HERMES_COMMAND = "hermes"; +const PROMPT_TIMEOUT_MS = 10 * 60 * 1000; + +type JsonRpcRequest = { + jsonrpc: "2.0"; + id: number; + method: string; + params: Record; +}; + +type JsonRpcNotification = { + jsonrpc: "2.0"; + method: string; + params?: Record; +}; + +type JsonRpcResponse = { + jsonrpc: "2.0"; + id: number; + result: unknown; + error?: { code: number; message: string }; +}; + +type PendingRequest = { + resolve: (value: JsonRpcResponse) => void; + reject: (reason: Error) => void; +}; + +type SessionUpdateParams = { + update: { + sessionUpdate: string; + content?: { + text?: string; + }; + }; +}; + +function isSessionUpdateParams(params: unknown): params is SessionUpdateParams { + return ( + typeof params === "object" && + params !== null && + "update" in params && + typeof (params as Record).update === "object" + ); +} + +export class HermesAcpClient { + private process: ChildProcess | null = null; + private nextId = 1; + private sessionId: string | null = null; + private pending = new Map(); + private stderrBuffer = ""; + private messageChunks: string[] = []; + + /** Spawn hermes acp, initialize, create session */ + async connect(cwd: string): Promise { + const child = spawn(HERMES_COMMAND, ["acp"], { + env: process.env, + shell: false, + stdio: ["pipe", "pipe", "pipe"], + }); + + this.process = child; + + child.stderr?.on("data", (chunk: Buffer) => { + this.stderrBuffer += chunk.toString(); + }); + + child.on("error", (cause) => { + const message = cause instanceof Error ? cause.message : String(cause); + this.rejectAll(new Error(`hermes acp spawn failed: ${message}`)); + }); + + child.on("close", (code) => { + if (code !== 0 && this.pending.size > 0) { + const detail = this.stderrBuffer.trim() !== "" ? ` stderr=${this.stderrBuffer.trim()}` : ""; + this.rejectAll( + new Error(`hermes acp exited unexpectedly with code ${code ?? "null"}${detail}`), + ); + } + }); + + if (child.stdout === null) { + throw new Error("hermes acp process stdout is not available"); + } + const rl = createInterface({ input: child.stdout }); + rl.on("line", (line) => { + this.handleLine(line.trim()); + }); + + const initResponse = await this.sendRequest("initialize", { + protocolVersion: 1, + clientInfo: { name: "uwf", version: "0.1.0" }, + capabilities: {}, + }); + + if ((initResponse as { error?: unknown }).error !== undefined) { + throw new Error( + `initialize failed: ${JSON.stringify((initResponse as { error: unknown }).error)}`, + ); + } + + this.sendNotification("initialized"); + + const sessionResponse = (await this.sendRequest("session/new", { + cwd, + mcpServers: [], + })) as { result: { sessionId: string } }; + + const sessionId = sessionResponse.result?.sessionId; + if (typeof sessionId !== "string" || sessionId === "") { + throw new Error(`session/new did not return a sessionId: ${JSON.stringify(sessionResponse)}`); + } + + this.sessionId = sessionId; + return sessionId; + } + + /** Send prompt and collect full response text */ + async prompt(text: string): Promise<{ text: string; sessionId: string }> { + if (this.sessionId === null) { + throw new Error("Not connected — call connect() first"); + } + + this.messageChunks = []; + + const response = await this.sendRequest( + "session/prompt", + { + sessionId: this.sessionId, + prompt: [{ type: "text", text }], + }, + PROMPT_TIMEOUT_MS, + ); + + if ((response as { error?: unknown }).error !== undefined) { + throw new Error( + `session/prompt failed: ${JSON.stringify((response as { error: unknown }).error)}`, + ); + } + + return { + text: this.messageChunks.join(""), + sessionId: this.sessionId, + }; + } + + /** Close the connection */ + async close(): Promise { + if (this.process === null) { + return; + } + this.process.stdin?.end(); + const proc = this.process; + await new Promise((resolve) => { + proc.on("close", () => resolve()); + setTimeout(resolve, 5000); + }); + this.process = null; + } + + private sendRequest( + method: string, + params: Record, + timeoutMs = 30_000, + ): Promise { + const id = this.nextId++; + const message: JsonRpcRequest = { jsonrpc: "2.0", id, method, params }; + return new Promise((resolve, reject) => { + const timer = setTimeout(() => { + this.pending.delete(id); + reject(new Error(`Timeout waiting for response to ${method} (id=${id})`)); + }, timeoutMs); + + this.pending.set(id, { + resolve: (value) => { + clearTimeout(timer); + resolve(value); + }, + reject: (err) => { + clearTimeout(timer); + reject(err); + }, + }); + + this.writeLine(JSON.stringify(message)); + }); + } + + private sendNotification(method: string, params?: Record): void { + const message: JsonRpcNotification = { jsonrpc: "2.0", method }; + if (params !== undefined) { + message.params = params; + } + this.writeLine(JSON.stringify(message)); + } + + private writeLine(line: string): void { + if (this.process?.stdin === null || this.process?.stdin === undefined) { + throw new Error("Cannot write: hermes acp process stdin not available"); + } + this.process.stdin.write(`${line}\n`); + } + + private handleLine(line: string): void { + if (line === "") { + return; + } + + let parsed: unknown; + try { + parsed = JSON.parse(line); + } catch { + return; + } + + const msg = parsed as Record; + + if ("id" in msg && msg.id !== undefined && msg.id !== null) { + const response = msg as unknown as JsonRpcResponse; + const handler = this.pending.get(response.id); + if (handler !== undefined) { + this.pending.delete(response.id); + handler.resolve(response); + } + return; + } + + if (msg.method === "session/update" && isSessionUpdateParams(msg.params)) { + const updateType = msg.params.update.sessionUpdate; + if (updateType === "agent_message_chunk") { + const chunk = msg.params.update.content?.text; + if (typeof chunk === "string") { + this.messageChunks.push(chunk); + } + } + } + } + + private rejectAll(err: Error): void { + for (const handler of this.pending.values()) { + handler.reject(err); + } + this.pending.clear(); + } +} diff --git a/packages/workflow-agent-hermes/src/index.ts b/packages/workflow-agent-hermes/src/index.ts index b27f12a..3eb4d4a 100644 --- a/packages/workflow-agent-hermes/src/index.ts +++ b/packages/workflow-agent-hermes/src/index.ts @@ -1 +1,2 @@ export { buildHermesPrompt, createHermesAgent } from "./hermes.js"; +export { HermesAcpClient } from "./acp-client.js"; -- 2.43.0 From 96584e481fc17ef80655962007e5bc5b52097375 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B0=8F=E6=A9=98?= Date: Fri, 22 May 2026 12:18:14 +0000 Subject: [PATCH 2/5] refactor: replace spawnHermes with HermesAcpClient Remove spawnHermes, spawnHermesChat, spawnHermesResume, parseSessionId, and buildResultFromSession. runHermes and continueHermes now use HermesAcpClient for structured JSON-RPC communication. Session ID comes directly from ACP protocol, eliminating #380 race condition. Agent output collected via streaming chunks instead of session file loading. Phase 2 of RFC #398 Fixes #380 --- packages/workflow-agent-hermes/src/hermes.ts | 123 ++++--------------- packages/workflow-agent-hermes/src/index.ts | 2 +- 2 files changed, 23 insertions(+), 102 deletions(-) diff --git a/packages/workflow-agent-hermes/src/hermes.ts b/packages/workflow-agent-hermes/src/hermes.ts index 5264934..178d49f 100644 --- a/packages/workflow-agent-hermes/src/hermes.ts +++ b/packages/workflow-agent-hermes/src/hermes.ts @@ -1,4 +1,3 @@ -import { spawn } from "node:child_process"; import type { Store } from "@uncaged/json-cas"; import { @@ -8,14 +7,8 @@ import { createAgent, } from "@uncaged/workflow-agent-kit"; -import { - loadHermesSession, - parseSessionIdFromStdout, - storeHermesSessionDetail, -} from "./session-detail.js"; - -const HERMES_COMMAND = "hermes"; -const HERMES_MAX_TURNS = 90; +import { HermesAcpClient } from "./acp-client.js"; +import { storeHermesRawOutput } from "./session-detail.js"; function buildHistorySummary(steps: AgentContext["steps"]): string { if (steps.length === 0) { @@ -52,106 +45,34 @@ export function buildHermesPrompt(ctx: AgentContext): string { return parts.join("\n"); } -function spawnHermes(args: string[]): Promise<{ stdout: string; stderr: string }> { - return new Promise((resolve, reject) => { - const child = spawn(HERMES_COMMAND, args, { - env: process.env, - shell: false, - stdio: ["ignore", "pipe", "pipe"], - }); - - let stdout = ""; - let stderr = ""; - child.stdout?.on("data", (chunk: Buffer) => { - stdout += chunk.toString(); - }); - child.stderr?.on("data", (chunk: Buffer) => { - stderr += chunk.toString(); - }); - - child.on("error", (cause) => { - const message = cause instanceof Error ? cause.message : String(cause); - reject(new Error(`hermes spawn failed: ${message}`)); - }); - - child.on("close", (code) => { - if (code === 0) { - resolve({ stdout, stderr }); - return; - } - const detail = stderr.trim() !== "" ? ` stderr=${stderr.trim()}` : ""; - reject(new Error(`hermes exited with code ${code ?? "null"}${detail}`)); - }); - }); -} - -function spawnHermesChat(prompt: string): Promise<{ stdout: string; stderr: string }> { - return spawnHermes([ - "chat", - "-q", - prompt, - "--yolo", - "--max-turns", - String(HERMES_MAX_TURNS), - "--quiet", - ]); -} - -function spawnHermesResume( - sessionId: string, - message: string, -): Promise<{ stdout: string; stderr: string }> { - return spawnHermes([ - "chat", - "--resume", - sessionId, - "-q", - message, - "--yolo", - "--max-turns", - String(HERMES_MAX_TURNS), - "--quiet", - ]); -} - -function parseSessionId(stdout: string, stderr: string): string { - const sessionId = parseSessionIdFromStdout(stderr) ?? parseSessionIdFromStdout(stdout); - if (sessionId === null) { - throw new Error( - "Failed to parse session_id from hermes output.\n" + - `stderr (first 200 chars): ${stderr.slice(0, 200)}\n` + - `stdout (first 200 chars): ${stdout.slice(0, 200)}`, - ); - } - return sessionId; -} - -async function buildResultFromSession(sessionId: string, store: Store): Promise { - const session = await loadHermesSession(sessionId); - if (session === null) { - throw new Error(`Failed to load hermes session file for session_id: ${sessionId}`); - } - const { detailHash, output } = await storeHermesSessionDetail(store, session); - return { output, detailHash, sessionId }; -} - async function runHermes(ctx: AgentContext): Promise { const fullPrompt = buildHermesPrompt(ctx); - const { stdout, stderr } = await spawnHermesChat(fullPrompt); - const sessionId = parseSessionId(stdout, stderr); - return buildResultFromSession(sessionId, ctx.store); + const client = new HermesAcpClient(); + try { + await client.connect(process.cwd()); + const { text, sessionId } = await client.prompt(fullPrompt); + const detailHash = await storeHermesRawOutput(ctx.store, text); + return { output: text, detailHash, sessionId }; + } finally { + await client.close(); + } } async function continueHermes( - sessionId: string, + _sessionId: string, message: string, store: Store, ): Promise { - const { stdout, stderr } = await spawnHermesResume(sessionId, message); - // Resume may return a new session_id - const newSessionId = parseSessionIdFromStdout(stderr) ?? parseSessionIdFromStdout(stdout); - const resolvedId = newSessionId ?? sessionId; - return buildResultFromSession(resolvedId, store); + // ACP does not support resuming an external session; start a new session with the message as prompt + const client = new HermesAcpClient(); + try { + await client.connect(process.cwd()); + const { text, sessionId } = await client.prompt(message); + const detailHash = await storeHermesRawOutput(store, text); + return { output: text, detailHash, sessionId }; + } finally { + await client.close(); + } } /** Agent CLI factory: parses argv, runs Hermes, extracts output, writes StepNode. */ diff --git a/packages/workflow-agent-hermes/src/index.ts b/packages/workflow-agent-hermes/src/index.ts index 3eb4d4a..cf147d7 100644 --- a/packages/workflow-agent-hermes/src/index.ts +++ b/packages/workflow-agent-hermes/src/index.ts @@ -1,2 +1,2 @@ -export { buildHermesPrompt, createHermesAgent } from "./hermes.js"; export { HermesAcpClient } from "./acp-client.js"; +export { buildHermesPrompt, createHermesAgent } from "./hermes.js"; -- 2.43.0 From 025695dbe9d6275bd58232a26ca95fea0c214075 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B0=8F=E6=A9=98?= Date: Fri, 22 May 2026 12:58:55 +0000 Subject: [PATCH 3/5] refactor: use @agentclientprotocol/sdk instead of hand-rolled JSON-RPC Replace 250-line custom ACP client with official TypeScript SDK. Uses ClientSideConnection + ndJsonStream for stdio transport. Same public API (connect/prompt/close), 115 lines, zero custom protocol code. Ref #398 --- package.json | 2 + packages/workflow-agent-hermes/package.json | 1 + .../workflow-agent-hermes/src/acp-client.ts | 241 +++++------------- 3 files changed, 63 insertions(+), 181 deletions(-) diff --git a/package.json b/package.json index dd52be1..508ef28 100644 --- a/package.json +++ b/package.json @@ -15,10 +15,12 @@ "release": "bun run build && bun test && node scripts/publish-all.mjs" }, "devDependencies": { + "@agentclientprotocol/sdk": "^0.22.1", "@biomejs/biome": "^2.4.14", "@changesets/cli": "^2.31.0", "@types/node": "^25.7.0", "@types/xxhashjs": "^0.2.4", + "@uncaged/workflow-agent-hermes": "workspace:*", "bun-types": "^1.3.13" } } diff --git a/packages/workflow-agent-hermes/package.json b/packages/workflow-agent-hermes/package.json index 62f3bd6..c98d03e 100644 --- a/packages/workflow-agent-hermes/package.json +++ b/packages/workflow-agent-hermes/package.json @@ -21,6 +21,7 @@ "test": "bun test" }, "dependencies": { + "@agentclientprotocol/sdk": "^0.22.1", "@uncaged/json-cas": "^0.4.0", "@uncaged/workflow-agent-kit": "workspace:^" }, diff --git a/packages/workflow-agent-hermes/src/acp-client.ts b/packages/workflow-agent-hermes/src/acp-client.ts index 5a41732..6d76a22 100644 --- a/packages/workflow-agent-hermes/src/acp-client.ts +++ b/packages/workflow-agent-hermes/src/acp-client.ts @@ -1,60 +1,51 @@ import type { ChildProcess } from "node:child_process"; import { spawn } from "node:child_process"; -import { createInterface } from "node:readline"; +import { Readable, Writable } from "node:stream"; +import type { + Client, + RequestPermissionRequest, + RequestPermissionResponse, + SessionNotification, +} from "@agentclientprotocol/sdk"; +import { ClientSideConnection, ndJsonStream, PROTOCOL_VERSION } from "@agentclientprotocol/sdk"; const HERMES_COMMAND = "hermes"; -const PROMPT_TIMEOUT_MS = 10 * 60 * 1000; -type JsonRpcRequest = { - jsonrpc: "2.0"; - id: number; - method: string; - params: Record; -}; +class UwfAcpClient implements Client { + private messageChunks: string[] = []; -type JsonRpcNotification = { - jsonrpc: "2.0"; - method: string; - params?: Record; -}; + resetChunks(): void { + this.messageChunks = []; + } -type JsonRpcResponse = { - jsonrpc: "2.0"; - id: number; - result: unknown; - error?: { code: number; message: string }; -}; + collectChunks(): string { + return this.messageChunks.join(""); + } -type PendingRequest = { - resolve: (value: JsonRpcResponse) => void; - reject: (reason: Error) => void; -}; + async sessionUpdate(params: SessionNotification): Promise { + const { update } = params; + if (update.sessionUpdate === "agent_message_chunk" && update.content.type === "text") { + this.messageChunks.push(update.content.text); + } + } -type SessionUpdateParams = { - update: { - sessionUpdate: string; - content?: { - text?: string; + async requestPermission(params: RequestPermissionRequest): Promise { + const firstOption = params.options[0]; + return { + outcome: { + outcome: "selected", + optionId: firstOption?.optionId ?? "", + }, }; - }; -}; - -function isSessionUpdateParams(params: unknown): params is SessionUpdateParams { - return ( - typeof params === "object" && - params !== null && - "update" in params && - typeof (params as Record).update === "object" - ); + } } export class HermesAcpClient { private process: ChildProcess | null = null; - private nextId = 1; + private connection: ClientSideConnection | null = null; private sessionId: string | null = null; - private pending = new Map(); private stderrBuffer = ""; - private messageChunks: string[] = []; + private client = new UwfAcpClient(); /** Spawn hermes acp, initialize, create session */ async connect(cwd: string): Promise { @@ -70,51 +61,32 @@ export class HermesAcpClient { this.stderrBuffer += chunk.toString(); }); - child.on("error", (cause) => { - const message = cause instanceof Error ? cause.message : String(cause); - this.rejectAll(new Error(`hermes acp spawn failed: ${message}`)); - }); + if (child.stdin === null || child.stdout === null) { + throw new Error("hermes acp process stdio is not available"); + } - child.on("close", (code) => { - if (code !== 0 && this.pending.size > 0) { + const input = Writable.toWeb(child.stdin); + const output = Readable.toWeb(child.stdout); + const stream = ndJsonStream(input, output); + + const clientRef = this.client; + const connection = new ClientSideConnection((_agent) => clientRef, stream); + this.connection = connection; + + connection.signal.addEventListener("abort", () => { + if (this.sessionId !== null) { const detail = this.stderrBuffer.trim() !== "" ? ` stderr=${this.stderrBuffer.trim()}` : ""; - this.rejectAll( - new Error(`hermes acp exited unexpectedly with code ${code ?? "null"}${detail}`), - ); + throw new Error(`hermes acp connection closed unexpectedly${detail}`); } }); - if (child.stdout === null) { - throw new Error("hermes acp process stdout is not available"); - } - const rl = createInterface({ input: child.stdout }); - rl.on("line", (line) => { - this.handleLine(line.trim()); + await connection.initialize({ + protocolVersion: PROTOCOL_VERSION, + clientCapabilities: {}, }); - const initResponse = await this.sendRequest("initialize", { - protocolVersion: 1, - clientInfo: { name: "uwf", version: "0.1.0" }, - capabilities: {}, - }); - - if ((initResponse as { error?: unknown }).error !== undefined) { - throw new Error( - `initialize failed: ${JSON.stringify((initResponse as { error: unknown }).error)}`, - ); - } - - this.sendNotification("initialized"); - - const sessionResponse = (await this.sendRequest("session/new", { - cwd, - mcpServers: [], - })) as { result: { sessionId: string } }; - - const sessionId = sessionResponse.result?.sessionId; - if (typeof sessionId !== "string" || sessionId === "") { - throw new Error(`session/new did not return a sessionId: ${JSON.stringify(sessionResponse)}`); - } + const sessionResult = await connection.newSession({ cwd, mcpServers: [] }); + const { sessionId } = sessionResult; this.sessionId = sessionId; return sessionId; @@ -122,29 +94,19 @@ export class HermesAcpClient { /** Send prompt and collect full response text */ async prompt(text: string): Promise<{ text: string; sessionId: string }> { - if (this.sessionId === null) { + if (this.connection === null || this.sessionId === null) { throw new Error("Not connected — call connect() first"); } - this.messageChunks = []; + this.client.resetChunks(); - const response = await this.sendRequest( - "session/prompt", - { - sessionId: this.sessionId, - prompt: [{ type: "text", text }], - }, - PROMPT_TIMEOUT_MS, - ); - - if ((response as { error?: unknown }).error !== undefined) { - throw new Error( - `session/prompt failed: ${JSON.stringify((response as { error: unknown }).error)}`, - ); - } + await this.connection.prompt({ + sessionId: this.sessionId, + prompt: [{ type: "text", text }], + }); return { - text: this.messageChunks.join(""), + text: this.client.collectChunks(), sessionId: this.sessionId, }; } @@ -154,6 +116,7 @@ export class HermesAcpClient { if (this.process === null) { return; } + this.sessionId = null; this.process.stdin?.end(); const proc = this.process; await new Promise((resolve) => { @@ -161,90 +124,6 @@ export class HermesAcpClient { setTimeout(resolve, 5000); }); this.process = null; - } - - private sendRequest( - method: string, - params: Record, - timeoutMs = 30_000, - ): Promise { - const id = this.nextId++; - const message: JsonRpcRequest = { jsonrpc: "2.0", id, method, params }; - return new Promise((resolve, reject) => { - const timer = setTimeout(() => { - this.pending.delete(id); - reject(new Error(`Timeout waiting for response to ${method} (id=${id})`)); - }, timeoutMs); - - this.pending.set(id, { - resolve: (value) => { - clearTimeout(timer); - resolve(value); - }, - reject: (err) => { - clearTimeout(timer); - reject(err); - }, - }); - - this.writeLine(JSON.stringify(message)); - }); - } - - private sendNotification(method: string, params?: Record): void { - const message: JsonRpcNotification = { jsonrpc: "2.0", method }; - if (params !== undefined) { - message.params = params; - } - this.writeLine(JSON.stringify(message)); - } - - private writeLine(line: string): void { - if (this.process?.stdin === null || this.process?.stdin === undefined) { - throw new Error("Cannot write: hermes acp process stdin not available"); - } - this.process.stdin.write(`${line}\n`); - } - - private handleLine(line: string): void { - if (line === "") { - return; - } - - let parsed: unknown; - try { - parsed = JSON.parse(line); - } catch { - return; - } - - const msg = parsed as Record; - - if ("id" in msg && msg.id !== undefined && msg.id !== null) { - const response = msg as unknown as JsonRpcResponse; - const handler = this.pending.get(response.id); - if (handler !== undefined) { - this.pending.delete(response.id); - handler.resolve(response); - } - return; - } - - if (msg.method === "session/update" && isSessionUpdateParams(msg.params)) { - const updateType = msg.params.update.sessionUpdate; - if (updateType === "agent_message_chunk") { - const chunk = msg.params.update.content?.text; - if (typeof chunk === "string") { - this.messageChunks.push(chunk); - } - } - } - } - - private rejectAll(err: Error): void { - for (const handler of this.pending.values()) { - handler.reject(err); - } - this.pending.clear(); + this.connection = null; } } -- 2.43.0 From 68af555313e5a5d230e799165a30e6c39b1ad342 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B0=8F=E6=A9=98?= Date: Fri, 22 May 2026 13:06:14 +0000 Subject: [PATCH 4/5] fix: share ACP client across run/continue for session continuity MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The client is now created once in createHermesAgent() and shared by runHermes and continueHermes closures. This preserves conversation context during frontmatter retry loops — continue() sends a follow-up prompt on the same ACP session instead of starting a new one. Client is cleaned up via process.on('exit'). Ref #398 --- packages/workflow-agent-hermes/src/hermes.ts | 43 +++++++++++--------- 1 file changed, 23 insertions(+), 20 deletions(-) diff --git a/packages/workflow-agent-hermes/src/hermes.ts b/packages/workflow-agent-hermes/src/hermes.ts index 178d49f..3ce000d 100644 --- a/packages/workflow-agent-hermes/src/hermes.ts +++ b/packages/workflow-agent-hermes/src/hermes.ts @@ -45,38 +45,41 @@ export function buildHermesPrompt(ctx: AgentContext): string { return parts.join("\n"); } -async function runHermes(ctx: AgentContext): Promise { - const fullPrompt = buildHermesPrompt(ctx); +/** + * Agent CLI factory: parses argv, runs Hermes, extracts output, writes StepNode. + * + * A single ACP client is shared across run() and continue() calls so that + * frontmatter retry loops keep the same Hermes session context. The client + * is closed once the agent process exits (via process.on("exit")). + */ +export function createHermesAgent(): () => Promise { const client = new HermesAcpClient(); - try { + + // Ensure cleanup regardless of how the process exits. + process.on("exit", () => { + void client.close(); + }); + + async function runHermes(ctx: AgentContext): Promise { + const fullPrompt = buildHermesPrompt(ctx); await client.connect(process.cwd()); const { text, sessionId } = await client.prompt(fullPrompt); const detailHash = await storeHermesRawOutput(ctx.store, text); return { output: text, detailHash, sessionId }; - } finally { - await client.close(); } -} -async function continueHermes( - _sessionId: string, - message: string, - store: Store, -): Promise { - // ACP does not support resuming an external session; start a new session with the message as prompt - const client = new HermesAcpClient(); - try { - await client.connect(process.cwd()); + async function continueHermes( + _sessionId: string, + message: string, + store: Store, + ): Promise { + // Client is already connected from runHermes — same ACP session, + // so the agent sees the full conversation history (crucial for retries). const { text, sessionId } = await client.prompt(message); const detailHash = await storeHermesRawOutput(store, text); return { output: text, detailHash, sessionId }; - } finally { - await client.close(); } -} -/** Agent CLI factory: parses argv, runs Hermes, extracts output, writes StepNode. */ -export function createHermesAgent(): () => Promise { return createAgent({ name: "hermes", run: runHermes, -- 2.43.0 From f90614a6225b82a402bb29a723fcca35a5393659 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B0=8F=E6=A9=98?= Date: Fri, 22 May 2026 13:13:02 +0000 Subject: [PATCH 5/5] feat: collect structured turns from ACP session updates MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit UwfAcpClient now tracks all session/update events: - agent_message_chunk → assistant message content - agent_thought_chunk → assistant reasoning - tool_call → pending tool invocation (name + rawInput) - tool_call_update (completed/failed) → assistant tool_call + tool result Messages are accumulated across prompts (same session) and stored via storeHermesSessionDetail, restoring the full structured detail (turns with tool calls, reasoning) that was lost in the initial ACP migration. Ref #398 --- .../__tests__/acp-client.test.ts | 21 +++ .../workflow-agent-hermes/src/acp-client.ts | 126 ++++++++++++++++-- packages/workflow-agent-hermes/src/hermes.ts | 12 +- 3 files changed, 143 insertions(+), 16 deletions(-) diff --git a/packages/workflow-agent-hermes/__tests__/acp-client.test.ts b/packages/workflow-agent-hermes/__tests__/acp-client.test.ts index ea3a7a5..54db937 100644 --- a/packages/workflow-agent-hermes/__tests__/acp-client.test.ts +++ b/packages/workflow-agent-hermes/__tests__/acp-client.test.ts @@ -53,4 +53,25 @@ describe("HermesAcpClient", () => { }, { timeout: 2 * 60 * 1000 }, ); + + it( + "prompt() collects structured messages including tool calls", + async () => { + await client.connect(process.cwd()); + const result = await client.prompt("Run this command: echo TOOL_DETAIL_TEST"); + expect(result.messages.length).toBeGreaterThan(0); + // Should have at least one tool message (the echo command) + const toolMessages = result.messages.filter((m) => m.role === "tool"); + expect(toolMessages.length).toBeGreaterThan(0); + // Tool message should contain the output + const toolContent = toolMessages[0]?.content ?? ""; + expect(toolContent).toContain("TOOL_DETAIL_TEST"); + // Should have assistant messages with tool_calls + const assistantWithTools = result.messages.filter( + (m) => m.role === "assistant" && m.tool_calls !== null, + ); + expect(assistantWithTools.length).toBeGreaterThan(0); + }, + { timeout: 2 * 60 * 1000 }, + ); }); diff --git a/packages/workflow-agent-hermes/src/acp-client.ts b/packages/workflow-agent-hermes/src/acp-client.ts index 6d76a22..74d55f1 100644 --- a/packages/workflow-agent-hermes/src/acp-client.ts +++ b/packages/workflow-agent-hermes/src/acp-client.ts @@ -9,26 +9,109 @@ import type { } from "@agentclientprotocol/sdk"; import { ClientSideConnection, ndJsonStream, PROTOCOL_VERSION } from "@agentclientprotocol/sdk"; +import type { HermesSessionMessage } from "./types.js"; + const HERMES_COMMAND = "hermes"; +/** Tracks in-flight tool calls so we can build complete messages when they finish. */ +type PendingToolCall = { + name: string; + args: string; +}; + +/** + * Collects ACP session/update events into a list of {@link HermesSessionMessage} + * that mirrors what Hermes writes to its session JSONL files. + */ class UwfAcpClient implements Client { private messageChunks: string[] = []; + private reasoningChunks: string[] = []; + private pendingTools = new Map(); + messages: HermesSessionMessage[] = []; - resetChunks(): void { + resetPerPrompt(): void { this.messageChunks = []; - } - - collectChunks(): string { - return this.messageChunks.join(""); + this.reasoningChunks = []; } async sessionUpdate(params: SessionNotification): Promise { const { update } = params; - if (update.sessionUpdate === "agent_message_chunk" && update.content.type === "text") { - this.messageChunks.push(update.content.text); + switch (update.sessionUpdate) { + case "agent_message_chunk": + if (update.content.type === "text") { + this.messageChunks.push(update.content.text); + } + break; + + case "agent_thought_chunk": + if (update.content.type === "text") { + this.reasoningChunks.push(update.content.text); + } + break; + + case "tool_call": { + // Agent is invoking a tool — record the call. + const title = update.title ?? ""; + const rawInput = + update.rawInput !== undefined && update.rawInput !== null + ? JSON.stringify(update.rawInput) + : ""; + this.pendingTools.set(update.toolCallId, { name: title, args: rawInput }); + + // Flush accumulated assistant text + reasoning as an assistant message + // (the agent "spoke" before calling the tool). + this.flushAssistantMessage(); + break; + } + + case "tool_call_update": { + if (update.status === "completed" || update.status === "failed") { + const pending = this.pendingTools.get(update.toolCallId); + const toolName = pending?.name ?? update.toolCallId; + const rawOutput = + update.rawOutput !== undefined && update.rawOutput !== null + ? typeof update.rawOutput === "string" + ? update.rawOutput + : JSON.stringify(update.rawOutput) + : ""; + this.messages.push({ + role: "assistant", + content: null, + reasoning: null, + tool_calls: [{ function: { name: toolName, arguments: pending?.args ?? "" } }], + }); + this.messages.push({ + role: "tool", + content: rawOutput, + reasoning: null, + tool_calls: null, + }); + this.pendingTools.delete(update.toolCallId); + } + break; + } + + default: + break; } } + /** Flush any accumulated text/reasoning into an assistant message. */ + flushAssistantMessage(): void { + const text = this.messageChunks.join(""); + const reasoning = this.reasoningChunks.join(""); + if (text !== "" || reasoning !== "") { + this.messages.push({ + role: "assistant", + content: text || null, + reasoning: reasoning || null, + tool_calls: null, + }); + } + this.messageChunks = []; + this.reasoningChunks = []; + } + async requestPermission(params: RequestPermissionRequest): Promise { const firstOption = params.options[0]; return { @@ -40,6 +123,12 @@ class UwfAcpClient implements Client { } } +export type AcpPromptResult = { + text: string; + sessionId: string; + messages: HermesSessionMessage[]; +}; + export class HermesAcpClient { private process: ChildProcess | null = null; private connection: ClientSideConnection | null = null; @@ -92,22 +181,37 @@ export class HermesAcpClient { return sessionId; } - /** Send prompt and collect full response text */ - async prompt(text: string): Promise<{ text: string; sessionId: string }> { + /** Send prompt and collect full response text + structured messages. */ + async prompt(text: string): Promise { if (this.connection === null || this.sessionId === null) { throw new Error("Not connected — call connect() first"); } - this.client.resetChunks(); + this.client.resetPerPrompt(); await this.connection.prompt({ sessionId: this.sessionId, prompt: [{ type: "text", text }], }); + // Flush any trailing assistant text that wasn't followed by a tool call. + this.client.flushAssistantMessage(); + + // Extract the final assistant text from collected messages. + const messages = this.client.messages; + let finalText = ""; + for (let i = messages.length - 1; i >= 0; i--) { + const msg = messages[i]; + if (msg !== undefined && msg.role === "assistant" && msg.content !== null && msg.content.trim() !== "") { + finalText = msg.content; + break; + } + } + return { - text: this.client.collectChunks(), + text: finalText, sessionId: this.sessionId, + messages, }; } diff --git a/packages/workflow-agent-hermes/src/hermes.ts b/packages/workflow-agent-hermes/src/hermes.ts index 3ce000d..6d3066f 100644 --- a/packages/workflow-agent-hermes/src/hermes.ts +++ b/packages/workflow-agent-hermes/src/hermes.ts @@ -8,7 +8,7 @@ import { } from "@uncaged/workflow-agent-kit"; import { HermesAcpClient } from "./acp-client.js"; -import { storeHermesRawOutput } from "./session-detail.js"; +import { storeHermesSessionDetail } from "./session-detail.js"; function buildHistorySummary(steps: AgentContext["steps"]): string { if (steps.length === 0) { @@ -63,8 +63,9 @@ export function createHermesAgent(): () => Promise { async function runHermes(ctx: AgentContext): Promise { const fullPrompt = buildHermesPrompt(ctx); await client.connect(process.cwd()); - const { text, sessionId } = await client.prompt(fullPrompt); - const detailHash = await storeHermesRawOutput(ctx.store, text); + const { text, sessionId, messages } = await client.prompt(fullPrompt); + const session = { session_id: sessionId, model: "", session_start: new Date().toISOString(), messages }; + const { detailHash } = await storeHermesSessionDetail(ctx.store, session); return { output: text, detailHash, sessionId }; } @@ -75,8 +76,9 @@ export function createHermesAgent(): () => Promise { ): Promise { // Client is already connected from runHermes — same ACP session, // so the agent sees the full conversation history (crucial for retries). - const { text, sessionId } = await client.prompt(message); - const detailHash = await storeHermesRawOutput(store, text); + const { text, sessionId, messages } = await client.prompt(message); + const session = { session_id: sessionId, model: "", session_start: new Date().toISOString(), messages }; + const { detailHash } = await storeHermesSessionDetail(store, session); return { output: text, detailHash, sessionId }; } -- 2.43.0