feat: add restart-gateway workflow, remove unused senses
- Remove 4 data-only senses (linux-system-health, worker-process-metrics, hermes-session-message-stats, git-workspace-status) — none triggered workflows - Refactor hermes-gateway-health sense: add state tracking, trigger restart-gateway workflow after 3 consecutive failures (with 5min cooldown) - Add restart-gateway workflow: restarter role (systemctl restart) + verifier role (check service came back) - Simplify nerve.yaml to single sense + single workflow
This commit is contained in:
parent
436ccf12b3
commit
29d47bd9c4
33
nerve.yaml
33
nerve.yaml
@ -5,42 +5,13 @@ extract:
|
|||||||
model: qwen-plus
|
model: qwen-plus
|
||||||
|
|
||||||
senses:
|
senses:
|
||||||
linux-system-health:
|
|
||||||
group: system
|
|
||||||
interval: 30s
|
|
||||||
throttle: 10s
|
|
||||||
timeout: 15s
|
|
||||||
hermes-gateway-health:
|
hermes-gateway-health:
|
||||||
group: system
|
group: system
|
||||||
interval: 2m
|
interval: 2m
|
||||||
throttle: 30s
|
throttle: 30s
|
||||||
timeout: 30s
|
timeout: 30s
|
||||||
hermes-session-message-stats:
|
|
||||||
group: hermes
|
|
||||||
interval: 15m
|
|
||||||
throttle: 30s
|
|
||||||
timeout: 60s
|
|
||||||
worker-process-metrics:
|
|
||||||
group: system
|
|
||||||
interval: 1m
|
|
||||||
throttle: 15s
|
|
||||||
timeout: 5s
|
|
||||||
git-workspace-status:
|
|
||||||
group: workspace
|
|
||||||
interval: 2m
|
|
||||||
throttle: 30s
|
|
||||||
timeout: 15s
|
|
||||||
|
|
||||||
workflows:
|
workflows:
|
||||||
develop-sense:
|
restart-gateway:
|
||||||
concurrency: 1
|
concurrency: 1
|
||||||
overflow: queue
|
overflow: drop
|
||||||
develop-workflow:
|
|
||||||
concurrency: 1
|
|
||||||
overflow: queue
|
|
||||||
solve-issue:
|
|
||||||
concurrency: 1
|
|
||||||
overflow: queue
|
|
||||||
extract-knowledge:
|
|
||||||
concurrency: 1
|
|
||||||
overflow: queue
|
|
||||||
|
|||||||
@ -1,13 +0,0 @@
|
|||||||
-- Migration: 0001_init
|
|
||||||
-- Creates the snapshots table for git-workspace-status sense.
|
|
||||||
|
|
||||||
CREATE TABLE IF NOT EXISTS snapshots (
|
|
||||||
ts INTEGER PRIMARY KEY,
|
|
||||||
branch TEXT NOT NULL,
|
|
||||||
head_short TEXT NOT NULL,
|
|
||||||
porcelain_lines INTEGER NOT NULL,
|
|
||||||
has_upstream INTEGER NOT NULL,
|
|
||||||
ahead_count INTEGER NOT NULL,
|
|
||||||
behind_count INTEGER NOT NULL,
|
|
||||||
git_error TEXT NOT NULL
|
|
||||||
);
|
|
||||||
@ -1,76 +0,0 @@
|
|||||||
import { execFileSync } from "node:child_process";
|
|
||||||
import { resolve } from "node:path";
|
|
||||||
export { snapshots as table } from "./schema.ts";
|
|
||||||
|
|
||||||
const GIT_TIMEOUT_MS = 15_000;
|
|
||||||
|
|
||||||
function workspaceRoot(): string {
|
|
||||||
const raw = process.env.GIT_WORKSPACE_ROOT;
|
|
||||||
return raw ? resolve(raw) : resolve(process.cwd());
|
|
||||||
}
|
|
||||||
|
|
||||||
function gitErrorMessage(err: unknown): string {
|
|
||||||
if (err instanceof Error) {
|
|
||||||
const m = err.message.trim();
|
|
||||||
return m.length > 200 ? `${m.slice(0, 197)}...` : m;
|
|
||||||
}
|
|
||||||
return String(err);
|
|
||||||
}
|
|
||||||
|
|
||||||
function runGit(cwd: string, args: string[]): string {
|
|
||||||
return execFileSync("git", args, {
|
|
||||||
cwd,
|
|
||||||
encoding: "utf8",
|
|
||||||
timeout: GIT_TIMEOUT_MS,
|
|
||||||
maxBuffer: 2 * 1024 * 1024,
|
|
||||||
}).trimEnd();
|
|
||||||
}
|
|
||||||
|
|
||||||
function countPorcelainLines(output: string): number {
|
|
||||||
if (!output) return 0;
|
|
||||||
return output.split("\n").filter((line) => line.length > 0).length;
|
|
||||||
}
|
|
||||||
|
|
||||||
export async function compute() {
|
|
||||||
const root = workspaceRoot();
|
|
||||||
const ts = Date.now();
|
|
||||||
|
|
||||||
let branch = "";
|
|
||||||
let headShort = "";
|
|
||||||
let porcelainLines = 0;
|
|
||||||
let hasUpstream = 0;
|
|
||||||
let aheadCount = 0;
|
|
||||||
let behindCount = 0;
|
|
||||||
let gitError = "";
|
|
||||||
|
|
||||||
try {
|
|
||||||
const inside = runGit(root, ["rev-parse", "--is-inside-work-tree"]).trim();
|
|
||||||
if (inside !== "true") {
|
|
||||||
gitError = "not a git work tree";
|
|
||||||
return { signal: { ts, branch, headShort, porcelainLines, hasUpstream, aheadCount, behindCount, gitError }, workflow: null };
|
|
||||||
}
|
|
||||||
|
|
||||||
branch = runGit(root, ["rev-parse", "--abbrev-ref", "HEAD"]);
|
|
||||||
headShort = runGit(root, ["rev-parse", "--short", "HEAD"]);
|
|
||||||
porcelainLines = countPorcelainLines(runGit(root, ["status", "--porcelain"]));
|
|
||||||
|
|
||||||
try {
|
|
||||||
runGit(root, ["rev-parse", "--abbrev-ref", "@{upstream}"]);
|
|
||||||
hasUpstream = 1;
|
|
||||||
const lb = runGit(root, ["rev-list", "--left-right", "--count", "HEAD...@{upstream}"]);
|
|
||||||
const parts = lb.split(/[\t\s]+/).filter(Boolean);
|
|
||||||
if (parts.length >= 2) {
|
|
||||||
aheadCount = Number.parseInt(parts[0], 10) || 0;
|
|
||||||
behindCount = Number.parseInt(parts[1], 10) || 0;
|
|
||||||
}
|
|
||||||
} catch {
|
|
||||||
hasUpstream = 0;
|
|
||||||
aheadCount = 0;
|
|
||||||
behindCount = 0;
|
|
||||||
}
|
|
||||||
} catch (e) {
|
|
||||||
gitError = gitErrorMessage(e);
|
|
||||||
}
|
|
||||||
|
|
||||||
return { signal: { ts, branch, headShort, porcelainLines, hasUpstream, aheadCount, behindCount, gitError }, workflow: null };
|
|
||||||
}
|
|
||||||
@ -1,13 +0,0 @@
|
|||||||
import { integer, sqliteTable, text } from "drizzle-orm/sqlite-core";
|
|
||||||
|
|
||||||
export const snapshots = sqliteTable("snapshots", {
|
|
||||||
ts: integer("ts").primaryKey(),
|
|
||||||
branch: text("branch").notNull(),
|
|
||||||
headShort: text("head_short").notNull(),
|
|
||||||
porcelainLines: integer("porcelain_lines").notNull(),
|
|
||||||
hasUpstream: integer("has_upstream").notNull(),
|
|
||||||
aheadCount: integer("ahead_count").notNull(),
|
|
||||||
behindCount: integer("behind_count").notNull(),
|
|
||||||
/** Empty string when the snapshot succeeded; otherwise a short error summary. */
|
|
||||||
gitError: text("git_error").notNull(),
|
|
||||||
});
|
|
||||||
@ -9,6 +9,22 @@ const HTTP_TIMEOUT_MS = Math.min(23_000, EXEC_TIMEOUT_MS - 2000);
|
|||||||
|
|
||||||
const HTTP_ERROR_MAX_LEN = 256;
|
const HTTP_ERROR_MAX_LEN = 256;
|
||||||
|
|
||||||
|
/** How many consecutive failures before triggering a restart workflow. */
|
||||||
|
const FAILURE_THRESHOLD = 3;
|
||||||
|
|
||||||
|
type SenseState = {
|
||||||
|
consecutiveFailures: number;
|
||||||
|
lastRestartTs: number;
|
||||||
|
/** Minimum ms between restart attempts to avoid restart loops. */
|
||||||
|
restartCooldownMs: number;
|
||||||
|
};
|
||||||
|
|
||||||
|
export const initialState: SenseState = {
|
||||||
|
consecutiveFailures: 0,
|
||||||
|
lastRestartTs: 0,
|
||||||
|
restartCooldownMs: 300_000, // 5 minutes
|
||||||
|
};
|
||||||
|
|
||||||
function gatewayProbeUrl(): string {
|
function gatewayProbeUrl(): string {
|
||||||
const u =
|
const u =
|
||||||
process.env.HERMES_GATEWAY_HEALTH_URL ??
|
process.env.HERMES_GATEWAY_HEALTH_URL ??
|
||||||
@ -26,17 +42,13 @@ function truncateHttpError(err: unknown): string {
|
|||||||
return s.length > HTTP_ERROR_MAX_LEN ? s.slice(0, HTTP_ERROR_MAX_LEN) : s;
|
return s.length > HTTP_ERROR_MAX_LEN ? s.slice(0, HTTP_ERROR_MAX_LEN) : s;
|
||||||
}
|
}
|
||||||
|
|
||||||
interface HttpProbeResult {
|
type HttpProbeResult = {
|
||||||
httpOk: number;
|
httpOk: number;
|
||||||
httpStatusCode: number;
|
httpStatusCode: number;
|
||||||
httpLatencyMs: number;
|
httpLatencyMs: number;
|
||||||
httpError: string;
|
httpError: string;
|
||||||
}
|
};
|
||||||
|
|
||||||
/**
|
|
||||||
* GET the gateway URL; success = HTTP 200–399.
|
|
||||||
* URL must be set via HERMES_GATEWAY_HEALTH_URL or NERVE_HERMES_GATEWAY_URL.
|
|
||||||
*/
|
|
||||||
async function probeGatewayHttp(url: string): Promise<HttpProbeResult> {
|
async function probeGatewayHttp(url: string): Promise<HttpProbeResult> {
|
||||||
if (!url) {
|
if (!url) {
|
||||||
return {
|
return {
|
||||||
@ -74,10 +86,6 @@ async function probeGatewayHttp(url: string): Promise<HttpProbeResult> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* When `ps` lacks `etimes` (wall-clock seconds since start), parse `etime`
|
|
||||||
* ([[dd-]hh:]mm:ss) into seconds. See ps(1) `etime` field description.
|
|
||||||
*/
|
|
||||||
function etimeToSeconds(etime: string): number {
|
function etimeToSeconds(etime: string): number {
|
||||||
let s = String(etime).trim();
|
let s = String(etime).trim();
|
||||||
if (!s) return 0;
|
if (!s) return 0;
|
||||||
@ -102,12 +110,12 @@ function etimeToSeconds(etime: string): number {
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
interface ExecResult {
|
type ExecResult = {
|
||||||
exitCode: number;
|
exitCode: number;
|
||||||
errCode: string | undefined;
|
errCode: string | undefined;
|
||||||
stdout: string;
|
stdout: string;
|
||||||
stderr: string;
|
stderr: string;
|
||||||
}
|
};
|
||||||
|
|
||||||
function execFileUtf8(file: string, args: string[], opts: Record<string, unknown> = {}): Promise<ExecResult> {
|
function execFileUtf8(file: string, args: string[], opts: Record<string, unknown> = {}): Promise<ExecResult> {
|
||||||
return new Promise((resolve) => {
|
return new Promise((resolve) => {
|
||||||
@ -216,11 +224,11 @@ async function processExists(mainPid: number): Promise<boolean> {
|
|||||||
return r.stdout.trim().length > 0;
|
return r.stdout.trim().length > 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
interface PsMetrics {
|
type PsMetrics = {
|
||||||
rssBytes: number;
|
rssBytes: number;
|
||||||
cpuPercent: number;
|
cpuPercent: number;
|
||||||
uptimeSec: number;
|
uptimeSec: number;
|
||||||
}
|
};
|
||||||
|
|
||||||
async function readPsMetrics(mainPid: number): Promise<PsMetrics> {
|
async function readPsMetrics(mainPid: number): Promise<PsMetrics> {
|
||||||
if (mainPid <= 0) {
|
if (mainPid <= 0) {
|
||||||
@ -265,61 +273,12 @@ async function readPsMetrics(mainPid: number): Promise<PsMetrics> {
|
|||||||
return { rssBytes, cpuPercent, uptimeSec };
|
return { rssBytes, cpuPercent, uptimeSec };
|
||||||
}
|
}
|
||||||
|
|
||||||
function parseActiveSessionsFromHermesStats(text: string): number {
|
export async function compute(prevState: SenseState) {
|
||||||
const src = String(text);
|
const now = Date.now();
|
||||||
const patterns = [
|
|
||||||
/^\s*Active\s+sessions?:\s*(\d+)/gim,
|
|
||||||
/^\s*active\s+sessions?:\s*(\d+)/gim,
|
|
||||||
/^\s*Total\s+sessions?:\s*(\d+)/gim,
|
|
||||||
];
|
|
||||||
for (const re of patterns) {
|
|
||||||
re.lastIndex = 0;
|
|
||||||
const m = re.exec(src);
|
|
||||||
if (m) {
|
|
||||||
const n = Math.trunc(Number.parseInt(m[1], 10));
|
|
||||||
return Number.isFinite(n) ? n : 0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
async function readActiveSessions(): Promise<number> {
|
|
||||||
try {
|
|
||||||
const r = await execFileUtf8("hermes", ["sessions", "stats"]);
|
|
||||||
if (r.errCode === "ENOENT") return 0;
|
|
||||||
return parseActiveSessionsFromHermesStats(`${r.stdout}\n${r.stderr}`);
|
|
||||||
} catch {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async function countDirectChildren(mainPid: number): Promise<number> {
|
|
||||||
if (mainPid <= 0) return 0;
|
|
||||||
try {
|
|
||||||
const r = await execFileUtf8("ps", [
|
|
||||||
"--no-headers",
|
|
||||||
"-o",
|
|
||||||
"pid",
|
|
||||||
"--ppid",
|
|
||||||
String(mainPid),
|
|
||||||
]);
|
|
||||||
if (r.errCode === "ENOENT") return 0;
|
|
||||||
const lines = r.stdout
|
|
||||||
.split("\n")
|
|
||||||
.map((l) => l.trim())
|
|
||||||
.filter(Boolean);
|
|
||||||
return lines.length;
|
|
||||||
} catch {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
export async function compute() {
|
|
||||||
const ts = Date.now();
|
|
||||||
|
|
||||||
|
// --- probe gateway ---
|
||||||
let mainPid = 0;
|
let mainPid = 0;
|
||||||
let systemdActiveRunning = false;
|
let systemdActiveRunning = false;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const st = await readSystemdState();
|
const st = await readSystemdState();
|
||||||
mainPid = st.mainPid;
|
mainPid = st.mainPid;
|
||||||
@ -354,22 +313,6 @@ export async function compute() {
|
|||||||
|
|
||||||
const alive = systemdActiveRunning && mainPid > 0 && psOk ? 1 : 0;
|
const alive = systemdActiveRunning && mainPid > 0 && psOk ? 1 : 0;
|
||||||
|
|
||||||
let activeSessions = 0;
|
|
||||||
try {
|
|
||||||
activeSessions = await readActiveSessions();
|
|
||||||
} catch {
|
|
||||||
activeSessions = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
let childProcessCount = 0;
|
|
||||||
if (alive && mainPid > 0) {
|
|
||||||
try {
|
|
||||||
childProcessCount = await countDirectChildren(mainPid);
|
|
||||||
} catch {
|
|
||||||
childProcessCount = 0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let httpOk = 0;
|
let httpOk = 0;
|
||||||
let httpStatusCode = 0;
|
let httpStatusCode = 0;
|
||||||
let httpLatencyMs = 0;
|
let httpLatencyMs = 0;
|
||||||
@ -387,22 +330,47 @@ export async function compute() {
|
|||||||
httpError = "probe_failed";
|
httpError = "probe_failed";
|
||||||
}
|
}
|
||||||
|
|
||||||
const storedMainPid = mainPid > 0 ? mainPid : 0;
|
// --- decide health ---
|
||||||
|
const healthy = alive === 1 && httpOk === 1;
|
||||||
|
|
||||||
const row = {
|
// --- state machine: track consecutive failures ---
|
||||||
ts,
|
const consecutiveFailures = healthy ? 0 : prevState.consecutiveFailures + 1;
|
||||||
|
const lastRestartTs = prevState.lastRestartTs;
|
||||||
|
const cooldown = prevState.restartCooldownMs;
|
||||||
|
const cooldownElapsed = now - lastRestartTs >= cooldown;
|
||||||
|
|
||||||
|
// --- trigger restart workflow? ---
|
||||||
|
const shouldRestart =
|
||||||
|
consecutiveFailures >= FAILURE_THRESHOLD && cooldownElapsed;
|
||||||
|
|
||||||
|
const nextState: SenseState = {
|
||||||
|
consecutiveFailures,
|
||||||
|
lastRestartTs: shouldRestart ? now : lastRestartTs,
|
||||||
|
restartCooldownMs: cooldown,
|
||||||
|
};
|
||||||
|
|
||||||
|
const signal = {
|
||||||
|
ts: now,
|
||||||
alive,
|
alive,
|
||||||
mainPid: storedMainPid,
|
mainPid: mainPid > 0 ? mainPid : 0,
|
||||||
rssBytes: alive ? rssBytes : 0,
|
rssBytes: alive ? rssBytes : 0,
|
||||||
cpuPercent: alive ? cpuPercent : 0,
|
cpuPercent: alive ? cpuPercent : 0,
|
||||||
uptimeSec: alive ? uptimeSec : 0,
|
uptimeSec: alive ? uptimeSec : 0,
|
||||||
activeSessions,
|
|
||||||
childProcessCount: alive ? childProcessCount : 0,
|
|
||||||
httpOk,
|
httpOk,
|
||||||
httpStatusCode,
|
httpStatusCode,
|
||||||
httpLatencyMs,
|
httpLatencyMs,
|
||||||
httpError,
|
httpError,
|
||||||
|
consecutiveFailures,
|
||||||
};
|
};
|
||||||
|
|
||||||
return { signal: row, workflow: null };
|
const workflow = shouldRestart
|
||||||
|
? {
|
||||||
|
name: "restart-gateway",
|
||||||
|
maxRounds: 3,
|
||||||
|
prompt: `Hermes gateway is down (${consecutiveFailures} consecutive failures). Last HTTP error: "${httpError}". systemd active+running: ${systemdActiveRunning}, process alive: ${psOk}. Restart the gateway and verify it comes back.`,
|
||||||
|
dryRun: false,
|
||||||
|
}
|
||||||
|
: null;
|
||||||
|
|
||||||
|
return { state: nextState, signal, workflow };
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,13 +0,0 @@
|
|||||||
-- Migration: 0001_init
|
|
||||||
-- Creates the hermes_session_message_stats table for hermes-session-message-stats sense.
|
|
||||||
|
|
||||||
CREATE TABLE IF NOT EXISTS hermes_session_message_stats (
|
|
||||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
||||||
ts INTEGER NOT NULL,
|
|
||||||
total_user_messages INTEGER NOT NULL,
|
|
||||||
total_assistant_messages INTEGER NOT NULL,
|
|
||||||
total_tool_messages INTEGER NOT NULL,
|
|
||||||
total_messages INTEGER NOT NULL,
|
|
||||||
active_sessions INTEGER NOT NULL,
|
|
||||||
measurement_window_seconds INTEGER NOT NULL
|
|
||||||
);
|
|
||||||
@ -1,117 +0,0 @@
|
|||||||
import { createReadStream } from "node:fs";
|
|
||||||
import { readdir } from "node:fs/promises";
|
|
||||||
import { homedir } from "node:os";
|
|
||||||
import { join } from "node:path";
|
|
||||||
import { createInterface } from "node:readline";
|
|
||||||
export { hermesSessionMessageStats as table } from "./schema.ts";
|
|
||||||
|
|
||||||
const MEASUREMENT_WINDOW_MS = 900_000;
|
|
||||||
const MEASUREMENT_WINDOW_SECONDS = 900;
|
|
||||||
|
|
||||||
interface MessageCounts {
|
|
||||||
user: number;
|
|
||||||
assistant: number;
|
|
||||||
tool: number;
|
|
||||||
fileHadActivity: boolean;
|
|
||||||
}
|
|
||||||
|
|
||||||
async function aggregateJsonlFile(filePath: string, cutoffMs: number, nowMs: number): Promise<MessageCounts> {
|
|
||||||
let user = 0;
|
|
||||||
let assistant = 0;
|
|
||||||
let tool = 0;
|
|
||||||
let fileHadActivity = false;
|
|
||||||
|
|
||||||
const input = createReadStream(filePath, { encoding: "utf8" });
|
|
||||||
const rl = createInterface({ input, crlfDelay: Infinity });
|
|
||||||
try {
|
|
||||||
for await (const line of rl) {
|
|
||||||
const trimmed = line.trim();
|
|
||||||
if (!trimmed) continue;
|
|
||||||
let obj: unknown;
|
|
||||||
try {
|
|
||||||
obj = JSON.parse(trimmed);
|
|
||||||
} catch {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
if (
|
|
||||||
typeof obj !== "object" || obj === null ||
|
|
||||||
typeof (obj as Record<string, unknown>).role !== "string" ||
|
|
||||||
typeof (obj as Record<string, unknown>).timestamp !== "string"
|
|
||||||
) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
const record = obj as { role: string; timestamp: string };
|
|
||||||
const t = Date.parse(record.timestamp);
|
|
||||||
if (!Number.isFinite(t) || t < cutoffMs || t > nowMs) continue;
|
|
||||||
|
|
||||||
const roleNorm = record.role.trim().toLowerCase();
|
|
||||||
if (roleNorm === "user") {
|
|
||||||
user++;
|
|
||||||
fileHadActivity = true;
|
|
||||||
} else if (roleNorm === "assistant") {
|
|
||||||
assistant++;
|
|
||||||
fileHadActivity = true;
|
|
||||||
} else if (roleNorm === "tool") {
|
|
||||||
tool++;
|
|
||||||
fileHadActivity = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
rl.close();
|
|
||||||
}
|
|
||||||
|
|
||||||
return { user, assistant, tool, fileHadActivity };
|
|
||||||
}
|
|
||||||
|
|
||||||
export async function compute() {
|
|
||||||
const nowMs = Date.now();
|
|
||||||
const cutoffMs = nowMs - MEASUREMENT_WINDOW_MS;
|
|
||||||
const ts = nowMs;
|
|
||||||
|
|
||||||
let totalUserMessages = 0;
|
|
||||||
let totalAssistantMessages = 0;
|
|
||||||
let totalToolMessages = 0;
|
|
||||||
let activeSessions = 0;
|
|
||||||
|
|
||||||
const sessionsDir = join(homedir(), ".hermes", "sessions");
|
|
||||||
let files: string[] = [];
|
|
||||||
try {
|
|
||||||
const entries = await readdir(sessionsDir, { withFileTypes: true });
|
|
||||||
files = entries
|
|
||||||
.filter((e) => e.isFile() && e.name.endsWith(".jsonl"))
|
|
||||||
.map((e) => join(sessionsDir, e.name));
|
|
||||||
} catch (err) {
|
|
||||||
if (err && typeof err === "object" && "code" in err && (err as NodeJS.ErrnoException).code === "ENOENT") {
|
|
||||||
files = [];
|
|
||||||
} else {
|
|
||||||
throw err;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for (const filePath of files) {
|
|
||||||
const { user, assistant, tool, fileHadActivity } = await aggregateJsonlFile(
|
|
||||||
filePath,
|
|
||||||
cutoffMs,
|
|
||||||
nowMs,
|
|
||||||
);
|
|
||||||
totalUserMessages += user;
|
|
||||||
totalAssistantMessages += assistant;
|
|
||||||
totalToolMessages += tool;
|
|
||||||
if (fileHadActivity) activeSessions++;
|
|
||||||
}
|
|
||||||
|
|
||||||
const totalMessages =
|
|
||||||
totalUserMessages + totalAssistantMessages + totalToolMessages;
|
|
||||||
|
|
||||||
const row = {
|
|
||||||
ts,
|
|
||||||
totalUserMessages,
|
|
||||||
totalAssistantMessages,
|
|
||||||
totalToolMessages,
|
|
||||||
totalMessages,
|
|
||||||
activeSessions,
|
|
||||||
measurementWindowSeconds: MEASUREMENT_WINDOW_SECONDS,
|
|
||||||
};
|
|
||||||
|
|
||||||
return { signal: row, workflow: null };
|
|
||||||
}
|
|
||||||
@ -1,12 +0,0 @@
|
|||||||
import { integer, sqliteTable } from "drizzle-orm/sqlite-core";
|
|
||||||
|
|
||||||
export const hermesSessionMessageStats = sqliteTable("hermes_session_message_stats", {
|
|
||||||
id: integer("id").primaryKey({ autoIncrement: true }),
|
|
||||||
ts: integer("ts").notNull(),
|
|
||||||
totalUserMessages: integer("total_user_messages").notNull(),
|
|
||||||
totalAssistantMessages: integer("total_assistant_messages").notNull(),
|
|
||||||
totalToolMessages: integer("total_tool_messages").notNull(),
|
|
||||||
totalMessages: integer("total_messages").notNull(),
|
|
||||||
activeSessions: integer("active_sessions").notNull(),
|
|
||||||
measurementWindowSeconds: integer("measurement_window_seconds").notNull(),
|
|
||||||
});
|
|
||||||
@ -1,16 +0,0 @@
|
|||||||
-- Migration: 0001_init
|
|
||||||
-- Creates the snapshots table for linux-system-health sense.
|
|
||||||
|
|
||||||
CREATE TABLE IF NOT EXISTS snapshots (
|
|
||||||
ts INTEGER PRIMARY KEY,
|
|
||||||
cpu_load_1m REAL NOT NULL,
|
|
||||||
cpu_load_5m REAL NOT NULL,
|
|
||||||
cpu_load_15m REAL NOT NULL,
|
|
||||||
mem_total_mb INTEGER NOT NULL,
|
|
||||||
mem_used_mb INTEGER NOT NULL,
|
|
||||||
mem_used_pct REAL NOT NULL,
|
|
||||||
disk_total_gb REAL NOT NULL,
|
|
||||||
disk_used_gb REAL NOT NULL,
|
|
||||||
disk_used_pct REAL NOT NULL,
|
|
||||||
uptime_sec INTEGER NOT NULL
|
|
||||||
);
|
|
||||||
@ -1,6 +0,0 @@
|
|||||||
ALTER TABLE snapshots ADD COLUMN sockets_used INTEGER;
|
|
||||||
ALTER TABLE snapshots ADD COLUMN tcp_inuse INTEGER;
|
|
||||||
ALTER TABLE snapshots ADD COLUMN tcp_orphan INTEGER;
|
|
||||||
ALTER TABLE snapshots ADD COLUMN tcp_tw INTEGER;
|
|
||||||
ALTER TABLE snapshots ADD COLUMN tcp_alloc INTEGER;
|
|
||||||
ALTER TABLE snapshots ADD COLUMN tcp_mem_pages INTEGER;
|
|
||||||
@ -1,88 +0,0 @@
|
|||||||
import { loadavg, totalmem, freemem, uptime } from "node:os";
|
|
||||||
import { execSync } from "node:child_process";
|
|
||||||
import { readFile } from "node:fs/promises";
|
|
||||||
export { snapshots as table } from "./schema.ts";
|
|
||||||
|
|
||||||
const SOCKSTAT_PATH = "/proc/net/sockstat";
|
|
||||||
|
|
||||||
interface SockstatResult {
|
|
||||||
socketsUsed: number;
|
|
||||||
tcpInuse: number;
|
|
||||||
tcpOrphan: number;
|
|
||||||
tcpTw: number;
|
|
||||||
tcpAlloc: number;
|
|
||||||
tcpMemPages: number;
|
|
||||||
}
|
|
||||||
|
|
||||||
function parseSockstat(content: string): SockstatResult {
|
|
||||||
let socketsUsed = 0, tcpInuse = 0, tcpOrphan = 0, tcpTw = 0, tcpAlloc = 0, tcpMemPages = 0;
|
|
||||||
|
|
||||||
for (const line of content.split("\n")) {
|
|
||||||
const trimmed = line.trim();
|
|
||||||
if (trimmed.startsWith("sockets:")) {
|
|
||||||
const parts = trimmed.split(/\s+/);
|
|
||||||
const idx = parts.indexOf("used");
|
|
||||||
if (idx !== -1 && idx + 1 < parts.length) {
|
|
||||||
socketsUsed = Number.parseInt(parts[idx + 1], 10) || 0;
|
|
||||||
}
|
|
||||||
} else if (trimmed.startsWith("TCP:")) {
|
|
||||||
const parts = trimmed.split(/\s+/);
|
|
||||||
const map: Record<string, number> = {};
|
|
||||||
for (let i = 1; i + 1 < parts.length; i += 2) {
|
|
||||||
map[parts[i]] = Number.parseInt(parts[i + 1], 10) || 0;
|
|
||||||
}
|
|
||||||
tcpInuse = map.inuse ?? 0;
|
|
||||||
tcpOrphan = map.orphan ?? 0;
|
|
||||||
tcpTw = map.tw ?? 0;
|
|
||||||
tcpAlloc = map.alloc ?? 0;
|
|
||||||
tcpMemPages = map.mem ?? 0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return { socketsUsed, tcpInuse, tcpOrphan, tcpTw, tcpAlloc, tcpMemPages };
|
|
||||||
}
|
|
||||||
|
|
||||||
export async function compute() {
|
|
||||||
const [load1, load5, load15] = loadavg();
|
|
||||||
|
|
||||||
const memTotal = totalmem();
|
|
||||||
const memFree = freemem();
|
|
||||||
const memUsed = memTotal - memFree;
|
|
||||||
const memTotalMB = Math.round(memTotal / 1024 / 1024);
|
|
||||||
const memUsedMB = Math.round(memUsed / 1024 / 1024);
|
|
||||||
const memUsedPct = Math.round((memUsed / memTotal) * 10000) / 100;
|
|
||||||
|
|
||||||
let diskTotalGB = 0, diskUsedGB = 0, diskUsedPct = 0;
|
|
||||||
try {
|
|
||||||
const df = execSync("df -B1 / | tail -1", { encoding: "utf-8" }).trim();
|
|
||||||
const parts = df.split(/\s+/);
|
|
||||||
const total = Number(parts[1]);
|
|
||||||
const used = Number(parts[2]);
|
|
||||||
diskTotalGB = Math.round(total / 1024 / 1024 / 1024 * 100) / 100;
|
|
||||||
diskUsedGB = Math.round(used / 1024 / 1024 / 1024 * 100) / 100;
|
|
||||||
diskUsedPct = total > 0 ? Math.round((used / total) * 10000) / 100 : 0;
|
|
||||||
} catch {}
|
|
||||||
|
|
||||||
let tcp: SockstatResult = { socketsUsed: 0, tcpInuse: 0, tcpOrphan: 0, tcpTw: 0, tcpAlloc: 0, tcpMemPages: 0 };
|
|
||||||
try {
|
|
||||||
const content = await readFile(SOCKSTAT_PATH, "utf8");
|
|
||||||
tcp = parseSockstat(content);
|
|
||||||
} catch {}
|
|
||||||
|
|
||||||
const ts = Date.now();
|
|
||||||
const uptimeSec = Math.round(uptime());
|
|
||||||
|
|
||||||
const data = {
|
|
||||||
ts, cpuLoad1m: load1, cpuLoad5m: load5, cpuLoad15m: load15,
|
|
||||||
memTotalMB, memUsedMB, memUsedPct,
|
|
||||||
diskTotalGB, diskUsedGB, diskUsedPct,
|
|
||||||
uptimeSec,
|
|
||||||
socketsUsed: tcp.socketsUsed,
|
|
||||||
tcpInuse: tcp.tcpInuse,
|
|
||||||
tcpOrphan: tcp.tcpOrphan,
|
|
||||||
tcpTw: tcp.tcpTw,
|
|
||||||
tcpAlloc: tcp.tcpAlloc,
|
|
||||||
tcpMemPages: tcp.tcpMemPages,
|
|
||||||
};
|
|
||||||
return { signal: data, workflow: null };
|
|
||||||
}
|
|
||||||
@ -1,22 +0,0 @@
|
|||||||
import { integer, real, sqliteTable, text } from "drizzle-orm/sqlite-core";
|
|
||||||
|
|
||||||
export const snapshots = sqliteTable("snapshots", {
|
|
||||||
ts: integer("ts").primaryKey(),
|
|
||||||
cpuLoad1m: real("cpu_load_1m").notNull(),
|
|
||||||
cpuLoad5m: real("cpu_load_5m").notNull(),
|
|
||||||
cpuLoad15m: real("cpu_load_15m").notNull(),
|
|
||||||
memTotalMB: integer("mem_total_mb").notNull(),
|
|
||||||
memUsedMB: integer("mem_used_mb").notNull(),
|
|
||||||
memUsedPct: real("mem_used_pct").notNull(),
|
|
||||||
diskTotalGB: real("disk_total_gb").notNull(),
|
|
||||||
diskUsedGB: real("disk_used_gb").notNull(),
|
|
||||||
diskUsedPct: real("disk_used_pct").notNull(),
|
|
||||||
uptimeSec: integer("uptime_sec").notNull(),
|
|
||||||
// TCP socket stats (merged from linux-tcp-socket-stats)
|
|
||||||
socketsUsed: integer("sockets_used"),
|
|
||||||
tcpInuse: integer("tcp_inuse"),
|
|
||||||
tcpOrphan: integer("tcp_orphan"),
|
|
||||||
tcpTw: integer("tcp_tw"),
|
|
||||||
tcpAlloc: integer("tcp_alloc"),
|
|
||||||
tcpMemPages: integer("tcp_mem_pages"),
|
|
||||||
});
|
|
||||||
@ -1,11 +0,0 @@
|
|||||||
-- Migration: 0001_init
|
|
||||||
-- Creates the worker_process_metrics table for worker-process-metrics sense.
|
|
||||||
|
|
||||||
CREATE TABLE IF NOT EXISTS worker_process_metrics (
|
|
||||||
ts INTEGER PRIMARY KEY,
|
|
||||||
pid INTEGER NOT NULL,
|
|
||||||
uptime_sec REAL NOT NULL,
|
|
||||||
heap_used_mb REAL NOT NULL,
|
|
||||||
rss_mb REAL NOT NULL,
|
|
||||||
external_mb REAL NOT NULL
|
|
||||||
);
|
|
||||||
@ -1,26 +0,0 @@
|
|||||||
export { workerProcessMetrics as table } from "./schema.ts";
|
|
||||||
|
|
||||||
function round2(n: number): number {
|
|
||||||
return Math.round(n * 100) / 100;
|
|
||||||
}
|
|
||||||
|
|
||||||
export async function compute() {
|
|
||||||
const ts = Date.now();
|
|
||||||
const pid = process.pid;
|
|
||||||
const uptimeSec = process.uptime();
|
|
||||||
const m = process.memoryUsage();
|
|
||||||
const heapUsedMB = round2(m.heapUsed / 1024 / 1024);
|
|
||||||
const rssMB = round2(m.rss / 1024 / 1024);
|
|
||||||
const externalMB = round2(m.external / 1024 / 1024);
|
|
||||||
|
|
||||||
const row = {
|
|
||||||
ts,
|
|
||||||
pid,
|
|
||||||
uptimeSec,
|
|
||||||
heapUsedMB,
|
|
||||||
rssMB,
|
|
||||||
externalMB,
|
|
||||||
};
|
|
||||||
|
|
||||||
return { signal: row, workflow: null };
|
|
||||||
}
|
|
||||||
@ -1,10 +0,0 @@
|
|||||||
import { integer, real, sqliteTable } from "drizzle-orm/sqlite-core";
|
|
||||||
|
|
||||||
export const workerProcessMetrics = sqliteTable("worker_process_metrics", {
|
|
||||||
ts: integer("ts").primaryKey(),
|
|
||||||
pid: integer("pid").notNull(),
|
|
||||||
uptimeSec: real("uptime_sec").notNull(),
|
|
||||||
heapUsedMB: real("heap_used_mb").notNull(),
|
|
||||||
rssMB: real("rss_mb").notNull(),
|
|
||||||
externalMB: real("external_mb").notNull(),
|
|
||||||
});
|
|
||||||
109
workflows/restart-gateway/src/index.ts
Normal file
109
workflows/restart-gateway/src/index.ts
Normal file
@ -0,0 +1,109 @@
|
|||||||
|
import { execFile } from "node:child_process";
|
||||||
|
import type { RoleResult, ThreadContext, WorkflowDefinition } from "@uncaged/nerve-core";
|
||||||
|
import { END } from "@uncaged/nerve-core";
|
||||||
|
|
||||||
|
const EXEC_TIMEOUT_MS = 30_000;
|
||||||
|
const VERIFY_DELAY_MS = 5_000;
|
||||||
|
|
||||||
|
type ExecResult = {
|
||||||
|
exitCode: number;
|
||||||
|
stdout: string;
|
||||||
|
stderr: string;
|
||||||
|
};
|
||||||
|
|
||||||
|
function exec(file: string, args: string[]): Promise<ExecResult> {
|
||||||
|
return new Promise((resolve) => {
|
||||||
|
execFile(
|
||||||
|
file,
|
||||||
|
args,
|
||||||
|
{
|
||||||
|
encoding: "utf8",
|
||||||
|
timeout: EXEC_TIMEOUT_MS,
|
||||||
|
maxBuffer: 4 * 1024 * 1024,
|
||||||
|
} as Parameters<typeof execFile>[2],
|
||||||
|
(err, stdout, stderr) => {
|
||||||
|
const exitCode =
|
||||||
|
err && typeof (err as NodeJS.ErrnoException).status === "number"
|
||||||
|
? (err as NodeJS.ErrnoException & { status: number }).status
|
||||||
|
: err ? -1 : 0;
|
||||||
|
resolve({
|
||||||
|
exitCode,
|
||||||
|
stdout: String(stdout ?? ""),
|
||||||
|
stderr: String(stderr ?? ""),
|
||||||
|
});
|
||||||
|
},
|
||||||
|
);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
function sleep(ms: number): Promise<void> {
|
||||||
|
return new Promise((r) => setTimeout(r, ms));
|
||||||
|
}
|
||||||
|
|
||||||
|
type RestartMeta = {
|
||||||
|
action: string;
|
||||||
|
exitCode: number;
|
||||||
|
output: string;
|
||||||
|
};
|
||||||
|
|
||||||
|
type VerifyMeta = {
|
||||||
|
alive: boolean;
|
||||||
|
activeState: string;
|
||||||
|
subState: string;
|
||||||
|
};
|
||||||
|
|
||||||
|
async function restarter(_ctx: ThreadContext): Promise<RoleResult<RestartMeta>> {
|
||||||
|
const r = await exec("systemctl", ["--user", "restart", "hermes-gateway"]);
|
||||||
|
return {
|
||||||
|
content: r.exitCode === 0
|
||||||
|
? "Gateway restart command succeeded."
|
||||||
|
: `Gateway restart failed (exit ${r.exitCode}): ${r.stderr.trim()}`,
|
||||||
|
meta: {
|
||||||
|
action: "systemctl --user restart hermes-gateway",
|
||||||
|
exitCode: r.exitCode,
|
||||||
|
output: `${r.stdout}\n${r.stderr}`.trim().slice(0, 500),
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
async function verifier(_ctx: ThreadContext): Promise<RoleResult<VerifyMeta>> {
|
||||||
|
// Wait a few seconds for the service to come up
|
||||||
|
await sleep(VERIFY_DELAY_MS);
|
||||||
|
|
||||||
|
const r = await exec("systemctl", [
|
||||||
|
"--user",
|
||||||
|
"--no-pager",
|
||||||
|
"show",
|
||||||
|
"hermes-gateway",
|
||||||
|
"-p", "ActiveState",
|
||||||
|
"-p", "SubState",
|
||||||
|
]);
|
||||||
|
|
||||||
|
let activeState = "unknown";
|
||||||
|
let subState = "unknown";
|
||||||
|
for (const line of r.stdout.split("\n")) {
|
||||||
|
const t = line.trim();
|
||||||
|
if (t.startsWith("ActiveState=")) activeState = t.slice("ActiveState=".length);
|
||||||
|
if (t.startsWith("SubState=")) subState = t.slice("SubState=".length);
|
||||||
|
}
|
||||||
|
|
||||||
|
const alive = activeState === "active" && subState === "running";
|
||||||
|
|
||||||
|
return {
|
||||||
|
content: alive
|
||||||
|
? `Gateway recovered: ${activeState} (${subState}).`
|
||||||
|
: `Gateway still down: ${activeState} (${subState}). May need manual intervention.`,
|
||||||
|
meta: { alive, activeState, subState },
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
export const workflow: WorkflowDefinition<Record<"restarter", RestartMeta> & Record<"verifier", VerifyMeta>> = {
|
||||||
|
name: "restart-gateway",
|
||||||
|
roles: { restarter, verifier },
|
||||||
|
moderator(ctx) {
|
||||||
|
// Round 0: restart. Round 1: verify. Done.
|
||||||
|
if (ctx.steps.length === 0) return "restarter";
|
||||||
|
if (ctx.steps.length === 1) return "verifier";
|
||||||
|
return END;
|
||||||
|
},
|
||||||
|
};
|
||||||
Loading…
x
Reference in New Issue
Block a user