diff --git a/package.json b/package.json
index dd52be1..508ef28 100644
--- a/package.json
+++ b/package.json
@@ -15,10 +15,12 @@
     "release": "bun run build && bun test && node scripts/publish-all.mjs"
   },
   "devDependencies": {
+    "@agentclientprotocol/sdk": "^0.22.1",
     "@biomejs/biome": "^2.4.14",
     "@changesets/cli": "^2.31.0",
     "@types/node": "^25.7.0",
     "@types/xxhashjs": "^0.2.4",
+    "@uncaged/workflow-agent-hermes": "workspace:*",
     "bun-types": "^1.3.13"
   }
 }
diff --git a/packages/workflow-agent-hermes/__tests__/acp-client.test.ts b/packages/workflow-agent-hermes/__tests__/acp-client.test.ts
new file mode 100644
index 0000000..54db937
--- /dev/null
+++ b/packages/workflow-agent-hermes/__tests__/acp-client.test.ts
@@ -0,0 +1,77 @@
+import { afterEach, beforeEach, describe, expect, it } from "bun:test";
+
+import { HermesAcpClient } from "../src/acp-client.js";
+
+const UUID_RE = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i;
+
+describe("HermesAcpClient", () => {
+  let client: HermesAcpClient;
+
+  beforeEach(() => {
+    client = new HermesAcpClient();
+  });
+
+  afterEach(async () => {
+    await client.close();
+  });
+
+  it(
+    "connect() returns a UUID sessionId",
+    async () => {
+      const sessionId = await client.connect(process.cwd());
+      expect(typeof sessionId).toBe("string");
+      expect(sessionId).toMatch(UUID_RE);
+    },
+    { timeout: 2 * 60 * 1000 },
+  );
+
+  it(
+    "prompt() returns a non-empty text response",
+    async () => {
+      await client.connect(process.cwd());
+      const result = await client.prompt("Reply with exactly the word: PONG");
+      expect(typeof result.text).toBe("string");
+      expect(result.text.length).toBeGreaterThan(0);
+      expect(typeof result.sessionId).toBe("string");
+      expect(result.sessionId).toMatch(UUID_RE);
+    },
+    { timeout: 2 * 60 * 1000 },
+  );
+
+  it(
+    "prompt() can be called twice on the same session (resume)",
+    async () => {
+      await client.connect(process.cwd());
+
+      const first = await client.prompt("Say the word ALPHA and nothing else.");
+      expect(first.text.length).toBeGreaterThan(0);
+
+      const second = await client.prompt("Now say the word BETA and nothing else.");
+      expect(second.text.length).toBeGreaterThan(0);
+
+      expect(first.sessionId).toBe(second.sessionId);
+    },
+    { timeout: 2 * 60 * 1000 },
+  );
+
+  it(
+    "prompt() collects structured messages including tool calls",
+    async () => {
+      await client.connect(process.cwd());
+      const result = await client.prompt("Run this command: echo TOOL_DETAIL_TEST");
+      expect(result.messages.length).toBeGreaterThan(0);
+      // Should have at least one tool message (the echo command)
+      const toolMessages = result.messages.filter((m) => m.role === "tool");
+      expect(toolMessages.length).toBeGreaterThan(0);
+      // Tool message should contain the output
+      const toolContent = toolMessages[0]?.content ?? "";
+      expect(toolContent).toContain("TOOL_DETAIL_TEST");
+      // Should have assistant messages with tool_calls
+      const assistantWithTools = result.messages.filter(
+        (m) => m.role === "assistant" && m.tool_calls !== null,
+      );
+      expect(assistantWithTools.length).toBeGreaterThan(0);
+    },
+    { timeout: 2 * 60 * 1000 },
+  );
+});
diff --git a/packages/workflow-agent-hermes/package.json b/packages/workflow-agent-hermes/package.json
index 62f3bd6..c98d03e 100644
--- a/packages/workflow-agent-hermes/package.json
+++ b/packages/workflow-agent-hermes/package.json
@@ -21,6 +21,7 @@
     "test": "bun test"
   },
   "dependencies": {
+    "@agentclientprotocol/sdk": "^0.22.1",
     "@uncaged/json-cas": "^0.4.0",
     "@uncaged/workflow-agent-kit": "workspace:^"
   },
