fix: replace @agentclientprotocol/sdk with readline-based JSON-RPC
The official TS SDK's ndJsonStream hangs indefinitely on prompt() for sessions with 20+ messages (solve-issue planner). Root cause appears to be a stream backpressure issue in the SDK's ReadableStream adapter. Switch back to readline-based line parsing which reliably receives all JSON-RPC responses. Also handle session/request_permission inline (auto-approve, yolo mode equivalent). Ref #398
This commit is contained in:
@@ -21,7 +21,6 @@
|
|||||||
"test": "bun test"
|
"test": "bun test"
|
||||||
},
|
},
|
||||||
"dependencies": {
|
"dependencies": {
|
||||||
"@agentclientprotocol/sdk": "^0.22.1",
|
|
||||||
"@uncaged/json-cas": "^0.4.0",
|
"@uncaged/json-cas": "^0.4.0",
|
||||||
"@uncaged/workflow-agent-kit": "workspace:^"
|
"@uncaged/workflow-agent-kit": "workspace:^"
|
||||||
},
|
},
|
||||||
|
|||||||
@@ -1,17 +1,23 @@
|
|||||||
import type { ChildProcess } from "node:child_process";
|
import type { ChildProcess } from "node:child_process";
|
||||||
import { spawn } from "node:child_process";
|
import { spawn } from "node:child_process";
|
||||||
import { Readable, Writable } from "node:stream";
|
import { createInterface } from "node:readline";
|
||||||
import type {
|
|
||||||
Client,
|
|
||||||
RequestPermissionRequest,
|
|
||||||
RequestPermissionResponse,
|
|
||||||
SessionNotification,
|
|
||||||
} from "@agentclientprotocol/sdk";
|
|
||||||
import { ClientSideConnection, ndJsonStream, PROTOCOL_VERSION } from "@agentclientprotocol/sdk";
|
|
||||||
|
|
||||||
import type { HermesSessionMessage } from "./types.js";
|
import type { HermesSessionMessage } from "./types.js";
|
||||||
|
|
||||||
const HERMES_COMMAND = "hermes";
|
const HERMES_COMMAND = "hermes";
|
||||||
|
const PROTOCOL_VERSION = 1;
|
||||||
|
|
||||||
|
type JsonRpcResponse = {
|
||||||
|
jsonrpc: "2.0";
|
||||||
|
id: number;
|
||||||
|
result?: unknown;
|
||||||
|
error?: { code: number; message: string };
|
||||||
|
};
|
||||||
|
|
||||||
|
type PendingRequest = {
|
||||||
|
resolve: (value: JsonRpcResponse) => void;
|
||||||
|
reject: (reason: Error) => void;
|
||||||
|
};
|
||||||
|
|
||||||
/** Tracks in-flight tool calls so we can build complete messages when they finish. */
|
/** Tracks in-flight tool calls so we can build complete messages when they finish. */
|
||||||
type PendingToolCall = {
|
type PendingToolCall = {
|
||||||
@@ -19,110 +25,6 @@ type PendingToolCall = {
|
|||||||
args: string;
|
args: string;
|
||||||
};
|
};
|
||||||
|
|
||||||
/**
|
|
||||||
* Collects ACP session/update events into a list of {@link HermesSessionMessage}
|
|
||||||
* that mirrors what Hermes writes to its session JSONL files.
|
|
||||||
*/
|
|
||||||
class UwfAcpClient implements Client {
|
|
||||||
private messageChunks: string[] = [];
|
|
||||||
private reasoningChunks: string[] = [];
|
|
||||||
private pendingTools = new Map<string, PendingToolCall>();
|
|
||||||
messages: HermesSessionMessage[] = [];
|
|
||||||
|
|
||||||
resetPerPrompt(): void {
|
|
||||||
this.messageChunks = [];
|
|
||||||
this.reasoningChunks = [];
|
|
||||||
}
|
|
||||||
|
|
||||||
async sessionUpdate(params: SessionNotification): Promise<void> {
|
|
||||||
const { update } = params;
|
|
||||||
switch (update.sessionUpdate) {
|
|
||||||
case "agent_message_chunk":
|
|
||||||
if (update.content.type === "text") {
|
|
||||||
this.messageChunks.push(update.content.text);
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
|
|
||||||
case "agent_thought_chunk":
|
|
||||||
if (update.content.type === "text") {
|
|
||||||
this.reasoningChunks.push(update.content.text);
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
|
|
||||||
case "tool_call": {
|
|
||||||
// Agent is invoking a tool — record the call.
|
|
||||||
const title = update.title ?? "";
|
|
||||||
const rawInput =
|
|
||||||
update.rawInput !== undefined && update.rawInput !== null
|
|
||||||
? JSON.stringify(update.rawInput)
|
|
||||||
: "";
|
|
||||||
this.pendingTools.set(update.toolCallId, { name: title, args: rawInput });
|
|
||||||
|
|
||||||
// Flush accumulated assistant text + reasoning as an assistant message
|
|
||||||
// (the agent "spoke" before calling the tool).
|
|
||||||
this.flushAssistantMessage();
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
case "tool_call_update": {
|
|
||||||
if (update.status === "completed" || update.status === "failed") {
|
|
||||||
const pending = this.pendingTools.get(update.toolCallId);
|
|
||||||
const toolName = pending?.name ?? update.toolCallId;
|
|
||||||
const rawOutput =
|
|
||||||
update.rawOutput !== undefined && update.rawOutput !== null
|
|
||||||
? typeof update.rawOutput === "string"
|
|
||||||
? update.rawOutput
|
|
||||||
: JSON.stringify(update.rawOutput)
|
|
||||||
: "";
|
|
||||||
this.messages.push({
|
|
||||||
role: "assistant",
|
|
||||||
content: null,
|
|
||||||
reasoning: null,
|
|
||||||
tool_calls: [{ function: { name: toolName, arguments: pending?.args ?? "" } }],
|
|
||||||
});
|
|
||||||
this.messages.push({
|
|
||||||
role: "tool",
|
|
||||||
content: rawOutput,
|
|
||||||
reasoning: null,
|
|
||||||
tool_calls: null,
|
|
||||||
});
|
|
||||||
this.pendingTools.delete(update.toolCallId);
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
default:
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Flush any accumulated text/reasoning into an assistant message. */
|
|
||||||
flushAssistantMessage(): void {
|
|
||||||
const text = this.messageChunks.join("");
|
|
||||||
const reasoning = this.reasoningChunks.join("");
|
|
||||||
if (text !== "" || reasoning !== "") {
|
|
||||||
this.messages.push({
|
|
||||||
role: "assistant",
|
|
||||||
content: text || null,
|
|
||||||
reasoning: reasoning || null,
|
|
||||||
tool_calls: null,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
this.messageChunks = [];
|
|
||||||
this.reasoningChunks = [];
|
|
||||||
}
|
|
||||||
|
|
||||||
async requestPermission(params: RequestPermissionRequest): Promise<RequestPermissionResponse> {
|
|
||||||
const firstOption = params.options[0];
|
|
||||||
return {
|
|
||||||
outcome: {
|
|
||||||
outcome: "selected",
|
|
||||||
optionId: firstOption?.optionId ?? "",
|
|
||||||
},
|
|
||||||
};
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
export type AcpPromptResult = {
|
export type AcpPromptResult = {
|
||||||
text: string;
|
text: string;
|
||||||
sessionId: string;
|
sessionId: string;
|
||||||
@@ -131,10 +33,16 @@ export type AcpPromptResult = {
|
|||||||
|
|
||||||
export class HermesAcpClient {
|
export class HermesAcpClient {
|
||||||
private process: ChildProcess | null = null;
|
private process: ChildProcess | null = null;
|
||||||
private connection: ClientSideConnection | null = null;
|
private nextId = 1;
|
||||||
private sessionId: string | null = null;
|
private sessionId: string | null = null;
|
||||||
private stderrBuffer = "";
|
private stderrBuffer = "";
|
||||||
private client = new UwfAcpClient();
|
private pending = new Map<number, PendingRequest>();
|
||||||
|
|
||||||
|
// Message collection state
|
||||||
|
private messageChunks: string[] = [];
|
||||||
|
private reasoningChunks: string[] = [];
|
||||||
|
private pendingTools = new Map<string, PendingToolCall>();
|
||||||
|
messages: HermesSessionMessage[] = [];
|
||||||
|
|
||||||
/** Spawn hermes acp, initialize, create session */
|
/** Spawn hermes acp, initialize, create session */
|
||||||
async connect(cwd: string): Promise<string> {
|
async connect(cwd: string): Promise<string> {
|
||||||
@@ -150,32 +58,51 @@ export class HermesAcpClient {
|
|||||||
this.stderrBuffer += chunk.toString();
|
this.stderrBuffer += chunk.toString();
|
||||||
});
|
});
|
||||||
|
|
||||||
if (child.stdin === null || child.stdout === null) {
|
child.on("error", (cause) => {
|
||||||
throw new Error("hermes acp process stdio is not available");
|
const message = cause instanceof Error ? cause.message : String(cause);
|
||||||
}
|
this.rejectAll(new Error(`hermes acp spawn failed: ${message}`));
|
||||||
|
});
|
||||||
|
|
||||||
const input = Writable.toWeb(child.stdin);
|
child.on("close", (code) => {
|
||||||
const output = Readable.toWeb(child.stdout);
|
if (code !== 0 && this.pending.size > 0) {
|
||||||
const stream = ndJsonStream(input, output);
|
|
||||||
|
|
||||||
const clientRef = this.client;
|
|
||||||
const connection = new ClientSideConnection((_agent) => clientRef, stream);
|
|
||||||
this.connection = connection;
|
|
||||||
|
|
||||||
connection.signal.addEventListener("abort", () => {
|
|
||||||
if (this.sessionId !== null) {
|
|
||||||
const detail = this.stderrBuffer.trim() !== "" ? ` stderr=${this.stderrBuffer.trim()}` : "";
|
const detail = this.stderrBuffer.trim() !== "" ? ` stderr=${this.stderrBuffer.trim()}` : "";
|
||||||
throw new Error(`hermes acp connection closed unexpectedly${detail}`);
|
this.rejectAll(
|
||||||
|
new Error(`hermes acp exited unexpectedly with code ${code ?? "null"}${detail}`),
|
||||||
|
);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
await connection.initialize({
|
if (child.stdout === null) {
|
||||||
protocolVersion: PROTOCOL_VERSION,
|
throw new Error("hermes acp process stdout is not available");
|
||||||
clientCapabilities: {},
|
}
|
||||||
|
const rl = createInterface({ input: child.stdout });
|
||||||
|
rl.on("line", (line) => {
|
||||||
|
this.handleLine(line.trim());
|
||||||
});
|
});
|
||||||
|
|
||||||
const sessionResult = await connection.newSession({ cwd, mcpServers: [] });
|
const initResponse = await this.sendRequest("initialize", {
|
||||||
const { sessionId } = sessionResult;
|
protocolVersion: PROTOCOL_VERSION,
|
||||||
|
clientInfo: { name: "uwf", version: "0.1.0" },
|
||||||
|
capabilities: {},
|
||||||
|
});
|
||||||
|
|
||||||
|
if ((initResponse as { error?: unknown }).error !== undefined) {
|
||||||
|
throw new Error(
|
||||||
|
`initialize failed: ${JSON.stringify((initResponse as { error: unknown }).error)}`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
this.sendNotification("initialized");
|
||||||
|
|
||||||
|
const sessionResponse = (await this.sendRequest("session/new", {
|
||||||
|
cwd,
|
||||||
|
mcpServers: [],
|
||||||
|
})) as { result: { sessionId: string } };
|
||||||
|
|
||||||
|
const sessionId = sessionResponse.result?.sessionId;
|
||||||
|
if (typeof sessionId !== "string" || sessionId === "") {
|
||||||
|
throw new Error(`session/new did not return a sessionId: ${JSON.stringify(sessionResponse)}`);
|
||||||
|
}
|
||||||
|
|
||||||
this.sessionId = sessionId;
|
this.sessionId = sessionId;
|
||||||
return sessionId;
|
return sessionId;
|
||||||
@@ -183,26 +110,37 @@ export class HermesAcpClient {
|
|||||||
|
|
||||||
/** Send prompt and collect full response text + structured messages. */
|
/** Send prompt and collect full response text + structured messages. */
|
||||||
async prompt(text: string): Promise<AcpPromptResult> {
|
async prompt(text: string): Promise<AcpPromptResult> {
|
||||||
if (this.connection === null || this.sessionId === null) {
|
if (this.sessionId === null) {
|
||||||
throw new Error("Not connected — call connect() first");
|
throw new Error("Not connected — call connect() first");
|
||||||
}
|
}
|
||||||
|
|
||||||
this.client.resetPerPrompt();
|
this.messageChunks = [];
|
||||||
|
this.reasoningChunks = [];
|
||||||
|
|
||||||
await this.connection.prompt({
|
const response = await this.sendRequest("session/prompt", {
|
||||||
sessionId: this.sessionId,
|
sessionId: this.sessionId,
|
||||||
prompt: [{ type: "text", text }],
|
prompt: [{ type: "text", text }],
|
||||||
});
|
});
|
||||||
|
|
||||||
|
if ((response as { error?: unknown }).error !== undefined) {
|
||||||
|
throw new Error(
|
||||||
|
`session/prompt failed: ${JSON.stringify((response as { error: unknown }).error)}`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
// Flush any trailing assistant text that wasn't followed by a tool call.
|
// Flush any trailing assistant text that wasn't followed by a tool call.
|
||||||
this.client.flushAssistantMessage();
|
this.flushAssistantMessage();
|
||||||
|
|
||||||
// Extract the final assistant text from collected messages.
|
// Extract the final assistant text from collected messages.
|
||||||
const messages = this.client.messages;
|
|
||||||
let finalText = "";
|
let finalText = "";
|
||||||
for (let i = messages.length - 1; i >= 0; i--) {
|
for (let i = this.messages.length - 1; i >= 0; i--) {
|
||||||
const msg = messages[i];
|
const msg = this.messages[i];
|
||||||
if (msg !== undefined && msg.role === "assistant" && msg.content !== null && msg.content.trim() !== "") {
|
if (
|
||||||
|
msg !== undefined &&
|
||||||
|
msg.role === "assistant" &&
|
||||||
|
msg.content !== null &&
|
||||||
|
msg.content.trim() !== ""
|
||||||
|
) {
|
||||||
finalText = msg.content;
|
finalText = msg.content;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@@ -211,7 +149,7 @@ export class HermesAcpClient {
|
|||||||
return {
|
return {
|
||||||
text: finalText,
|
text: finalText,
|
||||||
sessionId: this.sessionId,
|
sessionId: this.sessionId,
|
||||||
messages,
|
messages: this.messages,
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -228,6 +166,192 @@ export class HermesAcpClient {
|
|||||||
setTimeout(resolve, 5000);
|
setTimeout(resolve, 5000);
|
||||||
});
|
});
|
||||||
this.process = null;
|
this.process = null;
|
||||||
this.connection = null;
|
}
|
||||||
|
|
||||||
|
// ---- JSON-RPC transport ----
|
||||||
|
|
||||||
|
private sendRequest(
|
||||||
|
method: string,
|
||||||
|
params: Record<string, unknown>,
|
||||||
|
timeoutMs = 10 * 60 * 1000,
|
||||||
|
): Promise<JsonRpcResponse> {
|
||||||
|
const id = this.nextId++;
|
||||||
|
return new Promise<JsonRpcResponse>((resolve, reject) => {
|
||||||
|
const timer = setTimeout(() => {
|
||||||
|
this.pending.delete(id);
|
||||||
|
reject(new Error(`Timeout waiting for response to ${method} (id=${id})`));
|
||||||
|
}, timeoutMs);
|
||||||
|
|
||||||
|
this.pending.set(id, {
|
||||||
|
resolve: (value) => {
|
||||||
|
clearTimeout(timer);
|
||||||
|
resolve(value);
|
||||||
|
},
|
||||||
|
reject: (err) => {
|
||||||
|
clearTimeout(timer);
|
||||||
|
reject(err);
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
this.writeLine(JSON.stringify({ jsonrpc: "2.0", id, method, params }));
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private sendNotification(method: string, params?: Record<string, unknown>): void {
|
||||||
|
const message: Record<string, unknown> = { jsonrpc: "2.0", method };
|
||||||
|
if (params !== undefined) {
|
||||||
|
message.params = params;
|
||||||
|
}
|
||||||
|
this.writeLine(JSON.stringify(message));
|
||||||
|
}
|
||||||
|
|
||||||
|
private writeLine(line: string): void {
|
||||||
|
if (this.process?.stdin === null || this.process?.stdin === undefined) {
|
||||||
|
throw new Error("Cannot write: hermes acp process stdin not available");
|
||||||
|
}
|
||||||
|
this.process.stdin.write(`${line}\n`);
|
||||||
|
}
|
||||||
|
|
||||||
|
private handleLine(line: string): void {
|
||||||
|
if (line === "") {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
let parsed: unknown;
|
||||||
|
try {
|
||||||
|
parsed = JSON.parse(line);
|
||||||
|
} catch {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const msg = parsed as Record<string, unknown>;
|
||||||
|
|
||||||
|
// JSON-RPC response (has "id")
|
||||||
|
if ("id" in msg && msg.id !== undefined && msg.id !== null) {
|
||||||
|
const response = msg as unknown as JsonRpcResponse;
|
||||||
|
const handler = this.pending.get(response.id);
|
||||||
|
if (handler !== undefined) {
|
||||||
|
this.pending.delete(response.id);
|
||||||
|
handler.resolve(response);
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// JSON-RPC notification — session/update
|
||||||
|
if (msg.method === "session/update") {
|
||||||
|
const params = msg.params as Record<string, unknown> | undefined;
|
||||||
|
const update = params?.update as Record<string, unknown> | undefined;
|
||||||
|
if (update !== undefined) {
|
||||||
|
this.handleSessionUpdate(update);
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// session/request_permission — auto-approve (yolo mode)
|
||||||
|
if (msg.method === "session/request_permission") {
|
||||||
|
const params = msg.params as Record<string, unknown> | undefined;
|
||||||
|
const options = (params?.options ?? []) as Array<{ optionId?: string }>;
|
||||||
|
const firstOptionId = options[0]?.optionId ?? "";
|
||||||
|
// Respond with the selected option
|
||||||
|
const responseMsg = {
|
||||||
|
jsonrpc: "2.0",
|
||||||
|
id: (msg as Record<string, unknown>).id,
|
||||||
|
result: { outcome: { outcome: "selected", optionId: firstOptionId } },
|
||||||
|
};
|
||||||
|
this.writeLine(JSON.stringify(responseMsg));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ---- Session update → structured messages ----
|
||||||
|
|
||||||
|
private handleSessionUpdate(update: Record<string, unknown>): void {
|
||||||
|
const updateType = update.sessionUpdate as string;
|
||||||
|
|
||||||
|
switch (updateType) {
|
||||||
|
case "agent_message_chunk": {
|
||||||
|
const content = update.content as { type?: string; text?: string } | undefined;
|
||||||
|
if (content?.type === "text" && typeof content.text === "string") {
|
||||||
|
this.messageChunks.push(content.text);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
case "agent_thought_chunk": {
|
||||||
|
const content = update.content as { type?: string; text?: string } | undefined;
|
||||||
|
if (content?.type === "text" && typeof content.text === "string") {
|
||||||
|
this.reasoningChunks.push(content.text);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
case "tool_call": {
|
||||||
|
const title = (update.title as string) ?? "";
|
||||||
|
const rawInput = update.rawInput;
|
||||||
|
const args =
|
||||||
|
rawInput !== undefined && rawInput !== null ? JSON.stringify(rawInput) : "";
|
||||||
|
const toolCallId = update.toolCallId as string;
|
||||||
|
this.pendingTools.set(toolCallId, { name: title, args });
|
||||||
|
|
||||||
|
// Flush accumulated assistant text before tool call
|
||||||
|
this.flushAssistantMessage();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
case "tool_call_update": {
|
||||||
|
const status = update.status as string | undefined;
|
||||||
|
if (status === "completed" || status === "failed") {
|
||||||
|
const toolCallId = update.toolCallId as string;
|
||||||
|
const pending = this.pendingTools.get(toolCallId);
|
||||||
|
const toolName = pending?.name ?? toolCallId;
|
||||||
|
const rawOutput = update.rawOutput;
|
||||||
|
const outputStr =
|
||||||
|
rawOutput !== undefined && rawOutput !== null
|
||||||
|
? typeof rawOutput === "string"
|
||||||
|
? rawOutput
|
||||||
|
: JSON.stringify(rawOutput)
|
||||||
|
: "";
|
||||||
|
this.messages.push({
|
||||||
|
role: "assistant",
|
||||||
|
content: null,
|
||||||
|
reasoning: null,
|
||||||
|
tool_calls: [{ function: { name: toolName, arguments: pending?.args ?? "" } }],
|
||||||
|
});
|
||||||
|
this.messages.push({
|
||||||
|
role: "tool",
|
||||||
|
content: outputStr,
|
||||||
|
reasoning: null,
|
||||||
|
tool_calls: null,
|
||||||
|
});
|
||||||
|
this.pendingTools.delete(toolCallId);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Flush any accumulated text/reasoning into an assistant message. */
|
||||||
|
private flushAssistantMessage(): void {
|
||||||
|
const text = this.messageChunks.join("");
|
||||||
|
const reasoning = this.reasoningChunks.join("");
|
||||||
|
if (text !== "" || reasoning !== "") {
|
||||||
|
this.messages.push({
|
||||||
|
role: "assistant",
|
||||||
|
content: text || null,
|
||||||
|
reasoning: reasoning || null,
|
||||||
|
tool_calls: null,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
this.messageChunks = [];
|
||||||
|
this.reasoningChunks = [];
|
||||||
|
}
|
||||||
|
|
||||||
|
private rejectAll(err: Error): void {
|
||||||
|
for (const handler of this.pending.values()) {
|
||||||
|
handler.reject(err);
|
||||||
|
}
|
||||||
|
this.pending.clear();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user