style: apply biome formatting after merge

This commit is contained in:
2026-04-14 11:35:51 +00:00
parent b7d7ae9774
commit 7720202a68
12 changed files with 1137 additions and 14 deletions
+5
View File
@@ -359,7 +359,12 @@ export { adaptiveInterval, clampTick, dedup, errorBackoff } from './rules.js';
export {
startWatcher,
type VitalWithData,
type WakeCondition,
type WatcherDef,
type WatcherHandle,
} from './watcher.js';
// ── P0 Watchers ─────────────────────────────────────────────────
export * from './watchers/index.js';
+39 -7
View File
@@ -6,6 +6,7 @@ import { afterEach, beforeEach, describe, expect, test, vi } from 'vitest';
import type { PulseStore, VitalRecord } from './store.js';
import {
startWatcher,
type VitalWithData,
type WatcherDef,
type WatcherHandle,
} from './watcher.js';
@@ -13,14 +14,22 @@ import {
// Mock store implementation
const createMockStore = (): PulseStore => ({
putObject: vi.fn().mockReturnValue('mock-hash'),
getObject: vi.fn().mockReturnValue(null),
appendVital: vi.fn(),
appendVitals: vi.fn(),
getVitalHistory: vi.fn().mockReturnValue([]),
// Add other required methods as no-ops
getLatestVital: vi.fn(),
archiveVitals: vi.fn(),
downsampleVitals: vi.fn(),
appendEvent: vi.fn(),
getObjectByHash: vi.fn(),
rebuildSnapshot: vi.fn(),
appendEvents: vi.fn(),
getLatest: vi.fn(),
getLatestWhere: vi.fn(),
getRecent: vi.fn(),
queryByKind: vi.fn(),
getAfter: vi.fn(),
hasEvents: vi.fn(),
close: vi.fn(),
gc: vi.fn(),
});
describe('watcher', () => {
@@ -193,11 +202,34 @@ describe('watcher', () => {
const mockShouldWake = vi.fn().mockReturnValue(false);
const mockVitals: VitalRecord[] = [
{ occurredAt: Date.now(), key: 'test-key', hash: 'hash-1' },
{ occurredAt: Date.now() - 5000, key: 'test-key', hash: 'hash-2' },
{
id: 'id-1',
occurredAt: Date.now(),
key: 'history-key',
hash: 'hash-1',
},
{
id: 'id-2',
occurredAt: Date.now() - 5000,
key: 'history-key',
hash: 'hash-2',
},
];
const mockObjectsByHash: Record<string, unknown> = {
'hash-1': { value: 'data-for-hash-1' },
'hash-2': { value: 'data-for-hash-2' },
};
(mockStore.getVitalHistory as any).mockReturnValue(mockVitals);
(mockStore.getObject as any).mockImplementation(
(hash: string) => mockObjectsByHash[hash] ?? null,
);
const expectedWindow: VitalWithData[] = mockVitals.map((v) => ({
...v,
data: v.hash ? (mockObjectsByHash[v.hash] ?? null) : null,
}));
const def: WatcherDef = {
name: 'history-watcher',
@@ -211,6 +243,6 @@ describe('watcher', () => {
vi.advanceTimersByTime(5000);
expect(mockStore.getVitalHistory).toHaveBeenCalledWith('history-key', 12);
expect(mockShouldWake).toHaveBeenCalledWith(mockVitals);
expect(mockShouldWake).toHaveBeenCalledWith(expectedWindow);
});
});
+24 -7
View File
@@ -10,15 +10,27 @@ import type { PulseStore, VitalRecord } from './store.js';
// ── Types ──────────────────────────────────────────────────────
/**
* Predicate evaluated against a recent window of vital records.
* A vital record with the resolved data payload attached.
* Passed to `shouldWake` so conditions can operate on typed data
* without re-fetching from the object store.
*/
export interface VitalWithData<T = unknown> extends VitalRecord {
data: T;
}
/**
* Predicate evaluated against a recent window of vital records
* with their resolved data payloads.
* Return `true` to fire the wake callback.
*/
export type WakeCondition = (window: VitalRecord[]) => boolean;
export type WakeCondition<T = unknown> = (
window: VitalWithData<T>[],
) => boolean;
/**
* Definition of a watcher: what to collect, how often, and when to wake.
*/
export interface WatcherDef {
export interface WatcherDef<T = unknown> {
/** Human-readable name, used in logs and the returned handle. */
name: string;
@@ -29,13 +41,14 @@ export interface WatcherDef {
* Async function that collects a snapshot of data.
* The returned value is persisted via CAS and referenced in the vital record.
*/
collect: () => Promise<unknown>;
collect: () => Promise<T>;
/**
* Evaluated after each collection against the last 12 vital records.
* Evaluated after each collection against the last 12 vital records
* with their resolved data payloads.
* When it returns `true`, `wakeTick` is invoked.
*/
shouldWake: WakeCondition;
shouldWake: WakeCondition<T>;
/**
* Collection interval in milliseconds.
@@ -99,8 +112,12 @@ export function startWatcher(
});
const window = store.getVitalHistory(def.key, 12);
const resolvedWindow: VitalWithData<any>[] = window.map((v) => ({
...v,
data: v.hash ? store.getObject(v.hash) : null,
}));
if (def.shouldWake(window)) {
if (def.shouldWake(resolvedWindow)) {
wakeTick();
}
} catch (err) {
@@ -0,0 +1,173 @@
import { afterEach, beforeEach, describe, expect, it } from 'bun:test';
import * as fs from 'node:fs';
import * as os from 'node:os';
import * as path from 'node:path';
import type { VitalWithData } from '../watcher.js';
import type { ErrorLogData } from './error-log.js';
import { errorLogWatcher } from './error-log.js';
// Use real temp files for error-log tests since the watcher does low-level
// fd reads that are hard to mock correctly with Buffer.alloc/readSync.
let tmpDir: string;
function tmpFile(name: string): string {
return path.join(tmpDir, name);
}
describe('errorLogWatcher', () => {
beforeEach(() => {
tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), 'pulse-errlog-test-'));
});
afterEach(() => {
fs.rmSync(tmpDir, { recursive: true, force: true });
});
it('should create a watcher with correct configuration', () => {
const w = errorLogWatcher({ logFiles: ['/tmp/a.log'] });
expect(w.name).toBe('error-log');
expect(w.key).toBe('errorlog');
expect(w.intervalMs).toBe(5000);
});
it('should detect default keywords in log file', async () => {
const f = tmpFile('app.log');
fs.writeFileSync(f, 'INFO started\npanic: something broke\nINFO ok\n');
const w = errorLogWatcher({ logFiles: [f] });
const d = await w.collect();
expect(d.matches.length).toBe(1);
expect(d.matches[0]).toContain('panic');
expect(d.source).toBe(f);
});
it('should detect custom keywords', async () => {
const f = tmpFile('app.log');
fs.writeFileSync(f, 'INFO started\ncustom-error detected\nall good\n');
const w = errorLogWatcher({ logFiles: [f], keywords: ['custom-error'] });
const d = await w.collect();
expect(d.matches.length).toBe(1);
expect(d.matches[0]).toContain('custom-error');
});
it('should handle non-existent files gracefully', async () => {
const w = errorLogWatcher({ logFiles: ['/nonexistent/file.log'] });
const d = await w.collect();
expect(d.matches).toEqual([]);
});
it('should only read new content on subsequent collections', async () => {
const f = tmpFile('app.log');
fs.writeFileSync(f, 'INFO started ok\n');
const w = errorLogWatcher({ logFiles: [f] });
// First collect – no errors
let d = await w.collect();
expect(d.matches).toEqual([]);
// Append an error
fs.appendFileSync(f, 'fatal crash happened\n');
// Second collect – should find the new error
d = await w.collect();
expect(d.matches.length).toBe(1);
expect(d.matches[0]).toContain('fatal');
});
it('should handle file truncation / rotation', async () => {
const f = tmpFile('app.log');
fs.writeFileSync(f, 'AAAAAAAAAAAAAAAAA lots of old content\n');
const w = errorLogWatcher({ logFiles: [f] });
await w.collect(); // establish position
// Simulate log rotation – file is now smaller
fs.writeFileSync(f, 'OOM killer\n');
const d = await w.collect();
expect(d.matches.length).toBe(1);
expect(d.matches[0]).toContain('OOM');
});
it('should match keywords case-insensitively', async () => {
const f = tmpFile('app.log');
fs.writeFileSync(f, 'FATAL: System failure\nPANIC in kernel\nINFO ok\n');
const w = errorLogWatcher({ logFiles: [f], keywords: ['fatal', 'panic'] });
const d = await w.collect();
expect(d.matches.length).toBe(2);
expect(d.matches[0]).toContain('FATAL');
expect(d.matches[1]).toContain('PANIC');
});
it('should only match each line once even with multiple keyword hits', async () => {
const f = tmpFile('app.log');
fs.writeFileSync(f, 'fatal panic error\n');
const w = errorLogWatcher({ logFiles: [f], keywords: ['fatal', 'panic'] });
const d = await w.collect();
// Line matches 'fatal' first, then breaks – so only 1 entry
expect(d.matches.length).toBe(1);
expect(d.matches[0]).toBe('fatal panic error');
});
it('should scan multiple log files', async () => {
const f1 = tmpFile('a.log');
const f2 = tmpFile('b.log');
fs.writeFileSync(f1, 'panic in a\n');
fs.writeFileSync(f2, 'fatal in b\n');
const w = errorLogWatcher({ logFiles: [f1, f2] });
const d = await w.collect();
expect(d.matches.length).toBe(2);
});
// ── shouldWake tests ──
it('should wake when error keywords are found', () => {
const w = errorLogWatcher({ logFiles: [] });
const window: VitalWithData<ErrorLogData>[] = [
{
occurredAt: Date.now(),
key: 'errorlog',
data: { matches: ['fatal crash'], source: '/var/log/app.log' },
},
];
expect(w.shouldWake(window)).toBe(true);
});
it('should not wake when no errors found', () => {
const w = errorLogWatcher({ logFiles: [] });
const window: VitalWithData<ErrorLogData>[] = [
{
occurredAt: Date.now(),
key: 'errorlog',
data: { matches: [], source: '/var/log/app.log' },
},
];
expect(w.shouldWake(window)).toBe(false);
});
it('should not wake with empty window', () => {
expect(errorLogWatcher({ logFiles: [] }).shouldWake([])).toBe(false);
});
it('should not wake with null data', () => {
const w = errorLogWatcher({ logFiles: [] });
const window: VitalWithData<ErrorLogData>[] = [
{
occurredAt: Date.now(),
key: 'errorlog',
data: null as any,
},
];
expect(w.shouldWake(window)).toBe(false);
});
});
+121
View File
@@ -0,0 +1,121 @@
import * as fs from 'node:fs';
import type { WatcherDef } from '../watcher.js';
export interface ErrorLogData {
matches: string[]; // 匹配到的行
source: string; // 日志文件路径
}
export interface ErrorLogOptions {
/** 要监控的日志文件路径列表 */
logFiles: string[];
/** 触发唤醒的关键词 */
keywords?: string[]; // 默认 ['panic', 'fatal', 'OOM', 'SIGKILL', 'unhandled rejection']
}
export function errorLogWatcher(
opts: ErrorLogOptions,
): WatcherDef<ErrorLogData> {
const keywords = opts.keywords ?? [
'panic',
'fatal',
'OOM',
'SIGKILL',
'unhandled rejection',
];
// 为每个文件保存最后读取的位置
const filePositions = new Map<string, number>();
return {
name: 'error-log',
key: 'errorlog',
intervalMs: 5000,
collect: async () => {
const allMatches: string[] = [];
let sourceFile = '';
for (const logFile of opts.logFiles) {
try {
// 检查文件是否存在
if (!fs.existsSync(logFile)) {
continue;
}
const stats = fs.statSync(logFile);
const currentSize = stats.size;
let lastPosition = filePositions.get(logFile) ?? 0;
// 如果文件被轮转或截断,重置位置
if (currentSize < lastPosition) {
lastPosition = 0;
filePositions.set(logFile, 0);
}
// 只读取新增的部分
if (currentSize > lastPosition) {
const fd = fs.openSync(logFile, 'r');
try {
const bufferSize = currentSize - lastPosition;
const buffer = Buffer.alloc(bufferSize);
const bytesRead = fs.readSync(
fd,
buffer,
0,
bufferSize,
lastPosition,
);
const newContent = buffer.subarray(0, bytesRead).toString('utf8');
const lines = newContent
.split('\n')
.filter((line) => line.trim());
// 在新行中搜索关键词
for (const line of lines) {
const lowerLine = line.toLowerCase();
for (const keyword of keywords) {
if (lowerLine.includes(keyword.toLowerCase())) {
allMatches.push(line);
sourceFile = logFile;
break; // 每行只匹配一次
}
}
}
// 更新文件位置
filePositions.set(logFile, currentSize);
} finally {
fs.closeSync(fd);
}
}
} catch (error) {
console.error(`[error-log] Failed to read ${logFile}:`, error);
// 读取失败时不更新位置,下次重试
}
}
return {
matches: allMatches,
source: sourceFile || opts.logFiles[0] || '',
};
},
shouldWake: (window) => {
if (window.length === 0) return false;
const latest = window[0];
if (!latest?.data) return false;
// 如果有任何匹配的错误关键词,立即唤醒
if (latest.data.matches.length > 0) {
console.warn(
`[error-log] Found ${latest.data.matches.length} error patterns:`,
latest.data.matches.slice(0, 3),
); // 只显示前3个匹配
return true;
}
return false;
},
};
}
+20
View File
@@ -0,0 +1,20 @@
export {
type ErrorLogData,
type ErrorLogOptions,
errorLogWatcher,
} from './error-log.js';
export {
type NetworkData,
type NetworkOptions,
networkWatcher,
} from './network.js';
export {
type ProcessAliveData,
type ProcessAliveOptions,
processAliveWatcher,
} from './process-alive.js';
export {
type SystemResourceData,
type SystemResourceOptions,
systemResourceWatcher,
} from './system-resource.js';
+137
View File
@@ -0,0 +1,137 @@
import { beforeEach, describe, expect, it, mock, spyOn } from 'bun:test';
import type { VitalWithData } from '../watcher.js';
import type { NetworkData } from './network.js';
// We must mock dns.promises.resolve and global.fetch BEFORE importing the watcher.
// Bun's mock.module rewrites the import so the production code picks up our fakes.
const mockResolve = mock(() => Promise.resolve(['1.1.1.1']));
mock.module('node:dns', () => ({
default: { promises: { resolve: mockResolve } },
promises: { resolve: mockResolve },
}));
// Save original fetch so we can restore after each test
const origFetch = globalThis.fetch;
let mockFetch: ReturnType<typeof mock>;
// Dynamic import so the module sees the mocked dns
const { networkWatcher } = await import('./network.js');
describe('networkWatcher', () => {
beforeEach(() => {
mockResolve.mockClear();
mockResolve.mockResolvedValue(['1.1.1.1']);
mockFetch = mock(() =>
Promise.resolve(new Response('ok', { status: 200 })),
);
globalThis.fetch = mockFetch as any;
});
it('should create a watcher with correct default configuration', () => {
const w = networkWatcher();
expect(w.name).toBe('network-connectivity');
expect(w.key).toBe('network');
expect(w.intervalMs).toBe(5000);
});
it('should collect network data when both DNS and HTTP work', async () => {
const w = networkWatcher();
const d = await w.collect();
expect(d.dnsOk).toBe(true);
expect(d.httpOk).toBe(true);
expect(d.latencyMs).toBeGreaterThanOrEqual(0);
});
it('should handle DNS failure', async () => {
mockResolve.mockRejectedValue(new Error('ENOTFOUND'));
const d = await networkWatcher().collect();
expect(d.dnsOk).toBe(false);
expect(d.httpOk).toBe(true);
});
it('should handle HTTP failure', async () => {
globalThis.fetch = mock(() =>
Promise.reject(new Error('fetch failed')),
) as any;
const d = await networkWatcher().collect();
expect(d.dnsOk).toBe(true);
expect(d.httpOk).toBe(false);
});
it('should handle both DNS and HTTP failure', async () => {
mockResolve.mockRejectedValue(new Error('ENOTFOUND'));
globalThis.fetch = mock(() =>
Promise.reject(new Error('fetch failed')),
) as any;
const d = await networkWatcher().collect();
expect(d.dnsOk).toBe(false);
expect(d.httpOk).toBe(false);
});
it('should consider 3xx as successful', async () => {
globalThis.fetch = mock(() =>
Promise.resolve(new Response(null, { status: 301 })),
) as any;
const d = await networkWatcher().collect();
expect(d.httpOk).toBe(true);
});
it('should consider 4xx/5xx as failures', async () => {
globalThis.fetch = mock(() =>
Promise.resolve(new Response(null, { status: 500 })),
) as any;
const d = await networkWatcher().collect();
expect(d.httpOk).toBe(false);
});
// ── shouldWake tests ──
const mkEntry = (
dnsOk: boolean,
httpOk: boolean,
age = 0,
): VitalWithData<NetworkData> => ({
occurredAt: Date.now() - age,
key: 'network',
data: { dnsOk, httpOk, latencyMs: 100 },
});
it('should wake on 2 consecutive total failures', () => {
const w = networkWatcher();
expect(w.shouldWake([mkEntry(false, false), mkEntry(false, false)])).toBe(
true,
);
});
it('should not wake when failure is not consecutive', () => {
const w = networkWatcher();
expect(w.shouldWake([mkEntry(false, false), mkEntry(true, true)])).toBe(
false,
);
});
it('should not wake when at least one method works', () => {
const w = networkWatcher();
expect(w.shouldWake([mkEntry(true, false), mkEntry(false, true)])).toBe(
false,
);
});
it('should not wake with only 1 data point', () => {
const w = networkWatcher();
expect(w.shouldWake([mkEntry(false, false)])).toBe(false);
});
it('should treat null data as failure', () => {
const w = networkWatcher();
const bad: VitalWithData<NetworkData> = {
occurredAt: Date.now(),
key: 'network',
data: null as any,
};
expect(w.shouldWake([bad, mkEntry(false, false)])).toBe(true);
});
});
+70
View File
@@ -0,0 +1,70 @@
import * as dns from 'node:dns';
import type { WatcherDef } from '../watcher.js';
export interface NetworkData {
dnsOk: boolean;
httpOk: boolean;
latencyMs: number;
}
export interface NetworkOptions {
dnsHost?: string; // 默认 'cloudflare.com'
httpUrl?: string; // 默认 'https://1.1.1.1/cdn-cgi/trace'
timeoutMs?: number; // 默认 5000
}
export function networkWatcher(opts?: NetworkOptions): WatcherDef<NetworkData> {
const dnsHost = opts?.dnsHost ?? 'cloudflare.com';
const httpUrl = opts?.httpUrl ?? 'https://1.1.1.1/cdn-cgi/trace';
const timeoutMs = opts?.timeoutMs ?? 5000;
return {
name: 'network-connectivity',
key: 'network',
intervalMs: 5000,
collect: async () => {
const startTime = Date.now();
let dnsOk = false;
let httpOk = false;
// DNS 解析测试
try {
await Promise.race([
dns.promises.resolve(dnsHost),
new Promise((_, reject) =>
setTimeout(() => reject(new Error('DNS timeout')), timeoutMs),
),
]);
dnsOk = true;
} catch {
dnsOk = false;
}
// HTTP 连接测试
try {
const controller = new AbortController();
const tid = setTimeout(() => controller.abort(), timeoutMs);
const response = await fetch(httpUrl, {
method: 'GET',
signal: controller.signal,
redirect: 'manual',
});
clearTimeout(tid);
httpOk = response.status >= 200 && response.status < 400;
} catch {
httpOk = false;
}
return { dnsOk, httpOk, latencyMs: Date.now() - startTime };
},
shouldWake: (window) => {
if (window.length < 2) return false;
// 连续 2 次 DNS 和 HTTP 全部失败 → 唤醒
const recentFailures = window.slice(0, 2).filter((v) => {
if (!v?.data) return true;
return !v.data.dnsOk && !v.data.httpOk;
});
return recentFailures.length >= 2;
},
};
}
@@ -0,0 +1,168 @@
import { afterEach, beforeEach, describe, expect, it, mock } from 'bun:test';
import type { VitalWithData } from '../watcher.js';
import type { ProcessAliveData } from './process-alive.js';
import { processAliveWatcher } from './process-alive.js';
// Mock execSync
const mockExecSync = mock(
() => 'user 1234 openclaw-gateway\\nuser 5678 litellm-server\\n',
);
mock.module('node:child_process', () => ({
execSync: mockExecSync,
}));
describe('processAliveWatcher', () => {
const testProcesses = {
'oc-gateway': 'openclaw-gateway',
litellm: 'litellm',
};
beforeEach(() => {
mockExecSync.mockClear();
mockExecSync.mockReturnValue(
'user 1234 openclaw-gateway\\nuser 5678 litellm-server\\n',
);
});
it('should create a watcher with correct configuration', () => {
const watcher = processAliveWatcher({ processes: testProcesses });
expect(watcher.name).toBe('process-alive');
expect(watcher.key).toBe('processes');
expect(watcher.intervalMs).toBe(5000);
expect(typeof watcher.collect).toBe('function');
expect(typeof watcher.shouldWake).toBe('function');
});
it('should collect process status correctly when all processes are running', async () => {
const watcher = processAliveWatcher({ processes: testProcesses });
const data = await watcher.collect();
expect(data.processes).toHaveProperty('oc-gateway');
expect(data.processes).toHaveProperty('litellm');
expect(data.processes['oc-gateway']).toBe(true);
expect(data.processes['litellm']).toBe(true);
});
it('should detect when a process is not running', async () => {
mockExecSync.mockReturnValue('user 1234 openclaw-gateway\\n'); // litellm missing
const watcher = processAliveWatcher({ processes: testProcesses });
const data = await watcher.collect();
expect(data.processes['oc-gateway']).toBe(true);
expect(data.processes['litellm']).toBe(false);
});
it('should handle ps command failure gracefully', async () => {
mockExecSync.mockImplementation(() => {
throw new Error('ps command failed');
});
const watcher = processAliveWatcher({ processes: testProcesses });
const data = await watcher.collect();
// All processes should be marked as not running when ps fails
expect(data.processes['oc-gateway']).toBe(false);
expect(data.processes['litellm']).toBe(false);
});
it('should wake when any process is not running', () => {
const watcher = processAliveWatcher({ processes: testProcesses });
const window: VitalWithData<ProcessAliveData>[] = [
{
occurredAt: Date.now(),
key: 'processes',
data: {
processes: {
'oc-gateway': true,
litellm: false, // Process is down
},
},
},
];
expect(watcher.shouldWake(window)).toBe(true);
});
it('should wake when a process transitions from running to stopped', () => {
const watcher = processAliveWatcher({ processes: testProcesses });
const baseTime = Date.now();
const window: VitalWithData<ProcessAliveData>[] = [
{
occurredAt: baseTime,
key: 'processes',
data: {
processes: {
'oc-gateway': true,
litellm: false, // Process stopped
},
},
},
{
occurredAt: baseTime - 5000,
key: 'processes',
data: {
processes: {
'oc-gateway': true,
litellm: true, // Process was running before
},
},
},
];
expect(watcher.shouldWake(window)).toBe(true);
});
it('should not wake when all processes are running', () => {
const watcher = processAliveWatcher({ processes: testProcesses });
const window: VitalWithData<ProcessAliveData>[] = [
{
occurredAt: Date.now(),
key: 'processes',
data: {
processes: {
'oc-gateway': true,
litellm: true,
},
},
},
];
expect(watcher.shouldWake(window)).toBe(false);
});
it('should not wake with empty window', () => {
const watcher = processAliveWatcher({ processes: testProcesses });
expect(watcher.shouldWake([])).toBe(false);
});
it('should not wake with null data', () => {
const watcher = processAliveWatcher({ processes: testProcesses });
const window: VitalWithData<ProcessAliveData>[] = [
{
occurredAt: Date.now(),
key: 'processes',
data: null as any,
},
];
expect(watcher.shouldWake(window)).toBe(false);
});
it('should handle case-insensitive process matching', async () => {
mockExecSync.mockReturnValue(
'user 1234 OPENCLAW-GATEWAY\\nuser 5678 LiteLLM-server\\n',
);
const watcher = processAliveWatcher({ processes: testProcesses });
const data = await watcher.collect();
expect(data.processes['oc-gateway']).toBe(true);
expect(data.processes['litellm']).toBe(true);
});
});
@@ -0,0 +1,81 @@
import { execSync } from 'node:child_process';
import type { WatcherDef } from '../watcher.js';
export interface ProcessAliveData {
processes: Record<string, boolean>; // name → alive
}
export interface ProcessAliveOptions {
/** 要监控的进程列表:name → 匹配命令行的关键词 */
processes: Record<string, string>; // e.g. { 'oc-gateway': 'openclaw', 'litellm': 'litellm' }
}
export function processAliveWatcher(
opts: ProcessAliveOptions,
): WatcherDef<ProcessAliveData> {
return {
name: 'process-alive',
key: 'processes',
intervalMs: 5000,
collect: async () => {
const processes: Record<string, boolean> = {};
try {
// 获取所有进程信息
const psOutput = execSync('ps aux --no-headers', { encoding: 'utf8' });
// 检查每个配置的进程
for (const [name, keyword] of Object.entries(opts.processes)) {
// 在进程列表中搜索包含关键词的进程
const isRunning = psOutput
.split('\n')
.some((line) => line.toLowerCase().includes(keyword.toLowerCase()));
processes[name] = isRunning;
}
} catch (error) {
// ps 命令失败时,标记所有进程为未知状态(false)
console.error('[process-alive] Failed to get process list:', error);
for (const name of Object.keys(opts.processes)) {
processes[name] = false;
}
}
return { processes };
},
shouldWake: (window) => {
if (window.length === 0) return false;
const latest = window[0];
if (!latest?.data) return false;
const currentProcesses = latest.data.processes;
// 检查是否有任何进程消失
for (const [name, isAlive] of Object.entries(currentProcesses)) {
if (!isAlive) {
console.warn(`[process-alive] Process '${name}' is not running`);
return true;
}
}
// 如果有历史数据,检查是否有进程从运行变为停止
if (window.length > 1) {
const previous = window[1];
if (previous?.data) {
const prevProcesses = previous.data.processes;
for (const [name, isAlive] of Object.entries(currentProcesses)) {
const wasAlive = prevProcesses[name];
if (wasAlive && !isAlive) {
console.warn(`[process-alive] Process '${name}' stopped running`);
return true;
}
}
}
}
return false;
},
};
}
@@ -0,0 +1,192 @@
import { afterEach, beforeEach, describe, expect, it, mock } from 'bun:test';
import type { VitalWithData } from '../watcher.js';
import type { SystemResourceData } from './system-resource.js';
import { systemResourceWatcher } from './system-resource.js';
// Mock modules
const mockExecSync = mock(() => '85%\\n');
const mockReadFileSync = mock(
() => 'SwapTotal: 8388608 kB\\nSwapFree: 4194304 kB\\n',
);
const mockTotalmem = mock(() => 16777216 * 1024); // 16GB
const mockFreemem = mock(() => 1677721.6 * 1024); // 1.6GB free
const mockCpus = mock(() => [
{ times: { user: 100, nice: 0, sys: 50, idle: 850, irq: 0 } },
{ times: { user: 120, nice: 0, sys: 60, idle: 820, irq: 0 } },
]);
mock.module('node:child_process', () => ({
execSync: mockExecSync,
}));
mock.module('node:fs', () => ({
readFileSync: mockReadFileSync,
}));
mock.module('node:os', () => ({
totalmem: mockTotalmem,
freemem: mockFreemem,
cpus: mockCpus,
}));
describe('systemResourceWatcher', () => {
beforeEach(() => {
// Reset mocks
mockExecSync.mockClear();
mockReadFileSync.mockClear();
mockTotalmem.mockClear();
mockFreemem.mockClear();
mockCpus.mockClear();
// Set default return values
mockExecSync.mockReturnValue('85%\\n');
mockReadFileSync.mockReturnValue(
'SwapTotal: 8388608 kB\\nSwapFree: 4194304 kB\\n',
);
mockTotalmem.mockReturnValue(16777216 * 1024);
mockFreemem.mockReturnValue(1677721.6 * 1024);
mockCpus.mockReturnValue([
{ times: { user: 100, nice: 0, sys: 50, idle: 850, irq: 0 } },
{ times: { user: 120, nice: 0, sys: 60, idle: 820, irq: 0 } },
]);
});
it('should create a watcher with correct default configuration', () => {
const watcher = systemResourceWatcher();
expect(watcher.name).toBe('system-resource');
expect(watcher.key).toBe('system');
expect(watcher.intervalMs).toBe(5000);
expect(typeof watcher.collect).toBe('function');
expect(typeof watcher.shouldWake).toBe('function');
});
it('should collect system resource data correctly', async () => {
const watcher = systemResourceWatcher();
const data = await watcher.collect();
expect(data).toHaveProperty('cpuPct');
expect(data).toHaveProperty('memoryPct');
expect(data).toHaveProperty('diskPct');
expect(data).toHaveProperty('swapPct');
// Memory: (16GB - 1.6GB) / 16GB = 90%
expect(data.memoryPct).toBeCloseTo(90.0, 1);
// Disk from df command
expect(data.diskPct).toBe(85);
// Swap: (8GB - 4GB) / 8GB = 50%
expect(data.swapPct).toBe(50.0);
// CPU calculation: (total - idle) / total * 100
expect(typeof data.cpuPct).toBe('number');
expect(data.cpuPct).toBeGreaterThanOrEqual(0);
expect(data.cpuPct).toBeLessThanOrEqual(100);
});
it('should handle disk command failure gracefully', async () => {
mockExecSync.mockImplementation(() => {
throw new Error('Command failed');
});
const watcher = systemResourceWatcher();
const data = await watcher.collect();
expect(data.diskPct).toBe(0);
});
it('should handle swap file read failure gracefully', async () => {
mockReadFileSync.mockImplementation(() => {
throw new Error('File not found');
});
const watcher = systemResourceWatcher();
const data = await watcher.collect();
expect(data.swapPct).toBe(0);
});
it('should wake when disk usage exceeds threshold', () => {
const watcher = systemResourceWatcher({ diskThreshold: 80 });
const window: VitalWithData<SystemResourceData>[] = [
{
occurredAt: Date.now(),
key: 'system',
data: {
cpuPct: 50,
memoryPct: 70,
diskPct: 95, // Above threshold
swapPct: 10,
},
},
];
expect(watcher.shouldWake(window)).toBe(true);
});
it('should wake when memory usage is sustained above threshold', () => {
const watcher = systemResourceWatcher({
memoryThreshold: 85,
sustainedSeconds: 10, // 2 intervals at 5s each
});
const baseTime = Date.now();
const window: VitalWithData<SystemResourceData>[] = [
{
occurredAt: baseTime,
key: 'system',
data: { cpuPct: 50, memoryPct: 95, diskPct: 70, swapPct: 10 },
},
{
occurredAt: baseTime - 5000,
key: 'system',
data: { cpuPct: 50, memoryPct: 92, diskPct: 70, swapPct: 10 },
},
];
expect(watcher.shouldWake(window)).toBe(true);
});
it('should not wake when memory usage is high but not sustained', () => {
const watcher = systemResourceWatcher({
memoryThreshold: 85,
sustainedSeconds: 10,
});
const baseTime = Date.now();
const window: VitalWithData<SystemResourceData>[] = [
{
occurredAt: baseTime,
key: 'system',
data: { cpuPct: 50, memoryPct: 95, diskPct: 70, swapPct: 10 },
},
{
occurredAt: baseTime - 5000,
key: 'system',
data: { cpuPct: 50, memoryPct: 75, diskPct: 70, swapPct: 10 }, // Below threshold
},
];
expect(watcher.shouldWake(window)).toBe(false);
});
it('should not wake with empty window', () => {
const watcher = systemResourceWatcher();
expect(watcher.shouldWake([])).toBe(false);
});
it('should not wake with null data', () => {
const watcher = systemResourceWatcher();
const window: VitalWithData<SystemResourceData>[] = [
{
occurredAt: Date.now(),
key: 'system',
data: null as any,
},
];
expect(watcher.shouldWake(window)).toBe(false);
});
});
@@ -0,0 +1,107 @@
import { execSync } from 'node:child_process';
import * as fs from 'node:fs';
import * as os from 'node:os';
import type { WatcherDef } from '../watcher.js';
export interface SystemResourceData {
cpuPct: number; // CPU 使用率 %
memoryPct: number; // 内存使用率 %
diskPct: number; // 磁盘使用率 % (根分区)
swapPct: number; // swap 使用率 %
}
export interface SystemResourceOptions {
memoryThreshold?: number; // 默认 90
diskThreshold?: number; // 默认 95
sustainedSeconds?: number; // 持续超阈值多久才唤醒,默认 30
}
export function systemResourceWatcher(
opts?: SystemResourceOptions,
): WatcherDef<SystemResourceData> {
const memThreshold = opts?.memoryThreshold ?? 90;
const diskThreshold = opts?.diskThreshold ?? 95;
const sustained = opts?.sustainedSeconds ?? 30;
return {
name: 'system-resource',
key: 'system',
intervalMs: 5000,
collect: async () => {
const totalMem = os.totalmem();
const freeMem = os.freemem();
const memoryPct = ((totalMem - freeMem) / totalMem) * 100;
// CPU:用 os.cpus() 算 idle 百分比(简化版,使用瞬时值)
const cpus = os.cpus();
const totalIdle = cpus.reduce((sum, cpu) => sum + cpu.times.idle, 0);
const totalTick = cpus.reduce(
(sum, cpu) => sum + Object.values(cpu.times).reduce((a, b) => a + b, 0),
0,
);
const cpuPct = totalTick > 0 ? (1 - totalIdle / totalTick) * 100 : 0;
// 磁盘:使用 df 命令解析根分区使用率
let diskPct = 0;
try {
const output = execSync('df / --output=pcent | tail -1', {
encoding: 'utf8',
});
diskPct = parseFloat(output.trim().replace('%', '')) || 0;
} catch {
diskPct = 0;
}
// Swap:从 /proc/meminfo 读取
let swapPct = 0;
try {
const meminfo = fs.readFileSync('/proc/meminfo', 'utf8');
const swapTotal = parseInt(
meminfo.match(/SwapTotal:\s+(\d+)/)?.[1] ?? '0',
);
const swapFree = parseInt(
meminfo.match(/SwapFree:\s+(\d+)/)?.[1] ?? '0',
);
swapPct =
swapTotal > 0 ? ((swapTotal - swapFree) / swapTotal) * 100 : 0;
} catch {
swapPct = 0;
}
return {
cpuPct: Math.round(cpuPct * 10) / 10,
memoryPct: Math.round(memoryPct * 10) / 10,
diskPct,
swapPct: Math.round(swapPct * 10) / 10,
};
},
shouldWake: (window) => {
if (window.length === 0) return false;
const latest = window[0];
if (!latest?.data) return false;
const data = latest.data;
// 磁盘 > 95% 立即唤醒(单点就够危险)
if (data.diskPct >= diskThreshold) {
return true;
}
// 内存持续超阈值检查
if (data.memoryPct >= memThreshold) {
// 检查持续时间:最近 N 条记录都超阈值
const sustainedCount = Math.ceil((sustained * 1000) / 5000); // 5s interval
const recentOverThreshold = window
.slice(0, sustainedCount)
.filter((v) => v.data && v.data.memoryPct >= memThreshold);
if (recentOverThreshold.length >= sustainedCount) {
return true;
}
}
return false;
},
};
}