diff --git a/packages/workflow-agent-hermes/package.json b/packages/workflow-agent-hermes/package.json index c98d03e..62f3bd6 100644 --- a/packages/workflow-agent-hermes/package.json +++ b/packages/workflow-agent-hermes/package.json @@ -21,7 +21,6 @@ "test": "bun test" }, "dependencies": { - "@agentclientprotocol/sdk": "^0.22.1", "@uncaged/json-cas": "^0.4.0", "@uncaged/workflow-agent-kit": "workspace:^" }, diff --git a/packages/workflow-agent-hermes/src/acp-client.ts b/packages/workflow-agent-hermes/src/acp-client.ts index 74d55f1..e7a9279 100644 --- a/packages/workflow-agent-hermes/src/acp-client.ts +++ b/packages/workflow-agent-hermes/src/acp-client.ts @@ -1,17 +1,23 @@ import type { ChildProcess } from "node:child_process"; import { spawn } from "node:child_process"; -import { Readable, Writable } from "node:stream"; -import type { - Client, - RequestPermissionRequest, - RequestPermissionResponse, - SessionNotification, -} from "@agentclientprotocol/sdk"; -import { ClientSideConnection, ndJsonStream, PROTOCOL_VERSION } from "@agentclientprotocol/sdk"; +import { createInterface } from "node:readline"; import type { HermesSessionMessage } from "./types.js"; const HERMES_COMMAND = "hermes"; +const PROTOCOL_VERSION = 1; + +type JsonRpcResponse = { + jsonrpc: "2.0"; + id: number; + result?: unknown; + error?: { code: number; message: string }; +}; + +type PendingRequest = { + resolve: (value: JsonRpcResponse) => void; + reject: (reason: Error) => void; +}; /** Tracks in-flight tool calls so we can build complete messages when they finish. */ type PendingToolCall = { @@ -19,110 +25,6 @@ type PendingToolCall = { args: string; }; -/** - * Collects ACP session/update events into a list of {@link HermesSessionMessage} - * that mirrors what Hermes writes to its session JSONL files. - */ -class UwfAcpClient implements Client { - private messageChunks: string[] = []; - private reasoningChunks: string[] = []; - private pendingTools = new Map(); - messages: HermesSessionMessage[] = []; - - resetPerPrompt(): void { - this.messageChunks = []; - this.reasoningChunks = []; - } - - async sessionUpdate(params: SessionNotification): Promise { - const { update } = params; - switch (update.sessionUpdate) { - case "agent_message_chunk": - if (update.content.type === "text") { - this.messageChunks.push(update.content.text); - } - break; - - case "agent_thought_chunk": - if (update.content.type === "text") { - this.reasoningChunks.push(update.content.text); - } - break; - - case "tool_call": { - // Agent is invoking a tool — record the call. - const title = update.title ?? ""; - const rawInput = - update.rawInput !== undefined && update.rawInput !== null - ? JSON.stringify(update.rawInput) - : ""; - this.pendingTools.set(update.toolCallId, { name: title, args: rawInput }); - - // Flush accumulated assistant text + reasoning as an assistant message - // (the agent "spoke" before calling the tool). - this.flushAssistantMessage(); - break; - } - - case "tool_call_update": { - if (update.status === "completed" || update.status === "failed") { - const pending = this.pendingTools.get(update.toolCallId); - const toolName = pending?.name ?? update.toolCallId; - const rawOutput = - update.rawOutput !== undefined && update.rawOutput !== null - ? typeof update.rawOutput === "string" - ? update.rawOutput - : JSON.stringify(update.rawOutput) - : ""; - this.messages.push({ - role: "assistant", - content: null, - reasoning: null, - tool_calls: [{ function: { name: toolName, arguments: pending?.args ?? "" } }], - }); - this.messages.push({ - role: "tool", - content: rawOutput, - reasoning: null, - tool_calls: null, - }); - this.pendingTools.delete(update.toolCallId); - } - break; - } - - default: - break; - } - } - - /** Flush any accumulated text/reasoning into an assistant message. */ - flushAssistantMessage(): void { - const text = this.messageChunks.join(""); - const reasoning = this.reasoningChunks.join(""); - if (text !== "" || reasoning !== "") { - this.messages.push({ - role: "assistant", - content: text || null, - reasoning: reasoning || null, - tool_calls: null, - }); - } - this.messageChunks = []; - this.reasoningChunks = []; - } - - async requestPermission(params: RequestPermissionRequest): Promise { - const firstOption = params.options[0]; - return { - outcome: { - outcome: "selected", - optionId: firstOption?.optionId ?? "", - }, - }; - } -} - export type AcpPromptResult = { text: string; sessionId: string; @@ -131,10 +33,16 @@ export type AcpPromptResult = { export class HermesAcpClient { private process: ChildProcess | null = null; - private connection: ClientSideConnection | null = null; + private nextId = 1; private sessionId: string | null = null; private stderrBuffer = ""; - private client = new UwfAcpClient(); + private pending = new Map(); + + // Message collection state + private messageChunks: string[] = []; + private reasoningChunks: string[] = []; + private pendingTools = new Map(); + messages: HermesSessionMessage[] = []; /** Spawn hermes acp, initialize, create session */ async connect(cwd: string): Promise { @@ -150,32 +58,51 @@ export class HermesAcpClient { this.stderrBuffer += chunk.toString(); }); - if (child.stdin === null || child.stdout === null) { - throw new Error("hermes acp process stdio is not available"); - } + child.on("error", (cause) => { + const message = cause instanceof Error ? cause.message : String(cause); + this.rejectAll(new Error(`hermes acp spawn failed: ${message}`)); + }); - const input = Writable.toWeb(child.stdin); - const output = Readable.toWeb(child.stdout); - const stream = ndJsonStream(input, output); - - const clientRef = this.client; - const connection = new ClientSideConnection((_agent) => clientRef, stream); - this.connection = connection; - - connection.signal.addEventListener("abort", () => { - if (this.sessionId !== null) { + child.on("close", (code) => { + if (code !== 0 && this.pending.size > 0) { const detail = this.stderrBuffer.trim() !== "" ? ` stderr=${this.stderrBuffer.trim()}` : ""; - throw new Error(`hermes acp connection closed unexpectedly${detail}`); + this.rejectAll( + new Error(`hermes acp exited unexpectedly with code ${code ?? "null"}${detail}`), + ); } }); - await connection.initialize({ - protocolVersion: PROTOCOL_VERSION, - clientCapabilities: {}, + if (child.stdout === null) { + throw new Error("hermes acp process stdout is not available"); + } + const rl = createInterface({ input: child.stdout }); + rl.on("line", (line) => { + this.handleLine(line.trim()); }); - const sessionResult = await connection.newSession({ cwd, mcpServers: [] }); - const { sessionId } = sessionResult; + const initResponse = await this.sendRequest("initialize", { + protocolVersion: PROTOCOL_VERSION, + clientInfo: { name: "uwf", version: "0.1.0" }, + capabilities: {}, + }); + + if ((initResponse as { error?: unknown }).error !== undefined) { + throw new Error( + `initialize failed: ${JSON.stringify((initResponse as { error: unknown }).error)}`, + ); + } + + this.sendNotification("initialized"); + + const sessionResponse = (await this.sendRequest("session/new", { + cwd, + mcpServers: [], + })) as { result: { sessionId: string } }; + + const sessionId = sessionResponse.result?.sessionId; + if (typeof sessionId !== "string" || sessionId === "") { + throw new Error(`session/new did not return a sessionId: ${JSON.stringify(sessionResponse)}`); + } this.sessionId = sessionId; return sessionId; @@ -183,26 +110,37 @@ export class HermesAcpClient { /** Send prompt and collect full response text + structured messages. */ async prompt(text: string): Promise { - if (this.connection === null || this.sessionId === null) { + if (this.sessionId === null) { throw new Error("Not connected — call connect() first"); } - this.client.resetPerPrompt(); + this.messageChunks = []; + this.reasoningChunks = []; - await this.connection.prompt({ + const response = await this.sendRequest("session/prompt", { sessionId: this.sessionId, prompt: [{ type: "text", text }], }); + if ((response as { error?: unknown }).error !== undefined) { + throw new Error( + `session/prompt failed: ${JSON.stringify((response as { error: unknown }).error)}`, + ); + } + // Flush any trailing assistant text that wasn't followed by a tool call. - this.client.flushAssistantMessage(); + this.flushAssistantMessage(); // Extract the final assistant text from collected messages. - const messages = this.client.messages; let finalText = ""; - for (let i = messages.length - 1; i >= 0; i--) { - const msg = messages[i]; - if (msg !== undefined && msg.role === "assistant" && msg.content !== null && msg.content.trim() !== "") { + for (let i = this.messages.length - 1; i >= 0; i--) { + const msg = this.messages[i]; + if ( + msg !== undefined && + msg.role === "assistant" && + msg.content !== null && + msg.content.trim() !== "" + ) { finalText = msg.content; break; } @@ -211,7 +149,7 @@ export class HermesAcpClient { return { text: finalText, sessionId: this.sessionId, - messages, + messages: this.messages, }; } @@ -228,6 +166,192 @@ export class HermesAcpClient { setTimeout(resolve, 5000); }); this.process = null; - this.connection = null; + } + + // ---- JSON-RPC transport ---- + + private sendRequest( + method: string, + params: Record, + timeoutMs = 10 * 60 * 1000, + ): Promise { + const id = this.nextId++; + return new Promise((resolve, reject) => { + const timer = setTimeout(() => { + this.pending.delete(id); + reject(new Error(`Timeout waiting for response to ${method} (id=${id})`)); + }, timeoutMs); + + this.pending.set(id, { + resolve: (value) => { + clearTimeout(timer); + resolve(value); + }, + reject: (err) => { + clearTimeout(timer); + reject(err); + }, + }); + + this.writeLine(JSON.stringify({ jsonrpc: "2.0", id, method, params })); + }); + } + + private sendNotification(method: string, params?: Record): void { + const message: Record = { jsonrpc: "2.0", method }; + if (params !== undefined) { + message.params = params; + } + this.writeLine(JSON.stringify(message)); + } + + private writeLine(line: string): void { + if (this.process?.stdin === null || this.process?.stdin === undefined) { + throw new Error("Cannot write: hermes acp process stdin not available"); + } + this.process.stdin.write(`${line}\n`); + } + + private handleLine(line: string): void { + if (line === "") { + return; + } + + let parsed: unknown; + try { + parsed = JSON.parse(line); + } catch { + return; + } + + const msg = parsed as Record; + + // JSON-RPC response (has "id") + if ("id" in msg && msg.id !== undefined && msg.id !== null) { + const response = msg as unknown as JsonRpcResponse; + const handler = this.pending.get(response.id); + if (handler !== undefined) { + this.pending.delete(response.id); + handler.resolve(response); + } + return; + } + + // JSON-RPC notification — session/update + if (msg.method === "session/update") { + const params = msg.params as Record | undefined; + const update = params?.update as Record | undefined; + if (update !== undefined) { + this.handleSessionUpdate(update); + } + return; + } + + // session/request_permission — auto-approve (yolo mode) + if (msg.method === "session/request_permission") { + const params = msg.params as Record | undefined; + const options = (params?.options ?? []) as Array<{ optionId?: string }>; + const firstOptionId = options[0]?.optionId ?? ""; + // Respond with the selected option + const responseMsg = { + jsonrpc: "2.0", + id: (msg as Record).id, + result: { outcome: { outcome: "selected", optionId: firstOptionId } }, + }; + this.writeLine(JSON.stringify(responseMsg)); + } + } + + // ---- Session update → structured messages ---- + + private handleSessionUpdate(update: Record): void { + const updateType = update.sessionUpdate as string; + + switch (updateType) { + case "agent_message_chunk": { + const content = update.content as { type?: string; text?: string } | undefined; + if (content?.type === "text" && typeof content.text === "string") { + this.messageChunks.push(content.text); + } + break; + } + + case "agent_thought_chunk": { + const content = update.content as { type?: string; text?: string } | undefined; + if (content?.type === "text" && typeof content.text === "string") { + this.reasoningChunks.push(content.text); + } + break; + } + + case "tool_call": { + const title = (update.title as string) ?? ""; + const rawInput = update.rawInput; + const args = + rawInput !== undefined && rawInput !== null ? JSON.stringify(rawInput) : ""; + const toolCallId = update.toolCallId as string; + this.pendingTools.set(toolCallId, { name: title, args }); + + // Flush accumulated assistant text before tool call + this.flushAssistantMessage(); + break; + } + + case "tool_call_update": { + const status = update.status as string | undefined; + if (status === "completed" || status === "failed") { + const toolCallId = update.toolCallId as string; + const pending = this.pendingTools.get(toolCallId); + const toolName = pending?.name ?? toolCallId; + const rawOutput = update.rawOutput; + const outputStr = + rawOutput !== undefined && rawOutput !== null + ? typeof rawOutput === "string" + ? rawOutput + : JSON.stringify(rawOutput) + : ""; + this.messages.push({ + role: "assistant", + content: null, + reasoning: null, + tool_calls: [{ function: { name: toolName, arguments: pending?.args ?? "" } }], + }); + this.messages.push({ + role: "tool", + content: outputStr, + reasoning: null, + tool_calls: null, + }); + this.pendingTools.delete(toolCallId); + } + break; + } + + default: + break; + } + } + + /** Flush any accumulated text/reasoning into an assistant message. */ + private flushAssistantMessage(): void { + const text = this.messageChunks.join(""); + const reasoning = this.reasoningChunks.join(""); + if (text !== "" || reasoning !== "") { + this.messages.push({ + role: "assistant", + content: text || null, + reasoning: reasoning || null, + tool_calls: null, + }); + } + this.messageChunks = []; + this.reasoningChunks = []; + } + + private rejectAll(err: Error): void { + for (const handler of this.pending.values()) { + handler.reject(err); + } + this.pending.clear(); } }