Merge pull request 'fix: read token usage from ACP response instead of DB' (#94) from fix/usage-tokens-from-acp into main
CI / check (push) Successful in 3m25s

This commit was merged in pull request #94.
This commit is contained in:
2026-06-05 06:18:05 +00:00
5 changed files with 125 additions and 70 deletions
+16
View File
@@ -0,0 +1,16 @@
---
"@united-workforce/agent-hermes": patch
---
fix: read token usage from ACP PromptResponse instead of DB
Token counts (inputTokens, outputTokens) now come from the ACP
`PromptResponse.usage` field, which is populated synchronously from
`run_conversation()` return data — no WAL race condition.
Turns (assistant message count) still come from the DB via
`snapshotTurns()` before/after delta.
Previously both tokens and turns were read from the Hermes state DB
after the ACP prompt returned, but due to WAL write lag the DB often
had incomplete token data at read time (e.g. 235 vs actual 26,080).
@@ -1,5 +1,6 @@
import { describe, expect, test } from "vitest"; import { describe, expect, test } from "vitest";
import { computeUsageDelta, snapshotUsage } from "../src/hermes.js"; import type { AcpUsage } from "../src/acp-client.js";
import { buildUsage, snapshotTurns } from "../src/hermes.js";
import type { HermesSessionJson } from "../src/types.js"; import type { HermesSessionJson } from "../src/types.js";
function makeSession(overrides: Partial<HermesSessionJson> = {}): HermesSessionJson { function makeSession(overrides: Partial<HermesSessionJson> = {}): HermesSessionJson {
@@ -14,19 +15,19 @@ function makeSession(overrides: Partial<HermesSessionJson> = {}): HermesSessionJ
}; };
} }
describe("snapshotUsage", () => { describe("snapshotTurns", () => {
test("returns zero snapshot for null session", () => { test("returns zero for null session", () => {
const result = snapshotUsage(null); const result = snapshotTurns(null);
expect(result).toEqual({ turns: 0, inputTokens: 0, outputTokens: 0 }); expect(result).toEqual({ turns: 0 });
}); });
test("returns zero snapshot for empty session", () => { test("returns zero for empty session", () => {
const result = snapshotUsage(makeSession()); const result = snapshotTurns(makeSession());
expect(result).toEqual({ turns: 0, inputTokens: 0, outputTokens: 0 }); expect(result).toEqual({ turns: 0 });
}); });
test("counts assistant messages as turns", () => { test("counts assistant messages as turns", () => {
const result = snapshotUsage( const result = snapshotTurns(
makeSession({ makeSession({
messages: [ messages: [
{ role: "user", content: "hello", reasoning: null, tool_calls: null }, { role: "user", content: "hello", reasoning: null, tool_calls: null },
@@ -39,11 +40,11 @@ describe("snapshotUsage", () => {
outputTokens: 500, outputTokens: 500,
}), }),
); );
expect(result).toEqual({ turns: 2, inputTokens: 1000, outputTokens: 500 }); expect(result).toEqual({ turns: 2 });
}); });
test("ignores non-assistant messages for turn count", () => { test("ignores non-assistant messages for turn count", () => {
const result = snapshotUsage( const result = snapshotTurns(
makeSession({ makeSession({
messages: [ messages: [
{ role: "user", content: "hello", reasoning: null, tool_calls: null }, { role: "user", content: "hello", reasoning: null, tool_calls: null },
@@ -55,11 +56,13 @@ describe("snapshotUsage", () => {
}); });
}); });
describe("computeUsageDelta", () => { describe("buildUsage", () => {
test("first visit: before is zero, after has all values", () => { const acpUsage: AcpUsage = { inputTokens: 5000, outputTokens: 2000, totalTokens: 7000 };
const before = { turns: 0, inputTokens: 0, outputTokens: 0 };
const after = { turns: 3, inputTokens: 5000, outputTokens: 2000 }; test("first visit: tokens from ACP, turns from DB delta", () => {
const result = computeUsageDelta(before, after, 12.5); const beforeTurns = { turns: 0 };
const afterTurns = { turns: 3 };
const result = buildUsage(acpUsage, beforeTurns, afterTurns, 12.5);
expect(result).toEqual({ expect(result).toEqual({
turns: 3, turns: 3,
inputTokens: 5000, inputTokens: 5000,
@@ -68,43 +71,52 @@ describe("computeUsageDelta", () => {
}); });
}); });
test("re-entry: computes delta correctly", () => { test("re-entry: turn delta computed correctly, tokens from ACP", () => {
const before = { turns: 2, inputTokens: 3000, outputTokens: 1000 }; const beforeTurns = { turns: 2 };
const after = { turns: 4, inputTokens: 8000, outputTokens: 3500 }; const afterTurns = { turns: 4 };
const result = computeUsageDelta(before, after, 7.3); const acpDelta: AcpUsage = { inputTokens: 8000, outputTokens: 3500, totalTokens: 11500 };
const result = buildUsage(acpDelta, beforeTurns, afterTurns, 7.3);
expect(result).toEqual({ expect(result).toEqual({
turns: 2, turns: 2,
inputTokens: 5000, inputTokens: 8000,
outputTokens: 2500, outputTokens: 3500,
duration: 7, duration: 7,
}); });
}); });
test("floors negative deltas at 0 (defensive)", () => { test("floors negative turn deltas at 0, then defaults to 1", () => {
const before = { turns: 5, inputTokens: 10000, outputTokens: 5000 }; const beforeTurns = { turns: 5 };
const after = { turns: 3, inputTokens: 8000, outputTokens: 4000 }; const afterTurns = { turns: 3 };
const result = computeUsageDelta(before, after, 1.0); const result = buildUsage(acpUsage, beforeTurns, afterTurns, 1.0);
// turns would be negative (-2), floored to 0, then || 1 gives 1 // turns would be negative (-2), floored to 0, then || 1 gives 1
expect(result.turns).toBe(1); expect(result.turns).toBe(1);
expect(result.inputTokens).toBe(0);
expect(result.outputTokens).toBe(0);
}); });
test("zero turns delta defaults to 1 (at least one turn happened)", () => { test("zero turns delta defaults to 1 (at least one turn happened)", () => {
const before = { turns: 3, inputTokens: 1000, outputTokens: 500 }; const beforeTurns = { turns: 3 };
const after = { turns: 3, inputTokens: 2000, outputTokens: 1000 }; const afterTurns = { turns: 3 };
const result = computeUsageDelta(before, after, 5.0); const result = buildUsage(acpUsage, beforeTurns, afterTurns, 5.0);
// turns delta is 0, || 1 gives 1 // turns delta is 0, || 1 gives 1
expect(result.turns).toBe(1); expect(result.turns).toBe(1);
expect(result.inputTokens).toBe(1000); });
expect(result.outputTokens).toBe(500);
test("null ACP usage yields zero tokens", () => {
const beforeTurns = { turns: 0 };
const afterTurns = { turns: 2 };
const result = buildUsage(null, beforeTurns, afterTurns, 10.0);
expect(result).toEqual({
turns: 2,
inputTokens: 0,
outputTokens: 0,
duration: 10,
});
}); });
test("duration is rounded", () => { test("duration is rounded", () => {
const before = { turns: 0, inputTokens: 0, outputTokens: 0 }; const beforeTurns = { turns: 0 };
const after = { turns: 1, inputTokens: 100, outputTokens: 50 }; const afterTurns = { turns: 1 };
expect(computeUsageDelta(before, after, 3.7).duration).toBe(4); expect(buildUsage(acpUsage, beforeTurns, afterTurns, 3.7).duration).toBe(4);
expect(computeUsageDelta(before, after, 3.2).duration).toBe(3); expect(buildUsage(acpUsage, beforeTurns, afterTurns, 3.2).duration).toBe(3);
expect(computeUsageDelta(before, after, 0.0).duration).toBe(0); expect(buildUsage(acpUsage, beforeTurns, afterTurns, 0.0).duration).toBe(0);
}); });
}); });
+24
View File
@@ -17,9 +17,17 @@ type PendingRequest = {
reject: (reason: Error) => void; reject: (reason: Error) => void;
}; };
/** Token usage returned by ACP PromptResponse. */
export type AcpUsage = {
inputTokens: number;
outputTokens: number;
totalTokens: number;
};
export type AcpPromptResult = { export type AcpPromptResult = {
text: string; text: string;
sessionId: string; sessionId: string;
usage: AcpUsage | null;
}; };
export class HermesAcpClient { export class HermesAcpClient {
@@ -96,9 +104,25 @@ export class HermesAcpClient {
); );
} }
// Extract token usage from ACP PromptResponse.result.usage (camelCase wire format)
const result = (response as { result?: Record<string, unknown> }).result;
const rawUsage = result?.usage as Record<string, unknown> | undefined;
const usage: AcpUsage | null =
rawUsage !== undefined &&
typeof rawUsage.inputTokens === "number" &&
typeof rawUsage.outputTokens === "number" &&
typeof rawUsage.totalTokens === "number"
? {
inputTokens: rawUsage.inputTokens,
outputTokens: rawUsage.outputTokens,
totalTokens: rawUsage.totalTokens,
}
: null;
return { return {
text: this.messageChunks.join(""), text: this.messageChunks.join(""),
sessionId: this.sessionId, sessionId: this.sessionId,
usage,
}; };
} }
+32 -30
View File
@@ -8,7 +8,7 @@ import {
buildRolePrompt, buildRolePrompt,
createAgent, createAgent,
} from "@united-workforce/util-agent"; } from "@united-workforce/util-agent";
import type { AcpUsage } from "./acp-client.js";
import { HermesAcpClient } from "./acp-client.js"; import { HermesAcpClient } from "./acp-client.js";
import { getCachedSessionId, setCachedSessionId } from "./session-cache.js"; import { getCachedSessionId, setCachedSessionId } from "./session-cache.js";
import { loadHermesSession, storeHermesSessionDetail } from "./session-detail.js"; import { loadHermesSession, storeHermesSessionDetail } from "./session-detail.js";
@@ -17,36 +17,37 @@ import type { HermesSessionJson } from "./types.js";
const log = createLogger({ sink: { kind: "stderr" } }); const log = createLogger({ sink: { kind: "stderr" } });
/** Snapshot of session metrics taken before and after a prompt call. */ /** Snapshot of session metrics taken before and after a prompt call. */
type UsageSnapshot = { type TurnsSnapshot = {
turns: number; turns: number;
inputTokens: number;
outputTokens: number;
}; };
const ZERO_SNAPSHOT: UsageSnapshot = { turns: 0, inputTokens: 0, outputTokens: 0 }; const ZERO_TURNS: TurnsSnapshot = { turns: 0 };
/** Extract usage metrics from a session. Returns zeros for null sessions. */ /** Extract assistant turn count from a session. Returns zero for null sessions. */
export function snapshotUsage(session: HermesSessionJson | null): UsageSnapshot { export function snapshotTurns(session: HermesSessionJson | null): TurnsSnapshot {
if (session === null) { if (session === null) {
return ZERO_SNAPSHOT; return ZERO_TURNS;
} }
return { return {
turns: session.messages.filter((m) => m.role === "assistant").length, turns: session.messages.filter((m) => m.role === "assistant").length,
inputTokens: session.inputTokens,
outputTokens: session.outputTokens,
}; };
} }
/** Compute the delta between two snapshots (after minus before). Floors at 0. */ /**
export function computeUsageDelta( * Build Usage from ACP token data + DB turn delta.
before: UsageSnapshot, * Tokens come from ACP PromptResponse (synchronous, accurate).
after: UsageSnapshot, * Turns come from DB before/after snapshots (may have WAL lag, but acceptable).
*/
export function buildUsage(
acpUsage: AcpUsage | null,
beforeTurns: TurnsSnapshot,
afterTurns: TurnsSnapshot,
durationSec: number, durationSec: number,
): Usage { ): Usage {
return { return {
turns: Math.max(0, after.turns - before.turns) || 1, turns: Math.max(0, afterTurns.turns - beforeTurns.turns) || 1,
inputTokens: Math.max(0, after.inputTokens - before.inputTokens), inputTokens: acpUsage?.inputTokens ?? 0,
outputTokens: Math.max(0, after.outputTokens - before.outputTokens), outputTokens: acpUsage?.outputTokens ?? 0,
duration: Math.round(durationSec), duration: Math.round(durationSec),
}; };
} }
@@ -148,12 +149,12 @@ export function createHermesAgent(resumeDisabled: boolean): () => Promise<void>
async function runPrompt( async function runPrompt(
ctx: AgentContext, ctx: AgentContext,
useContinuation: boolean, useContinuation: boolean,
beforeSnapshot: UsageSnapshot, beforeTurns: TurnsSnapshot,
): Promise<AgentRunResult> { ): Promise<AgentRunResult> {
const effectiveCtx = useContinuation ? ctx : { ...ctx, isFirstVisit: true }; const effectiveCtx = useContinuation ? ctx : { ...ctx, isFirstVisit: true };
const fullPrompt = buildHermesPrompt(effectiveCtx); const fullPrompt = buildHermesPrompt(effectiveCtx);
const startMs = Date.now(); const startMs = Date.now();
const { text, sessionId } = await client.prompt(fullPrompt); const { text, sessionId, usage: acpUsage } = await client.prompt(fullPrompt);
const durationSec = (Date.now() - startMs) / 1000; const durationSec = (Date.now() - startMs) / 1000;
const { detailHash } = await storePromptResult(ctx.store, sessionId); const { detailHash } = await storePromptResult(ctx.store, sessionId);
@@ -161,9 +162,10 @@ export function createHermesAgent(resumeDisabled: boolean): () => Promise<void>
await setCachedSessionId(ctx.threadId, ctx.role, sessionId, ctx.storageRoot); await setCachedSessionId(ctx.threadId, ctx.role, sessionId, ctx.storageRoot);
} }
// Turns from DB (may lag slightly due to WAL, but acceptable)
const afterSession = await loadHermesSession(sessionId); const afterSession = await loadHermesSession(sessionId);
const afterSnapshot = snapshotUsage(afterSession); const afterTurns = snapshotTurns(afterSession);
const usage = computeUsageDelta(beforeSnapshot, afterSnapshot, durationSec); const usage = buildUsage(acpUsage, beforeTurns, afterTurns, durationSec);
return { output: text, detailHash, sessionId, assembledPrompt: fullPrompt, usage }; return { output: text, detailHash, sessionId, assembledPrompt: fullPrompt, usage };
} }
@@ -173,16 +175,16 @@ export function createHermesAgent(resumeDisabled: boolean): () => Promise<void>
const attempt = await prepareSession(client, ctx, cwd, resumeDisabled); const attempt = await prepareSession(client, ctx, cwd, resumeDisabled);
// Snapshot before prompt: for resumed sessions, captures cumulative state // Snapshot before prompt: for resumed sessions, captures cumulative state
// so we can compute the delta. For new sessions, this is ZERO_SNAPSHOT. // so we can compute the turn delta. For new sessions, this is ZERO_TURNS.
const currentSessionId = client.getSessionId(); const currentSessionId = client.getSessionId();
const beforeSession = const beforeSession =
attempt.resumed && currentSessionId !== null attempt.resumed && currentSessionId !== null
? await loadHermesSession(currentSessionId) ? await loadHermesSession(currentSessionId)
: null; : null;
const beforeSnapshot = snapshotUsage(beforeSession); const beforeTurns = snapshotTurns(beforeSession);
try { try {
return await runPrompt(ctx, attempt.useContinuation, beforeSnapshot); return await runPrompt(ctx, attempt.useContinuation, beforeTurns);
} catch (error) { } catch (error) {
if (!attempt.resumed) { if (!attempt.resumed) {
throw error; throw error;
@@ -193,7 +195,7 @@ export function createHermesAgent(resumeDisabled: boolean): () => Promise<void>
await client.close(); await client.close();
await client.connect(cwd); await client.connect(cwd);
// Fresh session after retry — reset snapshot to zero // Fresh session after retry — reset snapshot to zero
return runPrompt(ctx, false, ZERO_SNAPSHOT); return runPrompt(ctx, false, ZERO_TURNS);
} }
} }
@@ -204,20 +206,20 @@ export function createHermesAgent(resumeDisabled: boolean): () => Promise<void>
): Promise<AgentRunResult> { ): Promise<AgentRunResult> {
// Client is already connected from runHermes — same ACP session, // Client is already connected from runHermes — same ACP session,
// so the agent sees the full conversation history (crucial for retries). // so the agent sees the full conversation history (crucial for retries).
// Snapshot before the continuation prompt for delta computation. // Snapshot turns before the continuation prompt for delta computation.
const currentSessionId = client.getSessionId(); const currentSessionId = client.getSessionId();
const beforeSession = const beforeSession =
currentSessionId !== null ? await loadHermesSession(currentSessionId) : null; currentSessionId !== null ? await loadHermesSession(currentSessionId) : null;
const beforeSnapshot = snapshotUsage(beforeSession); const beforeTurns = snapshotTurns(beforeSession);
const startMs = Date.now(); const startMs = Date.now();
const { text, sessionId } = await client.prompt(message); const { text, sessionId, usage: acpUsage } = await client.prompt(message);
const durationSec = (Date.now() - startMs) / 1000; const durationSec = (Date.now() - startMs) / 1000;
const { detailHash } = await storePromptResult(store, sessionId); const { detailHash } = await storePromptResult(store, sessionId);
const afterSession = await loadHermesSession(sessionId); const afterSession = await loadHermesSession(sessionId);
const afterSnapshot = snapshotUsage(afterSession); const afterTurns = snapshotTurns(afterSession);
const usage = computeUsageDelta(beforeSnapshot, afterSnapshot, durationSec); const usage = buildUsage(acpUsage, beforeTurns, afterTurns, durationSec);
return { output: text, detailHash, sessionId, assembledPrompt: "", usage }; return { output: text, detailHash, sessionId, assembledPrompt: "", usage };
} }
+3 -2
View File
@@ -1,7 +1,8 @@
export type { AcpUsage } from "./acp-client.js";
export { HermesAcpClient } from "./acp-client.js"; export { HermesAcpClient } from "./acp-client.js";
export { export {
buildHermesPrompt, buildHermesPrompt,
computeUsageDelta, buildUsage,
createHermesAgent, createHermesAgent,
snapshotUsage, snapshotTurns,
} from "./hermes.js"; } from "./hermes.js";