import { createReadStream } from "node:fs";
import { readdir } from "node:fs/promises";
import { homedir } from "node:os";
import { join } from "node:path";
import { createInterface } from "node:readline";

import { hermesSessionMessageStats } from "./schema.ts";

/** Length of the trailing measurement window in seconds; stored on every inserted row. */
const MEASUREMENT_WINDOW_SECONDS = 900;

/** The same window in milliseconds, derived so the two constants cannot drift apart. */
const MEASUREMENT_WINDOW_MS = MEASUREMENT_WINDOW_SECONDS * 1000;

/**
 * Streams one JSONL file line by line and counts the messages whose
 * `timestamp` falls inside the inclusive window `[cutoffMs, nowMs]`,
 * grouped by role (`user`, `assistant`, `tool`).
 *
 * Lines are skipped (never fatal) when they are blank, are not valid JSON,
 * do not parse to an object, lack a string `role` or string `timestamp`,
 * carry a timestamp `Date.parse` cannot read, or fall outside the window.
 * Roles are compared after trimming and lower-casing; any other role is ignored.
 *
 * @param {string} filePath path of the JSONL file to read
 * @param {number} cutoffMs inclusive lower bound, epoch milliseconds
 * @param {number} nowMs inclusive upper bound, epoch milliseconds
 * @returns {Promise<{ user: number; assistant: number; tool: number; fileHadActivity: boolean }>}
 *   per-role counts, plus `fileHadActivity` which is true when at least one
 *   message was counted
 */
async function aggregateJsonlFile(filePath, cutoffMs, nowMs) {
  let user = 0;
  let assistant = 0;
  let tool = 0;

  const input = createReadStream(filePath, { encoding: "utf8" });
  const rl = createInterface({ input, crlfDelay: Infinity });
  try {
    for await (const line of rl) {
      const trimmed = line.trim();
      if (!trimmed) continue;

      let obj;
      try {
        obj = JSON.parse(trimmed);
      } catch {
        // Malformed line: skip it and keep reading the rest of the file.
        continue;
      }

      // JSON.parse can legally return null or a primitive (e.g. a line that is
      // just `null`); reading `.role` off null would throw and abort the run.
      if (obj === null || typeof obj !== "object") continue;
      if (typeof obj.role !== "string" || typeof obj.timestamp !== "string") {
        continue;
      }

      const t = Date.parse(obj.timestamp);
      if (!Number.isFinite(t) || t < cutoffMs || t > nowMs) continue;

      switch (obj.role.trim().toLowerCase()) {
        case "user":
          user++;
          break;
        case "assistant":
          assistant++;
          break;
        case "tool":
          tool++;
          break;
        default:
          // Any other role is not counted.
          break;
      }
    }
  } finally {
    rl.close();
    // rl.close() does not close the underlying stream; destroy it explicitly
    // so the file descriptor is released even when the loop exits early.
    input.destroy();
  }

  const fileHadActivity = user + assistant + tool > 0;
  return { user, assistant, tool, fileHadActivity };
}

/**
 * Lists the regular files ending in `.jsonl` directly inside
 * `<home>/.hermes/sessions`, as joined paths.
 *
 * @returns {Promise<string[]>} the matching paths; an empty array when the
 *   directory does not exist (`ENOENT`)
 * @throws any other error raised by `readdir`
 */
async function listSessionFiles() {
  const sessionsDir = join(homedir(), ".hermes", "sessions");
  try {
    const entries = await readdir(sessionsDir, { withFileTypes: true });
    return entries
      .filter((e) => e.isFile() && e.name.endsWith(".jsonl"))
      .map((e) => join(sessionsDir, e.name));
  } catch (err) {
    if (err && typeof err === "object" && "code" in err && err.code === "ENOENT") {
      return [];
    }
    throw err;
  }
}

/**
 * Aggregates message counts over the last `MEASUREMENT_WINDOW_SECONDS`
 * seconds across every session JSONL file, inserts one summary row into
 * `hermesSessionMessageStats`, and returns a copy of that row.
 *
 * Files are processed one at a time. `activeSessions` is the number of files
 * that contributed at least one counted message.
 *
 * @param db database handle; only `db.insert(table).values(row)` is used here
 * @param _peers unused; kept so the call signature stays unchanged
 * @returns {Promise<{
 *   ts: number;
 *   totalUserMessages: number;
 *   totalAssistantMessages: number;
 *   totalToolMessages: number;
 *   totalMessages: number;
 *   activeSessions: number;
 *   measurementWindowSeconds: number;
 * }>} the inserted values; `ts` is the `Date.now()` reading taken at the start
 */
export async function compute(db, _peers) {
  // Capture "now" once so every file is measured against the same window.
  const nowMs = Date.now();
  const cutoffMs = nowMs - MEASUREMENT_WINDOW_MS;

  let totalUserMessages = 0;
  let totalAssistantMessages = 0;
  let totalToolMessages = 0;
  let activeSessions = 0;

  const files = await listSessionFiles();
  for (const filePath of files) {
    const { user, assistant, tool, fileHadActivity } = await aggregateJsonlFile(
      filePath,
      cutoffMs,
      nowMs,
    );
    totalUserMessages += user;
    totalAssistantMessages += assistant;
    totalToolMessages += tool;
    if (fileHadActivity) activeSessions++;
  }

  const row = {
    ts: nowMs,
    totalUserMessages,
    totalAssistantMessages,
    totalToolMessages,
    totalMessages: totalUserMessages + totalAssistantMessages + totalToolMessages,
    activeSessions,
    measurementWindowSeconds: MEASUREMENT_WINDOW_SECONDS,
  };

  await db.insert(hermesSessionMessageStats).values(row);

  // Return a shallow copy so callers do not share the object handed to the db layer.
  return { ...row };
}