fix: read token usage from ACP response instead of DB #94

Merged
xiaomo merged 1 commits from fix/usage-tokens-from-acp into main 2026-06-05 06:18:05 +00:00
5 changed files with 125 additions and 70 deletions
+16
View File
@@ -0,0 +1,16 @@
---
"@united-workforce/agent-hermes": patch
---
fix: read token usage from ACP PromptResponse instead of DB
Token counts (inputTokens, outputTokens) now come from the ACP
`PromptResponse.usage` field, which is populated synchronously from
`run_conversation()` return data — no WAL race condition.
Turns (assistant message count) still come from the DB via
`snapshotTurns()` before/after delta.
Previously both tokens and turns were read from the Hermes state DB
after the ACP prompt returned, but due to WAL write lag the DB often
had incomplete token data at read time (e.g. 235 vs actual 26,080).
@@ -1,5 +1,6 @@
import { describe, expect, test } from "vitest";
import { computeUsageDelta, snapshotUsage } from "../src/hermes.js";
import type { AcpUsage } from "../src/acp-client.js";
import { buildUsage, snapshotTurns } from "../src/hermes.js";
import type { HermesSessionJson } from "../src/types.js";
function makeSession(overrides: Partial<HermesSessionJson> = {}): HermesSessionJson {
@@ -14,19 +15,19 @@ function makeSession(overrides: Partial<HermesSessionJson> = {}): HermesSessionJ
};
}
describe("snapshotUsage", () => {
test("returns zero snapshot for null session", () => {
const result = snapshotUsage(null);
expect(result).toEqual({ turns: 0, inputTokens: 0, outputTokens: 0 });
describe("snapshotTurns", () => {
test("returns zero for null session", () => {
const result = snapshotTurns(null);
expect(result).toEqual({ turns: 0 });
});
test("returns zero snapshot for empty session", () => {
const result = snapshotUsage(makeSession());
expect(result).toEqual({ turns: 0, inputTokens: 0, outputTokens: 0 });
test("returns zero for empty session", () => {
const result = snapshotTurns(makeSession());
expect(result).toEqual({ turns: 0 });
});
test("counts assistant messages as turns", () => {
const result = snapshotUsage(
const result = snapshotTurns(
makeSession({
messages: [
{ role: "user", content: "hello", reasoning: null, tool_calls: null },
@@ -39,11 +40,11 @@ describe("snapshotUsage", () => {
outputTokens: 500,
}),
);
expect(result).toEqual({ turns: 2, inputTokens: 1000, outputTokens: 500 });
expect(result).toEqual({ turns: 2 });
});
test("ignores non-assistant messages for turn count", () => {
const result = snapshotUsage(
const result = snapshotTurns(
makeSession({
messages: [
{ role: "user", content: "hello", reasoning: null, tool_calls: null },
@@ -55,11 +56,13 @@ describe("snapshotUsage", () => {
});
});
describe("computeUsageDelta", () => {
test("first visit: before is zero, after has all values", () => {
const before = { turns: 0, inputTokens: 0, outputTokens: 0 };
const after = { turns: 3, inputTokens: 5000, outputTokens: 2000 };
const result = computeUsageDelta(before, after, 12.5);
describe("buildUsage", () => {
const acpUsage: AcpUsage = { inputTokens: 5000, outputTokens: 2000, totalTokens: 7000 };
test("first visit: tokens from ACP, turns from DB delta", () => {
const beforeTurns = { turns: 0 };
const afterTurns = { turns: 3 };
const result = buildUsage(acpUsage, beforeTurns, afterTurns, 12.5);
expect(result).toEqual({
turns: 3,
inputTokens: 5000,
@@ -68,43 +71,52 @@ describe("computeUsageDelta", () => {
});
});
test("re-entry: computes delta correctly", () => {
const before = { turns: 2, inputTokens: 3000, outputTokens: 1000 };
const after = { turns: 4, inputTokens: 8000, outputTokens: 3500 };
const result = computeUsageDelta(before, after, 7.3);
test("re-entry: turn delta computed correctly, tokens from ACP", () => {
const beforeTurns = { turns: 2 };
const afterTurns = { turns: 4 };
const acpDelta: AcpUsage = { inputTokens: 8000, outputTokens: 3500, totalTokens: 11500 };
const result = buildUsage(acpDelta, beforeTurns, afterTurns, 7.3);
expect(result).toEqual({
turns: 2,
inputTokens: 5000,
outputTokens: 2500,
inputTokens: 8000,
outputTokens: 3500,
duration: 7,
});
});
test("floors negative deltas at 0 (defensive)", () => {
const before = { turns: 5, inputTokens: 10000, outputTokens: 5000 };
const after = { turns: 3, inputTokens: 8000, outputTokens: 4000 };
const result = computeUsageDelta(before, after, 1.0);
test("floors negative turn deltas at 0, then defaults to 1", () => {
const beforeTurns = { turns: 5 };
const afterTurns = { turns: 3 };
const result = buildUsage(acpUsage, beforeTurns, afterTurns, 1.0);
// turns would be negative (-2), floored to 0, then || 1 gives 1
expect(result.turns).toBe(1);
expect(result.inputTokens).toBe(0);
expect(result.outputTokens).toBe(0);
});
test("zero turns delta defaults to 1 (at least one turn happened)", () => {
const before = { turns: 3, inputTokens: 1000, outputTokens: 500 };
const after = { turns: 3, inputTokens: 2000, outputTokens: 1000 };
const result = computeUsageDelta(before, after, 5.0);
const beforeTurns = { turns: 3 };
const afterTurns = { turns: 3 };
const result = buildUsage(acpUsage, beforeTurns, afterTurns, 5.0);
// turns delta is 0, || 1 gives 1
expect(result.turns).toBe(1);
expect(result.inputTokens).toBe(1000);
expect(result.outputTokens).toBe(500);
});
test("null ACP usage yields zero tokens", () => {
const beforeTurns = { turns: 0 };
const afterTurns = { turns: 2 };
const result = buildUsage(null, beforeTurns, afterTurns, 10.0);
expect(result).toEqual({
turns: 2,
inputTokens: 0,
outputTokens: 0,
duration: 10,
});
});
test("duration is rounded", () => {
const before = { turns: 0, inputTokens: 0, outputTokens: 0 };
const after = { turns: 1, inputTokens: 100, outputTokens: 50 };
expect(computeUsageDelta(before, after, 3.7).duration).toBe(4);
expect(computeUsageDelta(before, after, 3.2).duration).toBe(3);
expect(computeUsageDelta(before, after, 0.0).duration).toBe(0);
const beforeTurns = { turns: 0 };
const afterTurns = { turns: 1 };
expect(buildUsage(acpUsage, beforeTurns, afterTurns, 3.7).duration).toBe(4);
expect(buildUsage(acpUsage, beforeTurns, afterTurns, 3.2).duration).toBe(3);
expect(buildUsage(acpUsage, beforeTurns, afterTurns, 0.0).duration).toBe(0);
});
});
+24
View File
@@ -17,9 +17,17 @@ type PendingRequest = {
reject: (reason: Error) => void;
};
/** Token usage returned by ACP PromptResponse. */
export type AcpUsage = {
inputTokens: number;
outputTokens: number;
totalTokens: number;
};
export type AcpPromptResult = {
text: string;
sessionId: string;
usage: AcpUsage | null;
};
export class HermesAcpClient {
@@ -96,9 +104,25 @@ export class HermesAcpClient {
);
}
// Extract token usage from ACP PromptResponse.result.usage (camelCase wire format)
const result = (response as { result?: Record<string, unknown> }).result;
const rawUsage = result?.usage as Record<string, unknown> | undefined;
const usage: AcpUsage | null =
rawUsage !== undefined &&
typeof rawUsage.inputTokens === "number" &&
typeof rawUsage.outputTokens === "number" &&
typeof rawUsage.totalTokens === "number"
? {
inputTokens: rawUsage.inputTokens,
outputTokens: rawUsage.outputTokens,
totalTokens: rawUsage.totalTokens,
}
: null;
return {
text: this.messageChunks.join(""),
sessionId: this.sessionId,
usage,
};
}
+32 -30
View File
@@ -8,7 +8,7 @@ import {
buildRolePrompt,
createAgent,
} from "@united-workforce/util-agent";
import type { AcpUsage } from "./acp-client.js";
import { HermesAcpClient } from "./acp-client.js";
import { getCachedSessionId, setCachedSessionId } from "./session-cache.js";
import { loadHermesSession, storeHermesSessionDetail } from "./session-detail.js";
@@ -17,36 +17,37 @@ import type { HermesSessionJson } from "./types.js";
const log = createLogger({ sink: { kind: "stderr" } });
/** Snapshot of session metrics taken before and after a prompt call. */
type UsageSnapshot = {
type TurnsSnapshot = {
turns: number;
inputTokens: number;
outputTokens: number;
};
const ZERO_SNAPSHOT: UsageSnapshot = { turns: 0, inputTokens: 0, outputTokens: 0 };
const ZERO_TURNS: TurnsSnapshot = { turns: 0 };
/** Extract usage metrics from a session. Returns zeros for null sessions. */
export function snapshotUsage(session: HermesSessionJson | null): UsageSnapshot {
/** Extract assistant turn count from a session. Returns zero for null sessions. */
export function snapshotTurns(session: HermesSessionJson | null): TurnsSnapshot {
if (session === null) {
return ZERO_SNAPSHOT;
return ZERO_TURNS;
}
return {
turns: session.messages.filter((m) => m.role === "assistant").length,
inputTokens: session.inputTokens,
outputTokens: session.outputTokens,
};
}
/** Compute the delta between two snapshots (after minus before). Floors at 0. */
export function computeUsageDelta(
before: UsageSnapshot,
after: UsageSnapshot,
/**
* Build Usage from ACP token data + DB turn delta.
* Tokens come from ACP PromptResponse (synchronous, accurate).
* Turns come from DB before/after snapshots (may have WAL lag, but acceptable).
*/
export function buildUsage(
acpUsage: AcpUsage | null,
beforeTurns: TurnsSnapshot,
afterTurns: TurnsSnapshot,
durationSec: number,
): Usage {
return {
turns: Math.max(0, after.turns - before.turns) || 1,
inputTokens: Math.max(0, after.inputTokens - before.inputTokens),
outputTokens: Math.max(0, after.outputTokens - before.outputTokens),
turns: Math.max(0, afterTurns.turns - beforeTurns.turns) || 1,
inputTokens: acpUsage?.inputTokens ?? 0,
outputTokens: acpUsage?.outputTokens ?? 0,
duration: Math.round(durationSec),
};
}
@@ -148,12 +149,12 @@ export function createHermesAgent(resumeDisabled: boolean): () => Promise<void>
async function runPrompt(
ctx: AgentContext,
useContinuation: boolean,
beforeSnapshot: UsageSnapshot,
beforeTurns: TurnsSnapshot,
): Promise<AgentRunResult> {
const effectiveCtx = useContinuation ? ctx : { ...ctx, isFirstVisit: true };
const fullPrompt = buildHermesPrompt(effectiveCtx);
const startMs = Date.now();
const { text, sessionId } = await client.prompt(fullPrompt);
const { text, sessionId, usage: acpUsage } = await client.prompt(fullPrompt);
const durationSec = (Date.now() - startMs) / 1000;
const { detailHash } = await storePromptResult(ctx.store, sessionId);
@@ -161,9 +162,10 @@ export function createHermesAgent(resumeDisabled: boolean): () => Promise<void>
await setCachedSessionId(ctx.threadId, ctx.role, sessionId, ctx.storageRoot);
}
// Turns from DB (may lag slightly due to WAL, but acceptable)
const afterSession = await loadHermesSession(sessionId);
const afterSnapshot = snapshotUsage(afterSession);
const usage = computeUsageDelta(beforeSnapshot, afterSnapshot, durationSec);
const afterTurns = snapshotTurns(afterSession);
const usage = buildUsage(acpUsage, beforeTurns, afterTurns, durationSec);
return { output: text, detailHash, sessionId, assembledPrompt: fullPrompt, usage };
}
@@ -173,16 +175,16 @@ export function createHermesAgent(resumeDisabled: boolean): () => Promise<void>
const attempt = await prepareSession(client, ctx, cwd, resumeDisabled);
// Snapshot before prompt: for resumed sessions, captures cumulative state
// so we can compute the delta. For new sessions, this is ZERO_SNAPSHOT.
// so we can compute the turn delta. For new sessions, this is ZERO_TURNS.
const currentSessionId = client.getSessionId();
const beforeSession =
attempt.resumed && currentSessionId !== null
? await loadHermesSession(currentSessionId)
: null;
const beforeSnapshot = snapshotUsage(beforeSession);
const beforeTurns = snapshotTurns(beforeSession);
try {
return await runPrompt(ctx, attempt.useContinuation, beforeSnapshot);
return await runPrompt(ctx, attempt.useContinuation, beforeTurns);
} catch (error) {
if (!attempt.resumed) {
throw error;
@@ -193,7 +195,7 @@ export function createHermesAgent(resumeDisabled: boolean): () => Promise<void>
await client.close();
await client.connect(cwd);
// Fresh session after retry — reset snapshot to zero
return runPrompt(ctx, false, ZERO_SNAPSHOT);
return runPrompt(ctx, false, ZERO_TURNS);
}
}
@@ -204,20 +206,20 @@ export function createHermesAgent(resumeDisabled: boolean): () => Promise<void>
): Promise<AgentRunResult> {
// Client is already connected from runHermes — same ACP session,
// so the agent sees the full conversation history (crucial for retries).
// Snapshot before the continuation prompt for delta computation.
// Snapshot turns before the continuation prompt for delta computation.
const currentSessionId = client.getSessionId();
const beforeSession =
currentSessionId !== null ? await loadHermesSession(currentSessionId) : null;
const beforeSnapshot = snapshotUsage(beforeSession);
const beforeTurns = snapshotTurns(beforeSession);
const startMs = Date.now();
const { text, sessionId } = await client.prompt(message);
const { text, sessionId, usage: acpUsage } = await client.prompt(message);
const durationSec = (Date.now() - startMs) / 1000;
const { detailHash } = await storePromptResult(store, sessionId);
const afterSession = await loadHermesSession(sessionId);
const afterSnapshot = snapshotUsage(afterSession);
const usage = computeUsageDelta(beforeSnapshot, afterSnapshot, durationSec);
const afterTurns = snapshotTurns(afterSession);
const usage = buildUsage(acpUsage, beforeTurns, afterTurns, durationSec);
return { output: text, detailHash, sessionId, assembledPrompt: "", usage };
}
+3 -2
View File
@@ -1,7 +1,8 @@
export type { AcpUsage } from "./acp-client.js";
export { HermesAcpClient } from "./acp-client.js";
export {
buildHermesPrompt,
computeUsageDelta,
buildUsage,
createHermesAgent,
snapshotUsage,
snapshotTurns,
} from "./hermes.js";