refactor: use @agentclientprotocol/sdk instead of hand-rolled JSON-RPC

Replace 250-line custom ACP client with official TypeScript SDK.
Uses ClientSideConnection + ndJsonStream for stdio transport.
Same public API (connect/prompt/close), 115 lines, zero custom protocol code.

Ref #398
This commit is contained in:
2026-05-22 12:58:55 +00:00
parent 96584e481f
commit 025695dbe9
3 changed files with 63 additions and 181 deletions
+2
View File
@@ -15,10 +15,12 @@
"release": "bun run build && bun test && node scripts/publish-all.mjs" "release": "bun run build && bun test && node scripts/publish-all.mjs"
}, },
"devDependencies": { "devDependencies": {
"@agentclientprotocol/sdk": "^0.22.1",
"@biomejs/biome": "^2.4.14", "@biomejs/biome": "^2.4.14",
"@changesets/cli": "^2.31.0", "@changesets/cli": "^2.31.0",
"@types/node": "^25.7.0", "@types/node": "^25.7.0",
"@types/xxhashjs": "^0.2.4", "@types/xxhashjs": "^0.2.4",
"@uncaged/workflow-agent-hermes": "workspace:*",
"bun-types": "^1.3.13" "bun-types": "^1.3.13"
} }
} }
@@ -21,6 +21,7 @@
"test": "bun test" "test": "bun test"
}, },
"dependencies": { "dependencies": {
"@agentclientprotocol/sdk": "^0.22.1",
"@uncaged/json-cas": "^0.4.0", "@uncaged/json-cas": "^0.4.0",
"@uncaged/workflow-agent-kit": "workspace:^" "@uncaged/workflow-agent-kit": "workspace:^"
}, },
+60 -181
View File
@@ -1,60 +1,51 @@
import type { ChildProcess } from "node:child_process"; import type { ChildProcess } from "node:child_process";
import { spawn } from "node:child_process"; import { spawn } from "node:child_process";
import { createInterface } from "node:readline"; import { Readable, Writable } from "node:stream";
import type {
Client,
RequestPermissionRequest,
RequestPermissionResponse,
SessionNotification,
} from "@agentclientprotocol/sdk";
import { ClientSideConnection, ndJsonStream, PROTOCOL_VERSION } from "@agentclientprotocol/sdk";
const HERMES_COMMAND = "hermes"; const HERMES_COMMAND = "hermes";
const PROMPT_TIMEOUT_MS = 10 * 60 * 1000;
type JsonRpcRequest = { class UwfAcpClient implements Client {
jsonrpc: "2.0"; private messageChunks: string[] = [];
id: number;
method: string;
params: Record<string, unknown>;
};
type JsonRpcNotification = { resetChunks(): void {
jsonrpc: "2.0"; this.messageChunks = [];
method: string; }
params?: Record<string, unknown>;
};
type JsonRpcResponse = { collectChunks(): string {
jsonrpc: "2.0"; return this.messageChunks.join("");
id: number; }
result: unknown;
error?: { code: number; message: string };
};
type PendingRequest = { async sessionUpdate(params: SessionNotification): Promise<void> {
resolve: (value: JsonRpcResponse) => void; const { update } = params;
reject: (reason: Error) => void; if (update.sessionUpdate === "agent_message_chunk" && update.content.type === "text") {
}; this.messageChunks.push(update.content.text);
}
}
type SessionUpdateParams = { async requestPermission(params: RequestPermissionRequest): Promise<RequestPermissionResponse> {
update: { const firstOption = params.options[0];
sessionUpdate: string; return {
content?: { outcome: {
text?: string; outcome: "selected",
optionId: firstOption?.optionId ?? "",
},
}; };
}; }
};
function isSessionUpdateParams(params: unknown): params is SessionUpdateParams {
return (
typeof params === "object" &&
params !== null &&
"update" in params &&
typeof (params as Record<string, unknown>).update === "object"
);
} }
export class HermesAcpClient { export class HermesAcpClient {
private process: ChildProcess | null = null; private process: ChildProcess | null = null;
private nextId = 1; private connection: ClientSideConnection | null = null;
private sessionId: string | null = null; private sessionId: string | null = null;
private pending = new Map<number, PendingRequest>();
private stderrBuffer = ""; private stderrBuffer = "";
private messageChunks: string[] = []; private client = new UwfAcpClient();
/** Spawn hermes acp, initialize, create session */ /** Spawn hermes acp, initialize, create session */
async connect(cwd: string): Promise<string> { async connect(cwd: string): Promise<string> {
@@ -70,51 +61,32 @@ export class HermesAcpClient {
this.stderrBuffer += chunk.toString(); this.stderrBuffer += chunk.toString();
}); });
child.on("error", (cause) => { if (child.stdin === null || child.stdout === null) {
const message = cause instanceof Error ? cause.message : String(cause); throw new Error("hermes acp process stdio is not available");
this.rejectAll(new Error(`hermes acp spawn failed: ${message}`)); }
});
child.on("close", (code) => { const input = Writable.toWeb(child.stdin);
if (code !== 0 && this.pending.size > 0) { const output = Readable.toWeb(child.stdout);
const stream = ndJsonStream(input, output);
const clientRef = this.client;
const connection = new ClientSideConnection((_agent) => clientRef, stream);
this.connection = connection;
connection.signal.addEventListener("abort", () => {
if (this.sessionId !== null) {
const detail = this.stderrBuffer.trim() !== "" ? ` stderr=${this.stderrBuffer.trim()}` : ""; const detail = this.stderrBuffer.trim() !== "" ? ` stderr=${this.stderrBuffer.trim()}` : "";
this.rejectAll( throw new Error(`hermes acp connection closed unexpectedly${detail}`);
new Error(`hermes acp exited unexpectedly with code ${code ?? "null"}${detail}`),
);
} }
}); });
if (child.stdout === null) { await connection.initialize({
throw new Error("hermes acp process stdout is not available"); protocolVersion: PROTOCOL_VERSION,
} clientCapabilities: {},
const rl = createInterface({ input: child.stdout });
rl.on("line", (line) => {
this.handleLine(line.trim());
}); });
const initResponse = await this.sendRequest("initialize", { const sessionResult = await connection.newSession({ cwd, mcpServers: [] });
protocolVersion: 1, const { sessionId } = sessionResult;
clientInfo: { name: "uwf", version: "0.1.0" },
capabilities: {},
});
if ((initResponse as { error?: unknown }).error !== undefined) {
throw new Error(
`initialize failed: ${JSON.stringify((initResponse as { error: unknown }).error)}`,
);
}
this.sendNotification("initialized");
const sessionResponse = (await this.sendRequest("session/new", {
cwd,
mcpServers: [],
})) as { result: { sessionId: string } };
const sessionId = sessionResponse.result?.sessionId;
if (typeof sessionId !== "string" || sessionId === "") {
throw new Error(`session/new did not return a sessionId: ${JSON.stringify(sessionResponse)}`);
}
this.sessionId = sessionId; this.sessionId = sessionId;
return sessionId; return sessionId;
@@ -122,29 +94,19 @@ export class HermesAcpClient {
/** Send prompt and collect full response text */ /** Send prompt and collect full response text */
async prompt(text: string): Promise<{ text: string; sessionId: string }> { async prompt(text: string): Promise<{ text: string; sessionId: string }> {
if (this.sessionId === null) { if (this.connection === null || this.sessionId === null) {
throw new Error("Not connected — call connect() first"); throw new Error("Not connected — call connect() first");
} }
this.messageChunks = []; this.client.resetChunks();
const response = await this.sendRequest( await this.connection.prompt({
"session/prompt", sessionId: this.sessionId,
{ prompt: [{ type: "text", text }],
sessionId: this.sessionId, });
prompt: [{ type: "text", text }],
},
PROMPT_TIMEOUT_MS,
);
if ((response as { error?: unknown }).error !== undefined) {
throw new Error(
`session/prompt failed: ${JSON.stringify((response as { error: unknown }).error)}`,
);
}
return { return {
text: this.messageChunks.join(""), text: this.client.collectChunks(),
sessionId: this.sessionId, sessionId: this.sessionId,
}; };
} }
@@ -154,6 +116,7 @@ export class HermesAcpClient {
if (this.process === null) { if (this.process === null) {
return; return;
} }
this.sessionId = null;
this.process.stdin?.end(); this.process.stdin?.end();
const proc = this.process; const proc = this.process;
await new Promise<void>((resolve) => { await new Promise<void>((resolve) => {
@@ -161,90 +124,6 @@ export class HermesAcpClient {
setTimeout(resolve, 5000); setTimeout(resolve, 5000);
}); });
this.process = null; this.process = null;
} this.connection = null;
private sendRequest(
method: string,
params: Record<string, unknown>,
timeoutMs = 30_000,
): Promise<JsonRpcResponse> {
const id = this.nextId++;
const message: JsonRpcRequest = { jsonrpc: "2.0", id, method, params };
return new Promise<JsonRpcResponse>((resolve, reject) => {
const timer = setTimeout(() => {
this.pending.delete(id);
reject(new Error(`Timeout waiting for response to ${method} (id=${id})`));
}, timeoutMs);
this.pending.set(id, {
resolve: (value) => {
clearTimeout(timer);
resolve(value);
},
reject: (err) => {
clearTimeout(timer);
reject(err);
},
});
this.writeLine(JSON.stringify(message));
});
}
private sendNotification(method: string, params?: Record<string, unknown>): void {
const message: JsonRpcNotification = { jsonrpc: "2.0", method };
if (params !== undefined) {
message.params = params;
}
this.writeLine(JSON.stringify(message));
}
private writeLine(line: string): void {
if (this.process?.stdin === null || this.process?.stdin === undefined) {
throw new Error("Cannot write: hermes acp process stdin not available");
}
this.process.stdin.write(`${line}\n`);
}
private handleLine(line: string): void {
if (line === "") {
return;
}
let parsed: unknown;
try {
parsed = JSON.parse(line);
} catch {
return;
}
const msg = parsed as Record<string, unknown>;
if ("id" in msg && msg.id !== undefined && msg.id !== null) {
const response = msg as unknown as JsonRpcResponse;
const handler = this.pending.get(response.id);
if (handler !== undefined) {
this.pending.delete(response.id);
handler.resolve(response);
}
return;
}
if (msg.method === "session/update" && isSessionUpdateParams(msg.params)) {
const updateType = msg.params.update.sessionUpdate;
if (updateType === "agent_message_chunk") {
const chunk = msg.params.update.content?.text;
if (typeof chunk === "string") {
this.messageChunks.push(chunk);
}
}
}
}
private rejectAll(err: Error): void {
for (const handler of this.pending.values()) {
handler.reject(err);
}
this.pending.clear();
} }
} }