From 766ec7ddc2c14bccecd5984d5427bc93a2f56071 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B0=8F=E6=A9=98?= Date: Fri, 22 May 2026 12:15:09 +0000 Subject: [PATCH] feat: add HermesAcpClient for structured ACP communication MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements JSON-RPC client that communicates with `hermes acp` via stdin/stdout. Replaces fragile stdout/stderr parsing with structured protocol: initialize → session/new → session/prompt → collect chunks. Session ID comes directly from protocol response, eliminating the race condition in #380. Phase 1 of RFC #398 --- .../__tests__/acp-client.test.ts | 56 ++++ .../workflow-agent-hermes/src/acp-client.ts | 250 ++++++++++++++++++ packages/workflow-agent-hermes/src/index.ts | 1 + 3 files changed, 307 insertions(+) create mode 100644 packages/workflow-agent-hermes/__tests__/acp-client.test.ts create mode 100644 packages/workflow-agent-hermes/src/acp-client.ts diff --git a/packages/workflow-agent-hermes/__tests__/acp-client.test.ts b/packages/workflow-agent-hermes/__tests__/acp-client.test.ts new file mode 100644 index 0000000..ea3a7a5 --- /dev/null +++ b/packages/workflow-agent-hermes/__tests__/acp-client.test.ts @@ -0,0 +1,56 @@ +import { afterEach, beforeEach, describe, expect, it } from "bun:test"; + +import { HermesAcpClient } from "../src/acp-client.js"; + +const UUID_RE = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i; + +describe("HermesAcpClient", () => { + let client: HermesAcpClient; + + beforeEach(() => { + client = new HermesAcpClient(); + }); + + afterEach(async () => { + await client.close(); + }); + + it( + "connect() returns a UUID sessionId", + async () => { + const sessionId = await client.connect(process.cwd()); + expect(typeof sessionId).toBe("string"); + expect(sessionId).toMatch(UUID_RE); + }, + { timeout: 2 * 60 * 1000 }, + ); + + it( + "prompt() returns a non-empty text response", + async () => { + await client.connect(process.cwd()); + const result = await client.prompt("Reply with exactly the word: PONG"); + expect(typeof result.text).toBe("string"); + expect(result.text.length).toBeGreaterThan(0); + expect(typeof result.sessionId).toBe("string"); + expect(result.sessionId).toMatch(UUID_RE); + }, + { timeout: 2 * 60 * 1000 }, + ); + + it( + "prompt() can be called twice on the same session (resume)", + async () => { + await client.connect(process.cwd()); + + const first = await client.prompt("Say the word ALPHA and nothing else."); + expect(first.text.length).toBeGreaterThan(0); + + const second = await client.prompt("Now say the word BETA and nothing else."); + expect(second.text.length).toBeGreaterThan(0); + + expect(first.sessionId).toBe(second.sessionId); + }, + { timeout: 2 * 60 * 1000 }, + ); +}); diff --git a/packages/workflow-agent-hermes/src/acp-client.ts b/packages/workflow-agent-hermes/src/acp-client.ts new file mode 100644 index 0000000..5a41732 --- /dev/null +++ b/packages/workflow-agent-hermes/src/acp-client.ts @@ -0,0 +1,250 @@ +import type { ChildProcess } from "node:child_process"; +import { spawn } from "node:child_process"; +import { createInterface } from "node:readline"; + +const HERMES_COMMAND = "hermes"; +const PROMPT_TIMEOUT_MS = 10 * 60 * 1000; + +type JsonRpcRequest = { + jsonrpc: "2.0"; + id: number; + method: string; + params: Record; +}; + +type JsonRpcNotification = { + jsonrpc: "2.0"; + method: string; + params?: Record; +}; + +type JsonRpcResponse = { + jsonrpc: "2.0"; + id: number; + result: unknown; + error?: { code: number; message: string }; +}; + +type PendingRequest = { + resolve: (value: JsonRpcResponse) => void; + reject: (reason: Error) => void; +}; + +type SessionUpdateParams = { + update: { + sessionUpdate: string; + content?: { + text?: string; + }; + }; +}; + +function isSessionUpdateParams(params: unknown): params is SessionUpdateParams { + return ( + typeof params === "object" && + params !== null && + "update" in params && + typeof (params as Record).update === "object" + ); +} + +export class HermesAcpClient { + private process: ChildProcess | null = null; + private nextId = 1; + private sessionId: string | null = null; + private pending = new Map(); + private stderrBuffer = ""; + private messageChunks: string[] = []; + + /** Spawn hermes acp, initialize, create session */ + async connect(cwd: string): Promise { + const child = spawn(HERMES_COMMAND, ["acp"], { + env: process.env, + shell: false, + stdio: ["pipe", "pipe", "pipe"], + }); + + this.process = child; + + child.stderr?.on("data", (chunk: Buffer) => { + this.stderrBuffer += chunk.toString(); + }); + + child.on("error", (cause) => { + const message = cause instanceof Error ? cause.message : String(cause); + this.rejectAll(new Error(`hermes acp spawn failed: ${message}`)); + }); + + child.on("close", (code) => { + if (code !== 0 && this.pending.size > 0) { + const detail = this.stderrBuffer.trim() !== "" ? ` stderr=${this.stderrBuffer.trim()}` : ""; + this.rejectAll( + new Error(`hermes acp exited unexpectedly with code ${code ?? "null"}${detail}`), + ); + } + }); + + if (child.stdout === null) { + throw new Error("hermes acp process stdout is not available"); + } + const rl = createInterface({ input: child.stdout }); + rl.on("line", (line) => { + this.handleLine(line.trim()); + }); + + const initResponse = await this.sendRequest("initialize", { + protocolVersion: 1, + clientInfo: { name: "uwf", version: "0.1.0" }, + capabilities: {}, + }); + + if ((initResponse as { error?: unknown }).error !== undefined) { + throw new Error( + `initialize failed: ${JSON.stringify((initResponse as { error: unknown }).error)}`, + ); + } + + this.sendNotification("initialized"); + + const sessionResponse = (await this.sendRequest("session/new", { + cwd, + mcpServers: [], + })) as { result: { sessionId: string } }; + + const sessionId = sessionResponse.result?.sessionId; + if (typeof sessionId !== "string" || sessionId === "") { + throw new Error(`session/new did not return a sessionId: ${JSON.stringify(sessionResponse)}`); + } + + this.sessionId = sessionId; + return sessionId; + } + + /** Send prompt and collect full response text */ + async prompt(text: string): Promise<{ text: string; sessionId: string }> { + if (this.sessionId === null) { + throw new Error("Not connected — call connect() first"); + } + + this.messageChunks = []; + + const response = await this.sendRequest( + "session/prompt", + { + sessionId: this.sessionId, + prompt: [{ type: "text", text }], + }, + PROMPT_TIMEOUT_MS, + ); + + if ((response as { error?: unknown }).error !== undefined) { + throw new Error( + `session/prompt failed: ${JSON.stringify((response as { error: unknown }).error)}`, + ); + } + + return { + text: this.messageChunks.join(""), + sessionId: this.sessionId, + }; + } + + /** Close the connection */ + async close(): Promise { + if (this.process === null) { + return; + } + this.process.stdin?.end(); + const proc = this.process; + await new Promise((resolve) => { + proc.on("close", () => resolve()); + setTimeout(resolve, 5000); + }); + this.process = null; + } + + private sendRequest( + method: string, + params: Record, + timeoutMs = 30_000, + ): Promise { + const id = this.nextId++; + const message: JsonRpcRequest = { jsonrpc: "2.0", id, method, params }; + return new Promise((resolve, reject) => { + const timer = setTimeout(() => { + this.pending.delete(id); + reject(new Error(`Timeout waiting for response to ${method} (id=${id})`)); + }, timeoutMs); + + this.pending.set(id, { + resolve: (value) => { + clearTimeout(timer); + resolve(value); + }, + reject: (err) => { + clearTimeout(timer); + reject(err); + }, + }); + + this.writeLine(JSON.stringify(message)); + }); + } + + private sendNotification(method: string, params?: Record): void { + const message: JsonRpcNotification = { jsonrpc: "2.0", method }; + if (params !== undefined) { + message.params = params; + } + this.writeLine(JSON.stringify(message)); + } + + private writeLine(line: string): void { + if (this.process?.stdin === null || this.process?.stdin === undefined) { + throw new Error("Cannot write: hermes acp process stdin not available"); + } + this.process.stdin.write(`${line}\n`); + } + + private handleLine(line: string): void { + if (line === "") { + return; + } + + let parsed: unknown; + try { + parsed = JSON.parse(line); + } catch { + return; + } + + const msg = parsed as Record; + + if ("id" in msg && msg.id !== undefined && msg.id !== null) { + const response = msg as unknown as JsonRpcResponse; + const handler = this.pending.get(response.id); + if (handler !== undefined) { + this.pending.delete(response.id); + handler.resolve(response); + } + return; + } + + if (msg.method === "session/update" && isSessionUpdateParams(msg.params)) { + const updateType = msg.params.update.sessionUpdate; + if (updateType === "agent_message_chunk") { + const chunk = msg.params.update.content?.text; + if (typeof chunk === "string") { + this.messageChunks.push(chunk); + } + } + } + } + + private rejectAll(err: Error): void { + for (const handler of this.pending.values()) { + handler.reject(err); + } + this.pending.clear(); + } +} diff --git a/packages/workflow-agent-hermes/src/index.ts b/packages/workflow-agent-hermes/src/index.ts index b27f12a..3eb4d4a 100644 --- a/packages/workflow-agent-hermes/src/index.ts +++ b/packages/workflow-agent-hermes/src/index.ts @@ -1 +1,2 @@ export { buildHermesPrompt, createHermesAgent } from "./hermes.js"; +export { HermesAcpClient } from "./acp-client.js";