From 025695dbe9d6275bd58232a26ca95fea0c214075 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B0=8F=E6=A9=98?= Date: Fri, 22 May 2026 12:58:55 +0000 Subject: [PATCH] refactor: use @agentclientprotocol/sdk instead of hand-rolled JSON-RPC Replace 250-line custom ACP client with official TypeScript SDK. Uses ClientSideConnection + ndJsonStream for stdio transport. Same public API (connect/prompt/close), 115 lines, zero custom protocol code. Ref #398 --- package.json | 2 + packages/workflow-agent-hermes/package.json | 1 + .../workflow-agent-hermes/src/acp-client.ts | 241 +++++------------- 3 files changed, 63 insertions(+), 181 deletions(-) diff --git a/package.json b/package.json index dd52be1..508ef28 100644 --- a/package.json +++ b/package.json @@ -15,10 +15,12 @@ "release": "bun run build && bun test && node scripts/publish-all.mjs" }, "devDependencies": { + "@agentclientprotocol/sdk": "^0.22.1", "@biomejs/biome": "^2.4.14", "@changesets/cli": "^2.31.0", "@types/node": "^25.7.0", "@types/xxhashjs": "^0.2.4", + "@uncaged/workflow-agent-hermes": "workspace:*", "bun-types": "^1.3.13" } } diff --git a/packages/workflow-agent-hermes/package.json b/packages/workflow-agent-hermes/package.json index 62f3bd6..c98d03e 100644 --- a/packages/workflow-agent-hermes/package.json +++ b/packages/workflow-agent-hermes/package.json @@ -21,6 +21,7 @@ "test": "bun test" }, "dependencies": { + "@agentclientprotocol/sdk": "^0.22.1", "@uncaged/json-cas": "^0.4.0", "@uncaged/workflow-agent-kit": "workspace:^" }, diff --git a/packages/workflow-agent-hermes/src/acp-client.ts b/packages/workflow-agent-hermes/src/acp-client.ts index 5a41732..6d76a22 100644 --- a/packages/workflow-agent-hermes/src/acp-client.ts +++ b/packages/workflow-agent-hermes/src/acp-client.ts @@ -1,60 +1,51 @@ import type { ChildProcess } from "node:child_process"; import { spawn } from "node:child_process"; -import { createInterface } from "node:readline"; +import { Readable, Writable } from "node:stream"; +import type { + Client, + RequestPermissionRequest, + RequestPermissionResponse, + SessionNotification, +} from "@agentclientprotocol/sdk"; +import { ClientSideConnection, ndJsonStream, PROTOCOL_VERSION } from "@agentclientprotocol/sdk"; const HERMES_COMMAND = "hermes"; -const PROMPT_TIMEOUT_MS = 10 * 60 * 1000; -type JsonRpcRequest = { - jsonrpc: "2.0"; - id: number; - method: string; - params: Record; -}; +class UwfAcpClient implements Client { + private messageChunks: string[] = []; -type JsonRpcNotification = { - jsonrpc: "2.0"; - method: string; - params?: Record; -}; + resetChunks(): void { + this.messageChunks = []; + } -type JsonRpcResponse = { - jsonrpc: "2.0"; - id: number; - result: unknown; - error?: { code: number; message: string }; -}; + collectChunks(): string { + return this.messageChunks.join(""); + } -type PendingRequest = { - resolve: (value: JsonRpcResponse) => void; - reject: (reason: Error) => void; -}; + async sessionUpdate(params: SessionNotification): Promise { + const { update } = params; + if (update.sessionUpdate === "agent_message_chunk" && update.content.type === "text") { + this.messageChunks.push(update.content.text); + } + } -type SessionUpdateParams = { - update: { - sessionUpdate: string; - content?: { - text?: string; + async requestPermission(params: RequestPermissionRequest): Promise { + const firstOption = params.options[0]; + return { + outcome: { + outcome: "selected", + optionId: firstOption?.optionId ?? "", + }, }; - }; -}; - -function isSessionUpdateParams(params: unknown): params is SessionUpdateParams { - return ( - typeof params === "object" && - params !== null && - "update" in params && - typeof (params as Record).update === "object" - ); + } } export class HermesAcpClient { private process: ChildProcess | null = null; - private nextId = 1; + private connection: ClientSideConnection | null = null; private sessionId: string | null = null; - private pending = new Map(); private stderrBuffer = ""; - private messageChunks: string[] = []; + private client = new UwfAcpClient(); /** Spawn hermes acp, initialize, create session */ async connect(cwd: string): Promise { @@ -70,51 +61,32 @@ export class HermesAcpClient { this.stderrBuffer += chunk.toString(); }); - child.on("error", (cause) => { - const message = cause instanceof Error ? cause.message : String(cause); - this.rejectAll(new Error(`hermes acp spawn failed: ${message}`)); - }); + if (child.stdin === null || child.stdout === null) { + throw new Error("hermes acp process stdio is not available"); + } - child.on("close", (code) => { - if (code !== 0 && this.pending.size > 0) { + const input = Writable.toWeb(child.stdin); + const output = Readable.toWeb(child.stdout); + const stream = ndJsonStream(input, output); + + const clientRef = this.client; + const connection = new ClientSideConnection((_agent) => clientRef, stream); + this.connection = connection; + + connection.signal.addEventListener("abort", () => { + if (this.sessionId !== null) { const detail = this.stderrBuffer.trim() !== "" ? ` stderr=${this.stderrBuffer.trim()}` : ""; - this.rejectAll( - new Error(`hermes acp exited unexpectedly with code ${code ?? "null"}${detail}`), - ); + throw new Error(`hermes acp connection closed unexpectedly${detail}`); } }); - if (child.stdout === null) { - throw new Error("hermes acp process stdout is not available"); - } - const rl = createInterface({ input: child.stdout }); - rl.on("line", (line) => { - this.handleLine(line.trim()); + await connection.initialize({ + protocolVersion: PROTOCOL_VERSION, + clientCapabilities: {}, }); - const initResponse = await this.sendRequest("initialize", { - protocolVersion: 1, - clientInfo: { name: "uwf", version: "0.1.0" }, - capabilities: {}, - }); - - if ((initResponse as { error?: unknown }).error !== undefined) { - throw new Error( - `initialize failed: ${JSON.stringify((initResponse as { error: unknown }).error)}`, - ); - } - - this.sendNotification("initialized"); - - const sessionResponse = (await this.sendRequest("session/new", { - cwd, - mcpServers: [], - })) as { result: { sessionId: string } }; - - const sessionId = sessionResponse.result?.sessionId; - if (typeof sessionId !== "string" || sessionId === "") { - throw new Error(`session/new did not return a sessionId: ${JSON.stringify(sessionResponse)}`); - } + const sessionResult = await connection.newSession({ cwd, mcpServers: [] }); + const { sessionId } = sessionResult; this.sessionId = sessionId; return sessionId; @@ -122,29 +94,19 @@ export class HermesAcpClient { /** Send prompt and collect full response text */ async prompt(text: string): Promise<{ text: string; sessionId: string }> { - if (this.sessionId === null) { + if (this.connection === null || this.sessionId === null) { throw new Error("Not connected — call connect() first"); } - this.messageChunks = []; + this.client.resetChunks(); - const response = await this.sendRequest( - "session/prompt", - { - sessionId: this.sessionId, - prompt: [{ type: "text", text }], - }, - PROMPT_TIMEOUT_MS, - ); - - if ((response as { error?: unknown }).error !== undefined) { - throw new Error( - `session/prompt failed: ${JSON.stringify((response as { error: unknown }).error)}`, - ); - } + await this.connection.prompt({ + sessionId: this.sessionId, + prompt: [{ type: "text", text }], + }); return { - text: this.messageChunks.join(""), + text: this.client.collectChunks(), sessionId: this.sessionId, }; } @@ -154,6 +116,7 @@ export class HermesAcpClient { if (this.process === null) { return; } + this.sessionId = null; this.process.stdin?.end(); const proc = this.process; await new Promise((resolve) => { @@ -161,90 +124,6 @@ export class HermesAcpClient { setTimeout(resolve, 5000); }); this.process = null; - } - - private sendRequest( - method: string, - params: Record, - timeoutMs = 30_000, - ): Promise { - const id = this.nextId++; - const message: JsonRpcRequest = { jsonrpc: "2.0", id, method, params }; - return new Promise((resolve, reject) => { - const timer = setTimeout(() => { - this.pending.delete(id); - reject(new Error(`Timeout waiting for response to ${method} (id=${id})`)); - }, timeoutMs); - - this.pending.set(id, { - resolve: (value) => { - clearTimeout(timer); - resolve(value); - }, - reject: (err) => { - clearTimeout(timer); - reject(err); - }, - }); - - this.writeLine(JSON.stringify(message)); - }); - } - - private sendNotification(method: string, params?: Record): void { - const message: JsonRpcNotification = { jsonrpc: "2.0", method }; - if (params !== undefined) { - message.params = params; - } - this.writeLine(JSON.stringify(message)); - } - - private writeLine(line: string): void { - if (this.process?.stdin === null || this.process?.stdin === undefined) { - throw new Error("Cannot write: hermes acp process stdin not available"); - } - this.process.stdin.write(`${line}\n`); - } - - private handleLine(line: string): void { - if (line === "") { - return; - } - - let parsed: unknown; - try { - parsed = JSON.parse(line); - } catch { - return; - } - - const msg = parsed as Record; - - if ("id" in msg && msg.id !== undefined && msg.id !== null) { - const response = msg as unknown as JsonRpcResponse; - const handler = this.pending.get(response.id); - if (handler !== undefined) { - this.pending.delete(response.id); - handler.resolve(response); - } - return; - } - - if (msg.method === "session/update" && isSessionUpdateParams(msg.params)) { - const updateType = msg.params.update.sessionUpdate; - if (updateType === "agent_message_chunk") { - const chunk = msg.params.update.content?.text; - if (typeof chunk === "string") { - this.messageChunks.push(chunk); - } - } - } - } - - private rejectAll(err: Error): void { - for (const handler of this.pending.values()) { - handler.reject(err); - } - this.pending.clear(); + this.connection = null; } }