diff --git a/packages/workflow-agent-hermes/src/acp-client.ts b/packages/workflow-agent-hermes/src/acp-client.ts
new file mode 100644
index 0000000..74d55f1
--- /dev/null
+++ b/packages/workflow-agent-hermes/src/acp-client.ts
@@ -0,0 +1,332 @@
+import type { ChildProcess } from "node:child_process";
+import { spawn } from "node:child_process";
+import { Readable, Writable } from "node:stream";
+import type {
+  Client,
+  RequestPermissionRequest,
+  RequestPermissionResponse,
+  SessionNotification,
+} from "@agentclientprotocol/sdk";
+import { ClientSideConnection, ndJsonStream, PROTOCOL_VERSION } from "@agentclientprotocol/sdk";
+
+import type { HermesSessionMessage } from "./types.js";
+
+const HERMES_COMMAND = "hermes";
+
+/** How long close() waits for hermes to exit after stdin EOF before killing it. */
+const CLOSE_TIMEOUT_MS = 5000;
+
+/** Only the tail of stderr is kept for error messages, so the buffer stays bounded. */
+const STDERR_TAIL_CHARS = 16 * 1024;
+
+/** Tracks in-flight tool calls so we can build complete messages when they finish. */
+type PendingToolCall = {
+  name: string;
+  args: string;
+};
+
+/**
+ * Collects ACP session/update events into a list of {@link HermesSessionMessage}
+ * that mirrors what Hermes writes to its session JSONL files.
+ */
+class UwfAcpClient implements Client {
+  private messageChunks: string[] = [];
+  private reasoningChunks: string[] = [];
+  private pendingTools = new Map<string, PendingToolCall>();
+  /** Every message collected since this collector was created, oldest first. */
+  messages: HermesSessionMessage[] = [];
+
+  /** Discard chunks left over from an earlier prompt; `messages` is kept. */
+  resetPerPrompt(): void {
+    this.messageChunks = [];
+    this.reasoningChunks = [];
+  }
+
+  /** Fold one ACP session/update notification into the collected messages. */
+  async sessionUpdate(params: SessionNotification): Promise<void> {
+    const { update } = params;
+    switch (update.sessionUpdate) {
+      case "agent_message_chunk":
+        if (update.content.type === "text") {
+          this.messageChunks.push(update.content.text);
+        }
+        break;
+
+      case "agent_thought_chunk":
+        if (update.content.type === "text") {
+          this.reasoningChunks.push(update.content.text);
+        }
+        break;
+
+      case "tool_call": {
+        // Agent is invoking a tool — record the call.
+        const title = update.title ?? "";
+        const rawInput =
+          update.rawInput !== undefined && update.rawInput !== null
+            ? JSON.stringify(update.rawInput)
+            : "";
+        this.pendingTools.set(update.toolCallId, { name: title, args: rawInput });
+
+        // Flush accumulated assistant text + reasoning as an assistant message
+        // (the agent "spoke" before calling the tool).
+        this.flushAssistantMessage();
+        break;
+      }
+
+      case "tool_call_update": {
+        if (update.status === "completed" || update.status === "failed") {
+          const pending = this.pendingTools.get(update.toolCallId);
+          const toolName = pending?.name ?? update.toolCallId;
+          const rawOutput =
+            update.rawOutput !== undefined && update.rawOutput !== null
+              ? typeof update.rawOutput === "string"
+                ? update.rawOutput
+                : JSON.stringify(update.rawOutput)
+              : "";
+          this.messages.push({
+            role: "assistant",
+            content: null,
+            reasoning: null,
+            tool_calls: [{ function: { name: toolName, arguments: pending?.args ?? "" } }],
+          });
+          this.messages.push({
+            role: "tool",
+            content: rawOutput,
+            reasoning: null,
+            tool_calls: null,
+          });
+          this.pendingTools.delete(update.toolCallId);
+        }
+        break;
+      }
+
+      default:
+        break;
+    }
+  }
+
+  /** Flush any accumulated text/reasoning into an assistant message. */
+  flushAssistantMessage(): void {
+    const text = this.messageChunks.join("");
+    const reasoning = this.reasoningChunks.join("");
+    if (text !== "" || reasoning !== "") {
+      this.messages.push({
+        role: "assistant",
+        content: text || null,
+        reasoning: reasoning || null,
+        tool_calls: null,
+      });
+    }
+    this.messageChunks = [];
+    this.reasoningChunks = [];
+  }
+
+  /**
+   * Answer a permission request without user interaction: pick an "allow"
+   * option when one is offered, otherwise the first option.
+   */
+  async requestPermission(params: RequestPermissionRequest): Promise<RequestPermissionResponse> {
+    const chosen =
+      params.options.find((o) => o.kind === "allow_once" || o.kind === "allow_always") ??
+      params.options[0];
+    if (chosen === undefined) {
+      // Nothing to select: an empty optionId would name no real option.
+      return { outcome: { outcome: "cancelled" } };
+    }
+    return { outcome: { outcome: "selected", optionId: chosen.optionId } };
+  }
+}
+
+export type AcpPromptResult = {
+  text: string;
+  sessionId: string;
+  messages: HermesSessionMessage[];
+};
+
+/**
+ * Drives one `hermes acp` child process over stdio: spawn + initialize +
+ * newSession in connect(), one turn per prompt(), shutdown in close().
+ */
+export class HermesAcpClient {
+  private process: ChildProcess | null = null;
+  private connection: ClientSideConnection | null = null;
+  private sessionId: string | null = null;
+  private stderrBuffer = "";
+  private client = new UwfAcpClient();
+  /** Rejects once the current process/connection dies without close() being called. */
+  private failure: Promise<never> | null = null;
+
+  /**
+   * Spawn hermes acp, initialize, create session.
+   *
+   * @param cwd - working directory passed to the new ACP session
+   * @returns the id of the newly created session
+   * @throws if hermes cannot be spawned, exits, or the handshake is rejected
+   */
+  async connect(cwd: string): Promise<string> {
+    // Reconnecting must not orphan a previously spawned hermes process.
+    await this.close();
+
+    const child = spawn(HERMES_COMMAND, ["acp"], {
+      env: process.env,
+      shell: false,
+      stdio: ["pipe", "pipe", "pipe"],
+    });
+
+    this.process = child;
+    this.stderrBuffer = "";
+    this.client = new UwfAcpClient();
+
+    let fail: (error: Error) => void = () => {};
+    const failure = new Promise<never>((_resolve, reject) => {
+      fail = reject;
+    });
+    // Failures are observed by racing requests against this promise; in
+    // between requests it must not count as an unhandled rejection.
+    failure.catch(() => {});
+    this.failure = failure;
+
+    // Without an "error" listener a failed spawn (e.g. hermes missing from
+    // PATH) is raised as an uncaught exception instead of failing connect().
+    child.once("error", (cause: Error) => {
+      fail(new Error(`hermes spawn failed: ${cause.message}`));
+    });
+    child.once("close", (code) => {
+      // close() clears this.process first, so a requested exit stays quiet.
+      if (this.process === child) {
+        fail(new Error(`hermes acp exited with code ${code ?? "null"}${this.stderrDetail()}`));
+      }
+    });
+    child.stderr?.on("data", (chunk: Buffer) => {
+      this.stderrBuffer = (this.stderrBuffer + chunk.toString()).slice(-STDERR_TAIL_CHARS);
+    });
+
+    try {
+      if (child.stdin === null || child.stdout === null) {
+        throw new Error("hermes acp process stdio is not available");
+      }
+
+      const input = Writable.toWeb(child.stdin);
+      const output = Readable.toWeb(child.stdout);
+      const stream = ndJsonStream(input, output);
+
+      const collector = this.client;
+      const connection = new ClientSideConnection((_agent) => collector, stream);
+      this.connection = connection;
+
+      // Throwing from an event listener cannot reach any caller, so a dropped
+      // connection is reported through `failure` instead.
+      connection.signal.addEventListener("abort", () => {
+        if (this.connection === connection) {
+          fail(new Error(`hermes acp connection closed unexpectedly${this.stderrDetail()}`));
+        }
+      });
+
+      await Promise.race([
+        connection.initialize({
+          protocolVersion: PROTOCOL_VERSION,
+          clientCapabilities: {},
+        }),
+        failure,
+      ]);
+
+      const { sessionId } = await Promise.race([
+        connection.newSession({ cwd, mcpServers: [] }),
+        failure,
+      ]);
+
+      this.sessionId = sessionId;
+      return sessionId;
+    } catch (cause) {
+      // A half-connected client must not leave a stray hermes process behind.
+      await this.close();
+      throw cause;
+    }
+  }
+
+  /**
+   * Send prompt and collect full response text + structured messages.
+   *
+   * @throws if connect() has not succeeded, or hermes dies during the turn
+   */
+  async prompt(text: string): Promise<AcpPromptResult> {
+    const { connection, sessionId, failure } = this;
+    if (connection === null || sessionId === null || failure === null) {
+      throw new Error("Not connected — call connect() first");
+    }
+
+    this.client.resetPerPrompt();
+    // Messages before this index belong to earlier prompts.
+    const turnStart = this.client.messages.length;
+
+    await Promise.race([
+      connection.prompt({
+        sessionId,
+        prompt: [{ type: "text", text }],
+      }),
+      failure,
+    ]);
+
+    // Flush any trailing assistant text that wasn't followed by a tool call.
+    this.client.flushAssistantMessage();
+
+    // Snapshot, so a later prompt() cannot mutate a result already handed out.
+    const messages = [...this.client.messages];
+
+    // Extract the final assistant text of this turn; text from an earlier
+    // prompt must not be reported as the answer to this one.
+    let finalText = "";
+    for (let i = messages.length - 1; i >= turnStart; i--) {
+      const msg = messages[i];
+      if (
+        msg !== undefined &&
+        msg.role === "assistant" &&
+        msg.content !== null &&
+        msg.content.trim() !== ""
+      ) {
+        finalText = msg.content;
+        break;
+      }
+    }
+
+    return { text: finalText, sessionId, messages };
+  }
+
+  /** Close the connection: end hermes' stdin, wait for exit, kill on timeout. */
+  async close(): Promise<void> {
+    const proc = this.process;
+    if (proc === null) {
+      return;
+    }
+    // Clear state before waiting so the failure listeners from connect()
+    // recognise the upcoming exit as requested.
+    this.process = null;
+    this.connection = null;
+    this.sessionId = null;
+    this.failure = null;
+
+    if (proc.exitCode !== null || proc.signalCode !== null) {
+      return;
+    }
+
+    await new Promise<void>((resolve) => {
+      const timer = setTimeout(() => {
+        // stdin EOF was not enough — terminate instead of leaking the child.
+        proc.kill();
+        resolve();
+      }, CLOSE_TIMEOUT_MS);
+      proc.once("close", () => {
+        // Clear the timer so it cannot keep the event loop alive.
+        clearTimeout(timer);
+        resolve();
+      });
+      proc.stdin?.end();
+    });
+  }
+
+  /** Format captured stderr as an error-message suffix ("" when empty). */
+  private stderrDetail(): string {
+    const stderr = this.stderrBuffer.trim();
+    return stderr !== "" ? ` stderr=${stderr}` : "";
+  }
+}
diff --git a/packages/workflow-agent-hermes/src/hermes.ts b/packages/workflow-agent-hermes/src/hermes.ts
index 5264934..6d3066f 100644
--- a/packages/workflow-agent-hermes/src/hermes.ts
+++ b/packages/workflow-agent-hermes/src/hermes.ts
@@ -1,4 +1,3 @@
-import { spawn } from "node:child_process";
 import type { Store } from "@uncaged/json-cas";
 
 import {
@@ -8,14 +7,8 @@ import {
   createAgent,
 } from "@uncaged/workflow-agent-kit";
 
-import {
-  loadHermesSession,
-  parseSessionIdFromStdout,
-  storeHermesSessionDetail,
-} from "./session-detail.js";
-
-const HERMES_COMMAND = "hermes";
-const HERMES_MAX_TURNS = 90;
+import { HermesAcpClient } from "./acp-client.js";
+import { storeHermesSessionDetail } from "./session-detail.js";
 
 function buildHistorySummary(steps:
AgentContext["steps"]): string { if (steps.length === 0) { @@ -52,110 +45,43 @@ export function buildHermesPrompt(ctx: AgentContext): string { return parts.join("\n"); } -function spawnHermes(args: string[]): Promise<{ stdout: string; stderr: string }> { - return new Promise((resolve, reject) => { - const child = spawn(HERMES_COMMAND, args, { - env: process.env, - shell: false, - stdio: ["ignore", "pipe", "pipe"], - }); - - let stdout = ""; - let stderr = ""; - child.stdout?.on("data", (chunk: Buffer) => { - stdout += chunk.toString(); - }); - child.stderr?.on("data", (chunk: Buffer) => { - stderr += chunk.toString(); - }); - - child.on("error", (cause) => { - const message = cause instanceof Error ? cause.message : String(cause); - reject(new Error(`hermes spawn failed: ${message}`)); - }); - - child.on("close", (code) => { - if (code === 0) { - resolve({ stdout, stderr }); - return; - } - const detail = stderr.trim() !== "" ? ` stderr=${stderr.trim()}` : ""; - reject(new Error(`hermes exited with code ${code ?? "null"}${detail}`)); - }); - }); -} - -function spawnHermesChat(prompt: string): Promise<{ stdout: string; stderr: string }> { - return spawnHermes([ - "chat", - "-q", - prompt, - "--yolo", - "--max-turns", - String(HERMES_MAX_TURNS), - "--quiet", - ]); -} - -function spawnHermesResume( - sessionId: string, - message: string, -): Promise<{ stdout: string; stderr: string }> { - return spawnHermes([ - "chat", - "--resume", - sessionId, - "-q", - message, - "--yolo", - "--max-turns", - String(HERMES_MAX_TURNS), - "--quiet", - ]); -} - -function parseSessionId(stdout: string, stderr: string): string { - const sessionId = parseSessionIdFromStdout(stderr) ?? 
parseSessionIdFromStdout(stdout); - if (sessionId === null) { - throw new Error( - "Failed to parse session_id from hermes output.\n" + - `stderr (first 200 chars): ${stderr.slice(0, 200)}\n` + - `stdout (first 200 chars): ${stdout.slice(0, 200)}`, - ); - } - return sessionId; -} - -async function buildResultFromSession(sessionId: string, store: Store): Promise { - const session = await loadHermesSession(sessionId); - if (session === null) { - throw new Error(`Failed to load hermes session file for session_id: ${sessionId}`); - } - const { detailHash, output } = await storeHermesSessionDetail(store, session); - return { output, detailHash, sessionId }; -} - -async function runHermes(ctx: AgentContext): Promise { - const fullPrompt = buildHermesPrompt(ctx); - const { stdout, stderr } = await spawnHermesChat(fullPrompt); - const sessionId = parseSessionId(stdout, stderr); - return buildResultFromSession(sessionId, ctx.store); -} - -async function continueHermes( - sessionId: string, - message: string, - store: Store, -): Promise { - const { stdout, stderr } = await spawnHermesResume(sessionId, message); - // Resume may return a new session_id - const newSessionId = parseSessionIdFromStdout(stderr) ?? parseSessionIdFromStdout(stdout); - const resolvedId = newSessionId ?? sessionId; - return buildResultFromSession(resolvedId, store); -} - -/** Agent CLI factory: parses argv, runs Hermes, extracts output, writes StepNode. */ +/** + * Agent CLI factory: parses argv, runs Hermes, extracts output, writes StepNode. + * + * A single ACP client is shared across run() and continue() calls so that + * frontmatter retry loops keep the same Hermes session context. The client + * is closed once the agent process exits (via process.on("exit")). + */ export function createHermesAgent(): () => Promise { + const client = new HermesAcpClient(); + + // Ensure cleanup regardless of how the process exits. 
+ process.on("exit", () => { + void client.close(); + }); + + async function runHermes(ctx: AgentContext): Promise { + const fullPrompt = buildHermesPrompt(ctx); + await client.connect(process.cwd()); + const { text, sessionId, messages } = await client.prompt(fullPrompt); + const session = { session_id: sessionId, model: "", session_start: new Date().toISOString(), messages }; + const { detailHash } = await storeHermesSessionDetail(ctx.store, session); + return { output: text, detailHash, sessionId }; + } + + async function continueHermes( + _sessionId: string, + message: string, + store: Store, + ): Promise { + // Client is already connected from runHermes — same ACP session, + // so the agent sees the full conversation history (crucial for retries). + const { text, sessionId, messages } = await client.prompt(message); + const session = { session_id: sessionId, model: "", session_start: new Date().toISOString(), messages }; + const { detailHash } = await storeHermesSessionDetail(store, session); + return { output: text, detailHash, sessionId }; + } + return createAgent({ name: "hermes", run: runHermes, diff --git a/packages/workflow-agent-hermes/src/index.ts b/packages/workflow-agent-hermes/src/index.ts index b27f12a..cf147d7 100644 --- a/packages/workflow-agent-hermes/src/index.ts +++ b/packages/workflow-agent-hermes/src/index.ts @@ -1 +1,2 @@ +export { HermesAcpClient } from "./acp-client.js"; export { buildHermesPrompt, createHermesAgent } from "./hermes.js";