Merge pull request 'refactor: migrate hermes agent from stdout parsing to ACP protocol' (#401) from feat/398-hermes-acp-client into main
This commit is contained in:
@@ -15,10 +15,12 @@
|
||||
"release": "bun run build && bun test && node scripts/publish-all.mjs"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@agentclientprotocol/sdk": "^0.22.1",
|
||||
"@biomejs/biome": "^2.4.14",
|
||||
"@changesets/cli": "^2.31.0",
|
||||
"@types/node": "^25.7.0",
|
||||
"@types/xxhashjs": "^0.2.4",
|
||||
"@uncaged/workflow-agent-hermes": "workspace:*",
|
||||
"bun-types": "^1.3.13"
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,77 @@
|
||||
import { afterEach, beforeEach, describe, expect, it } from "bun:test";
|
||||
|
||||
import { HermesAcpClient } from "../src/acp-client.js";
|
||||
|
||||
const UUID_RE = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i;
|
||||
|
||||
describe("HermesAcpClient", () => {
|
||||
let client: HermesAcpClient;
|
||||
|
||||
beforeEach(() => {
|
||||
client = new HermesAcpClient();
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
await client.close();
|
||||
});
|
||||
|
||||
it(
|
||||
"connect() returns a UUID sessionId",
|
||||
async () => {
|
||||
const sessionId = await client.connect(process.cwd());
|
||||
expect(typeof sessionId).toBe("string");
|
||||
expect(sessionId).toMatch(UUID_RE);
|
||||
},
|
||||
{ timeout: 2 * 60 * 1000 },
|
||||
);
|
||||
|
||||
it(
|
||||
"prompt() returns a non-empty text response",
|
||||
async () => {
|
||||
await client.connect(process.cwd());
|
||||
const result = await client.prompt("Reply with exactly the word: PONG");
|
||||
expect(typeof result.text).toBe("string");
|
||||
expect(result.text.length).toBeGreaterThan(0);
|
||||
expect(typeof result.sessionId).toBe("string");
|
||||
expect(result.sessionId).toMatch(UUID_RE);
|
||||
},
|
||||
{ timeout: 2 * 60 * 1000 },
|
||||
);
|
||||
|
||||
it(
|
||||
"prompt() can be called twice on the same session (resume)",
|
||||
async () => {
|
||||
await client.connect(process.cwd());
|
||||
|
||||
const first = await client.prompt("Say the word ALPHA and nothing else.");
|
||||
expect(first.text.length).toBeGreaterThan(0);
|
||||
|
||||
const second = await client.prompt("Now say the word BETA and nothing else.");
|
||||
expect(second.text.length).toBeGreaterThan(0);
|
||||
|
||||
expect(first.sessionId).toBe(second.sessionId);
|
||||
},
|
||||
{ timeout: 2 * 60 * 1000 },
|
||||
);
|
||||
|
||||
it(
|
||||
"prompt() collects structured messages including tool calls",
|
||||
async () => {
|
||||
await client.connect(process.cwd());
|
||||
const result = await client.prompt("Run this command: echo TOOL_DETAIL_TEST");
|
||||
expect(result.messages.length).toBeGreaterThan(0);
|
||||
// Should have at least one tool message (the echo command)
|
||||
const toolMessages = result.messages.filter((m) => m.role === "tool");
|
||||
expect(toolMessages.length).toBeGreaterThan(0);
|
||||
// Tool message should contain the output
|
||||
const toolContent = toolMessages[0]?.content ?? "";
|
||||
expect(toolContent).toContain("TOOL_DETAIL_TEST");
|
||||
// Should have assistant messages with tool_calls
|
||||
const assistantWithTools = result.messages.filter(
|
||||
(m) => m.role === "assistant" && m.tool_calls !== null,
|
||||
);
|
||||
expect(assistantWithTools.length).toBeGreaterThan(0);
|
||||
},
|
||||
{ timeout: 2 * 60 * 1000 },
|
||||
);
|
||||
});
|
||||
@@ -21,6 +21,7 @@
|
||||
"test": "bun test"
|
||||
},
|
||||
"dependencies": {
|
||||
"@agentclientprotocol/sdk": "^0.22.1",
|
||||
"@uncaged/json-cas": "^0.4.0",
|
||||
"@uncaged/workflow-agent-kit": "workspace:^"
|
||||
},
|
||||
|
||||
@@ -0,0 +1,233 @@
|
||||
import type { ChildProcess } from "node:child_process";
|
||||
import { spawn } from "node:child_process";
|
||||
import { Readable, Writable } from "node:stream";
|
||||
import type {
|
||||
Client,
|
||||
RequestPermissionRequest,
|
||||
RequestPermissionResponse,
|
||||
SessionNotification,
|
||||
} from "@agentclientprotocol/sdk";
|
||||
import { ClientSideConnection, ndJsonStream, PROTOCOL_VERSION } from "@agentclientprotocol/sdk";
|
||||
|
||||
import type { HermesSessionMessage } from "./types.js";
|
||||
|
||||
const HERMES_COMMAND = "hermes";
|
||||
|
||||
/** Tracks in-flight tool calls so we can build complete messages when they finish. */
|
||||
type PendingToolCall = {
|
||||
name: string;
|
||||
args: string;
|
||||
};
|
||||
|
||||
/**
|
||||
* Collects ACP session/update events into a list of {@link HermesSessionMessage}
|
||||
* that mirrors what Hermes writes to its session JSONL files.
|
||||
*/
|
||||
class UwfAcpClient implements Client {
|
||||
private messageChunks: string[] = [];
|
||||
private reasoningChunks: string[] = [];
|
||||
private pendingTools = new Map<string, PendingToolCall>();
|
||||
messages: HermesSessionMessage[] = [];
|
||||
|
||||
resetPerPrompt(): void {
|
||||
this.messageChunks = [];
|
||||
this.reasoningChunks = [];
|
||||
}
|
||||
|
||||
async sessionUpdate(params: SessionNotification): Promise<void> {
|
||||
const { update } = params;
|
||||
switch (update.sessionUpdate) {
|
||||
case "agent_message_chunk":
|
||||
if (update.content.type === "text") {
|
||||
this.messageChunks.push(update.content.text);
|
||||
}
|
||||
break;
|
||||
|
||||
case "agent_thought_chunk":
|
||||
if (update.content.type === "text") {
|
||||
this.reasoningChunks.push(update.content.text);
|
||||
}
|
||||
break;
|
||||
|
||||
case "tool_call": {
|
||||
// Agent is invoking a tool — record the call.
|
||||
const title = update.title ?? "";
|
||||
const rawInput =
|
||||
update.rawInput !== undefined && update.rawInput !== null
|
||||
? JSON.stringify(update.rawInput)
|
||||
: "";
|
||||
this.pendingTools.set(update.toolCallId, { name: title, args: rawInput });
|
||||
|
||||
// Flush accumulated assistant text + reasoning as an assistant message
|
||||
// (the agent "spoke" before calling the tool).
|
||||
this.flushAssistantMessage();
|
||||
break;
|
||||
}
|
||||
|
||||
case "tool_call_update": {
|
||||
if (update.status === "completed" || update.status === "failed") {
|
||||
const pending = this.pendingTools.get(update.toolCallId);
|
||||
const toolName = pending?.name ?? update.toolCallId;
|
||||
const rawOutput =
|
||||
update.rawOutput !== undefined && update.rawOutput !== null
|
||||
? typeof update.rawOutput === "string"
|
||||
? update.rawOutput
|
||||
: JSON.stringify(update.rawOutput)
|
||||
: "";
|
||||
this.messages.push({
|
||||
role: "assistant",
|
||||
content: null,
|
||||
reasoning: null,
|
||||
tool_calls: [{ function: { name: toolName, arguments: pending?.args ?? "" } }],
|
||||
});
|
||||
this.messages.push({
|
||||
role: "tool",
|
||||
content: rawOutput,
|
||||
reasoning: null,
|
||||
tool_calls: null,
|
||||
});
|
||||
this.pendingTools.delete(update.toolCallId);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
/** Flush any accumulated text/reasoning into an assistant message. */
|
||||
flushAssistantMessage(): void {
|
||||
const text = this.messageChunks.join("");
|
||||
const reasoning = this.reasoningChunks.join("");
|
||||
if (text !== "" || reasoning !== "") {
|
||||
this.messages.push({
|
||||
role: "assistant",
|
||||
content: text || null,
|
||||
reasoning: reasoning || null,
|
||||
tool_calls: null,
|
||||
});
|
||||
}
|
||||
this.messageChunks = [];
|
||||
this.reasoningChunks = [];
|
||||
}
|
||||
|
||||
async requestPermission(params: RequestPermissionRequest): Promise<RequestPermissionResponse> {
|
||||
const firstOption = params.options[0];
|
||||
return {
|
||||
outcome: {
|
||||
outcome: "selected",
|
||||
optionId: firstOption?.optionId ?? "",
|
||||
},
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
export type AcpPromptResult = {
|
||||
text: string;
|
||||
sessionId: string;
|
||||
messages: HermesSessionMessage[];
|
||||
};
|
||||
|
||||
export class HermesAcpClient {
|
||||
private process: ChildProcess | null = null;
|
||||
private connection: ClientSideConnection | null = null;
|
||||
private sessionId: string | null = null;
|
||||
private stderrBuffer = "";
|
||||
private client = new UwfAcpClient();
|
||||
|
||||
/** Spawn hermes acp, initialize, create session */
|
||||
async connect(cwd: string): Promise<string> {
|
||||
const child = spawn(HERMES_COMMAND, ["acp"], {
|
||||
env: process.env,
|
||||
shell: false,
|
||||
stdio: ["pipe", "pipe", "pipe"],
|
||||
});
|
||||
|
||||
this.process = child;
|
||||
|
||||
child.stderr?.on("data", (chunk: Buffer) => {
|
||||
this.stderrBuffer += chunk.toString();
|
||||
});
|
||||
|
||||
if (child.stdin === null || child.stdout === null) {
|
||||
throw new Error("hermes acp process stdio is not available");
|
||||
}
|
||||
|
||||
const input = Writable.toWeb(child.stdin);
|
||||
const output = Readable.toWeb(child.stdout);
|
||||
const stream = ndJsonStream(input, output);
|
||||
|
||||
const clientRef = this.client;
|
||||
const connection = new ClientSideConnection((_agent) => clientRef, stream);
|
||||
this.connection = connection;
|
||||
|
||||
connection.signal.addEventListener("abort", () => {
|
||||
if (this.sessionId !== null) {
|
||||
const detail = this.stderrBuffer.trim() !== "" ? ` stderr=${this.stderrBuffer.trim()}` : "";
|
||||
throw new Error(`hermes acp connection closed unexpectedly${detail}`);
|
||||
}
|
||||
});
|
||||
|
||||
await connection.initialize({
|
||||
protocolVersion: PROTOCOL_VERSION,
|
||||
clientCapabilities: {},
|
||||
});
|
||||
|
||||
const sessionResult = await connection.newSession({ cwd, mcpServers: [] });
|
||||
const { sessionId } = sessionResult;
|
||||
|
||||
this.sessionId = sessionId;
|
||||
return sessionId;
|
||||
}
|
||||
|
||||
/** Send prompt and collect full response text + structured messages. */
|
||||
async prompt(text: string): Promise<AcpPromptResult> {
|
||||
if (this.connection === null || this.sessionId === null) {
|
||||
throw new Error("Not connected — call connect() first");
|
||||
}
|
||||
|
||||
this.client.resetPerPrompt();
|
||||
|
||||
await this.connection.prompt({
|
||||
sessionId: this.sessionId,
|
||||
prompt: [{ type: "text", text }],
|
||||
});
|
||||
|
||||
// Flush any trailing assistant text that wasn't followed by a tool call.
|
||||
this.client.flushAssistantMessage();
|
||||
|
||||
// Extract the final assistant text from collected messages.
|
||||
const messages = this.client.messages;
|
||||
let finalText = "";
|
||||
for (let i = messages.length - 1; i >= 0; i--) {
|
||||
const msg = messages[i];
|
||||
if (msg !== undefined && msg.role === "assistant" && msg.content !== null && msg.content.trim() !== "") {
|
||||
finalText = msg.content;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
text: finalText,
|
||||
sessionId: this.sessionId,
|
||||
messages,
|
||||
};
|
||||
}
|
||||
|
||||
/** Close the connection */
|
||||
async close(): Promise<void> {
|
||||
if (this.process === null) {
|
||||
return;
|
||||
}
|
||||
this.sessionId = null;
|
||||
this.process.stdin?.end();
|
||||
const proc = this.process;
|
||||
await new Promise<void>((resolve) => {
|
||||
proc.on("close", () => resolve());
|
||||
setTimeout(resolve, 5000);
|
||||
});
|
||||
this.process = null;
|
||||
this.connection = null;
|
||||
}
|
||||
}
|
||||
@@ -1,4 +1,3 @@
|
||||
import { spawn } from "node:child_process";
|
||||
import type { Store } from "@uncaged/json-cas";
|
||||
|
||||
import {
|
||||
@@ -8,14 +7,8 @@ import {
|
||||
createAgent,
|
||||
} from "@uncaged/workflow-agent-kit";
|
||||
|
||||
import {
|
||||
loadHermesSession,
|
||||
parseSessionIdFromStdout,
|
||||
storeHermesSessionDetail,
|
||||
} from "./session-detail.js";
|
||||
|
||||
const HERMES_COMMAND = "hermes";
|
||||
const HERMES_MAX_TURNS = 90;
|
||||
import { HermesAcpClient } from "./acp-client.js";
|
||||
import { storeHermesSessionDetail } from "./session-detail.js";
|
||||
|
||||
function buildHistorySummary(steps: AgentContext["steps"]): string {
|
||||
if (steps.length === 0) {
|
||||
@@ -52,110 +45,43 @@ export function buildHermesPrompt(ctx: AgentContext): string {
|
||||
return parts.join("\n");
|
||||
}
|
||||
|
||||
function spawnHermes(args: string[]): Promise<{ stdout: string; stderr: string }> {
|
||||
return new Promise((resolve, reject) => {
|
||||
const child = spawn(HERMES_COMMAND, args, {
|
||||
env: process.env,
|
||||
shell: false,
|
||||
stdio: ["ignore", "pipe", "pipe"],
|
||||
});
|
||||
|
||||
let stdout = "";
|
||||
let stderr = "";
|
||||
child.stdout?.on("data", (chunk: Buffer) => {
|
||||
stdout += chunk.toString();
|
||||
});
|
||||
child.stderr?.on("data", (chunk: Buffer) => {
|
||||
stderr += chunk.toString();
|
||||
});
|
||||
|
||||
child.on("error", (cause) => {
|
||||
const message = cause instanceof Error ? cause.message : String(cause);
|
||||
reject(new Error(`hermes spawn failed: ${message}`));
|
||||
});
|
||||
|
||||
child.on("close", (code) => {
|
||||
if (code === 0) {
|
||||
resolve({ stdout, stderr });
|
||||
return;
|
||||
}
|
||||
const detail = stderr.trim() !== "" ? ` stderr=${stderr.trim()}` : "";
|
||||
reject(new Error(`hermes exited with code ${code ?? "null"}${detail}`));
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
function spawnHermesChat(prompt: string): Promise<{ stdout: string; stderr: string }> {
|
||||
return spawnHermes([
|
||||
"chat",
|
||||
"-q",
|
||||
prompt,
|
||||
"--yolo",
|
||||
"--max-turns",
|
||||
String(HERMES_MAX_TURNS),
|
||||
"--quiet",
|
||||
]);
|
||||
}
|
||||
|
||||
function spawnHermesResume(
|
||||
sessionId: string,
|
||||
message: string,
|
||||
): Promise<{ stdout: string; stderr: string }> {
|
||||
return spawnHermes([
|
||||
"chat",
|
||||
"--resume",
|
||||
sessionId,
|
||||
"-q",
|
||||
message,
|
||||
"--yolo",
|
||||
"--max-turns",
|
||||
String(HERMES_MAX_TURNS),
|
||||
"--quiet",
|
||||
]);
|
||||
}
|
||||
|
||||
function parseSessionId(stdout: string, stderr: string): string {
|
||||
const sessionId = parseSessionIdFromStdout(stderr) ?? parseSessionIdFromStdout(stdout);
|
||||
if (sessionId === null) {
|
||||
throw new Error(
|
||||
"Failed to parse session_id from hermes output.\n" +
|
||||
`stderr (first 200 chars): ${stderr.slice(0, 200)}\n` +
|
||||
`stdout (first 200 chars): ${stdout.slice(0, 200)}`,
|
||||
);
|
||||
}
|
||||
return sessionId;
|
||||
}
|
||||
|
||||
async function buildResultFromSession(sessionId: string, store: Store): Promise<AgentRunResult> {
|
||||
const session = await loadHermesSession(sessionId);
|
||||
if (session === null) {
|
||||
throw new Error(`Failed to load hermes session file for session_id: ${sessionId}`);
|
||||
}
|
||||
const { detailHash, output } = await storeHermesSessionDetail(store, session);
|
||||
return { output, detailHash, sessionId };
|
||||
}
|
||||
|
||||
async function runHermes(ctx: AgentContext): Promise<AgentRunResult> {
|
||||
const fullPrompt = buildHermesPrompt(ctx);
|
||||
const { stdout, stderr } = await spawnHermesChat(fullPrompt);
|
||||
const sessionId = parseSessionId(stdout, stderr);
|
||||
return buildResultFromSession(sessionId, ctx.store);
|
||||
}
|
||||
|
||||
async function continueHermes(
|
||||
sessionId: string,
|
||||
message: string,
|
||||
store: Store,
|
||||
): Promise<AgentRunResult> {
|
||||
const { stdout, stderr } = await spawnHermesResume(sessionId, message);
|
||||
// Resume may return a new session_id
|
||||
const newSessionId = parseSessionIdFromStdout(stderr) ?? parseSessionIdFromStdout(stdout);
|
||||
const resolvedId = newSessionId ?? sessionId;
|
||||
return buildResultFromSession(resolvedId, store);
|
||||
}
|
||||
|
||||
/** Agent CLI factory: parses argv, runs Hermes, extracts output, writes StepNode. */
|
||||
/**
|
||||
* Agent CLI factory: parses argv, runs Hermes, extracts output, writes StepNode.
|
||||
*
|
||||
* A single ACP client is shared across run() and continue() calls so that
|
||||
* frontmatter retry loops keep the same Hermes session context. The client
|
||||
* is closed once the agent process exits (via process.on("exit")).
|
||||
*/
|
||||
export function createHermesAgent(): () => Promise<void> {
|
||||
const client = new HermesAcpClient();
|
||||
|
||||
// Ensure cleanup regardless of how the process exits.
|
||||
process.on("exit", () => {
|
||||
void client.close();
|
||||
});
|
||||
|
||||
async function runHermes(ctx: AgentContext): Promise<AgentRunResult> {
|
||||
const fullPrompt = buildHermesPrompt(ctx);
|
||||
await client.connect(process.cwd());
|
||||
const { text, sessionId, messages } = await client.prompt(fullPrompt);
|
||||
const session = { session_id: sessionId, model: "", session_start: new Date().toISOString(), messages };
|
||||
const { detailHash } = await storeHermesSessionDetail(ctx.store, session);
|
||||
return { output: text, detailHash, sessionId };
|
||||
}
|
||||
|
||||
async function continueHermes(
|
||||
_sessionId: string,
|
||||
message: string,
|
||||
store: Store,
|
||||
): Promise<AgentRunResult> {
|
||||
// Client is already connected from runHermes — same ACP session,
|
||||
// so the agent sees the full conversation history (crucial for retries).
|
||||
const { text, sessionId, messages } = await client.prompt(message);
|
||||
const session = { session_id: sessionId, model: "", session_start: new Date().toISOString(), messages };
|
||||
const { detailHash } = await storeHermesSessionDetail(store, session);
|
||||
return { output: text, detailHash, sessionId };
|
||||
}
|
||||
|
||||
return createAgent({
|
||||
name: "hermes",
|
||||
run: runHermes,
|
||||
|
||||
@@ -1 +1,2 @@
|
||||
export { HermesAcpClient } from "./acp-client.js";
|
||||
export { buildHermesPrompt, createHermesAgent } from "./hermes.js";
|
||||
|
||||
Reference in New Issue
Block a user