feat: @uncaged/pulse-openclaw — OpenClaw adapter package (closes #40)

* feat: @uncaged/pulse-oc — OpenClaw adapter package (closes #40)

- Extract gateway-health & llm-health watchers from core to pulse-oc
- Add gateway-health-guard rule and archive-sessions executor
- Update CI, README, CONTRIBUTING for the new package
- Fix E2E tests to use core ESSENTIAL_PROCESSES after openclaw removal

Made-with: Cursor

* feat: @uncaged/pulse-openclaw — OpenClaw adapter package (closes #40)

* chore: rename pulse-oc → pulse-openclaw

Made-with: Cursor

---------

Co-authored-by: 小橘 <xiaoju@shazhou.work>
This commit is contained in:
小橘 🍊
2026-04-15 00:15:09 +08:00
committed by GitHub
parent 3ced39c6fa
commit a9be34f563
26 changed files with 619 additions and 439 deletions
+12
View File
@@ -42,6 +42,10 @@ jobs:
working-directory: packages/pulse-cursor
run: bun install
- name: Install dependencies (pulse-openclaw)
working-directory: packages/pulse-openclaw
run: bun install
- name: Type check (pulse)
working-directory: packages/pulse
run: bunx tsc --noEmit
@@ -58,6 +62,10 @@ jobs:
working-directory: packages/pulse-cursor
run: bunx tsc --noEmit
- name: Type check (pulse-openclaw)
working-directory: packages/pulse-openclaw
run: bunx tsc --noEmit
- name: Unit tests (pulse)
working-directory: packages/pulse
run: bun test
@@ -70,6 +78,10 @@ jobs:
working-directory: packages/pulse-cursor
run: bun test
- name: Unit tests (pulse-openclaw)
working-directory: packages/pulse-openclaw
run: bun test
- name: Unit tests (upulse)
working-directory: packages/upulse
run: bun test src/config.test.ts
+11 -6
View File
@@ -28,7 +28,6 @@ pulse/
├── packages/
│ ├── pulse/src/ ← 核心引擎(@uncaged/pulse)
│ │ ├── index.ts ← Rule 类型、composeRules、runPulse、rebuildSnapshot
│ │ ├── rules.ts ← 内置 rules:clampTick、errorBackoff、adaptiveInterval、dedup
│ │ ├── store.ts ← 存储层:events.db + vitals.db + objects/ CAS
│ │ ├── watcher.ts ← Watcher 框架:startWatcher、wakeTick、WatcherDef
│ │ ├── watchers/ ← P0 内置 watchers(植物神经,agent 不可改)
@@ -36,21 +35,27 @@ pulse/
│ │ │ ├── process-alive.ts 关键进程存活检测
│ │ │ ├── network.ts DNS + HTTP 连通性
│ │ │ ├── error-log.ts 日志关键词匹配
│ │ │ ├── llm-health.ts LLM 双层探针(轻量 + 深度)
│ │ │ └── index.ts
│ │ ├── survival/ ← P0 保命层(植物神经,agent 不可改)
│ │ ├── rules/ ← P0 保命层(植物神经,agent 不可改)
│ │ │ ├── constants.ts ESSENTIAL_PROCESSES 白名单、阈值常量
│ │ │ ├── health.ts HealthSnapshot 类型 + rebuildHealth
│ │ │ ├── rules.ts 7 个保命 rules(洋葱最外层)
│ │ │ ├── executors.ts 保命 executors(确定性本地命令)
│ │ │ ├── survival.ts 保命 rules(洋葱最外层)
│ │ │ ├── builtin.ts 内置 rules:clampTick、dedup 等
│ │ │ └── index.ts
│ │ └── executors/ ← 业务 executors(agent 可扩展
│ │ └── executors/ ← 保命 executors(确定性本地命令
│ │ ├── survival.ts
│ │ └── index.ts
│ │
│ ├── pulse-cursor/src/ ← Cursor Agent 适配器(@uncaged/pulse-cursor)
│ │ ├── index.ts re-export
│ │ └── cursor-agent.ts Cursor Agent Executor
│ │
│ ├── pulse-openclaw/src/ ← OpenClaw 适配器(@uncaged/pulse-openclaw)
│ │ ├── index.ts re-export
│ │ ├── watchers/ OC Gateway + LLM 健康探针
│ │ ├── rules/ OC 专属保命 rules
│ │ └── executors/ OC 专属 executors
│ │
│ └── upulse/src/ ← CLI 工具(@uncaged/upulse)
│ ├── cli.ts 命令路由
│ ├── daemon.ts daemon 进程管理
+2 -2
View File
@@ -47,6 +47,7 @@ Percept 层(vitals.db) Understand + Execute 层(events.db)
| 包 | 说明 | npm |
|---|---|---|
| `@uncaged/pulse` | 核心引擎 + 保命层 + watchers | [![npm](https://img.shields.io/npm/v/@uncaged/pulse)](https://www.npmjs.com/package/@uncaged/pulse) |
| `@uncaged/pulse-openclaw` | OpenClaw 适配器(Gateway/LLM watchers + rules + executors) | [![npm](https://img.shields.io/npm/v/@uncaged/pulse-openclaw)](https://www.npmjs.com/package/@uncaged/pulse-openclaw) |
| `@uncaged/pulse-cursor` | Cursor Agent CLI executor 适配器 | [![npm](https://img.shields.io/npm/v/@uncaged/pulse-cursor)](https://www.npmjs.com/package/@uncaged/pulse-cursor) |
| `@uncaged/upulse` | CLI:daemon 管理、staging/promote/rollback | [![npm](https://img.shields.io/npm/v/@uncaged/upulse)](https://www.npmjs.com/package/@uncaged/upulse) |
@@ -124,14 +125,13 @@ Watcher 持续采集写 vitals,唤醒判定看最近 1 分钟窗口(~12 条
### 保命层(P0 Survival)
5 个内置 Watcher + 7 个保命 Rule,零 LLM 零网络依赖:
4 个内置 Watcher + 7 个保命 Rule,零 LLM 零网络依赖:
**Watchers:**
- `system-resource` — CPU/内存/磁盘/swap
- `process-alive` — 关键进程存活
- `network` — DNS + HTTP 出站
- `error-log` — 日志关键词匹配
- `llm-health` — 双层探针(轻量 5s + 深度 60s)
**Rules(洋葱顺序,最外层先执行):**
1. `panicRollback` — 保命动作连续失败 → 紧急回滚
+15
View File
@@ -43,6 +43,19 @@
"@uncaged/pulse": ">=0.1.0",
},
},
"packages/pulse-openclaw": {
"name": "@uncaged/pulse-openclaw",
"version": "0.1.0",
"devDependencies": {
"@types/node": "^25.6.0",
"@uncaged/pulse": "workspace:*",
"bun-types": "latest",
"typescript": "^6.0.2",
},
"peerDependencies": {
"@uncaged/pulse": ">=0.1.0",
},
},
"packages/upulse": {
"name": "@uncaged/upulse",
"version": "0.1.0",
@@ -87,6 +100,8 @@
"@uncaged/pulse-hermes": ["@uncaged/pulse-hermes@workspace:packages/pulse-hermes"],
"@uncaged/pulse-openclaw": ["@uncaged/pulse-openclaw@workspace:packages/pulse-openclaw"],
"@uncaged/upulse": ["@uncaged/upulse@workspace:packages/upulse"],
"bun-types": ["bun-types@1.3.12", "", { "dependencies": { "@types/node": "*" } }, "sha512-HqOLj5PoFajAQciOMRiIZGNoKxDJSr6qigAttOX40vJuSp6DN/CxWp9s3C1Xwm4oH7ybueITwiaOcWXoYVoRkA=="],
+38
View File
@@ -0,0 +1,38 @@
{
"name": "@uncaged/pulse-openclaw",
"version": "0.1.0",
"description": "Pulse adapter for OpenClaw — watchers, rules, and executors for OpenClaw Gateway",
"type": "module",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"files": [
"dist"
],
"scripts": {
"build": "tsc",
"test": "bun test"
},
"keywords": [
"pulse",
"openclaw",
"agent",
"watcher",
"executor"
],
"author": "oc-xiaoju",
"license": "MIT",
"repository": {
"type": "git",
"url": "https://github.com/oc-xiaoju/pulse",
"directory": "packages/pulse-openclaw"
},
"peerDependencies": {
"@uncaged/pulse": ">=0.1.0"
},
"devDependencies": {
"@uncaged/pulse": "workspace:*",
"@types/node": "^25.6.0",
"bun-types": "latest",
"typescript": "^6.0.2"
}
}
+12
View File
@@ -0,0 +1,12 @@
/**
* OpenClaw-specific constants
*
* Essential processes that belong to the OpenClaw platform.
* These were moved out of the core @uncaged/pulse package.
*/
/** OpenClaw-specific essential processes */
export const OC_ESSENTIAL_PROCESSES = new Set([
'openclaw', // OC Gateway
'litellm', // LLM proxy
]);
@@ -0,0 +1,170 @@
/**
* Archive sessions executor tests
*
* Extracted from core @uncaged/pulse survival.test.ts
*/
import { beforeEach, describe, expect, jest, test } from 'bun:test';
import {
type ArchiveSessionsDeps,
executeArchiveSessions,
} from './archive-sessions.js';
const mockExecSync = jest.fn();
const mockExistsSync = jest.fn(() => true);
const mockMkdirSync = jest.fn();
const mockReaddirSync = jest.fn(() => [] as string[]);
const mockStatSync = jest.fn();
const mockRenameSync = jest.fn();
const mockFs = {
existsSync: mockExistsSync,
mkdirSync: mockMkdirSync,
readdirSync: mockReaddirSync,
statSync: mockStatSync,
renameSync: mockRenameSync,
} as unknown as ArchiveSessionsDeps['fs'];
const testDeps: ArchiveSessionsDeps = {
fs: mockFs,
execSyncFn: mockExecSync,
};
describe('executeArchiveSessions', () => {
beforeEach(() => {
jest.clearAllMocks();
mockExistsSync.mockReturnValue(true);
mockReaddirSync.mockReturnValue([]);
});
test('should archive checkpoint and stale files', async () => {
const effect = {
type: 'archive-sessions' as const,
sessionsDir: '/test/sessions',
};
mockReaddirSync.mockReturnValueOnce([
'sess-active.jsonl',
'sess-active.jsonl.lock',
'sess-old.checkpoint.1.jsonl',
'sess-old.checkpoint.2.jsonl',
'sess-gone.deleted.1.jsonl',
'sess-gone.reset.1.jsonl',
]);
mockReaddirSync.mockReturnValueOnce([]);
const consoleSpy = jest.spyOn(console, 'log').mockImplementation();
await executeArchiveSessions(effect, testDeps);
expect(mockMkdirSync).toHaveBeenCalledWith('/test/sessions/archive', {
recursive: true,
});
expect(mockRenameSync).toHaveBeenCalledWith(
'/test/sessions/sess-old.checkpoint.1.jsonl',
'/test/sessions/archive/sess-old.checkpoint.1.jsonl',
);
expect(mockRenameSync).toHaveBeenCalledWith(
'/test/sessions/sess-old.checkpoint.2.jsonl',
'/test/sessions/archive/sess-old.checkpoint.2.jsonl',
);
expect(mockRenameSync).toHaveBeenCalledWith(
'/test/sessions/sess-gone.deleted.1.jsonl',
'/test/sessions/archive/sess-gone.deleted.1.jsonl',
);
expect(mockRenameSync).toHaveBeenCalledWith(
'/test/sessions/sess-gone.reset.1.jsonl',
'/test/sessions/archive/sess-gone.reset.1.jsonl',
);
expect(consoleSpy).toHaveBeenCalledWith(
'[survival] Archived 2 checkpoints + 2 stale files',
);
consoleSpy.mockRestore();
});
test('should NOT archive active session checkpoints', async () => {
const effect = {
type: 'archive-sessions' as const,
sessionsDir: '/test/sessions',
};
mockReaddirSync.mockReturnValueOnce([
'sess-active.jsonl',
'sess-active.jsonl.lock',
'sess-active.checkpoint.1.jsonl',
]);
mockReaddirSync.mockReturnValueOnce([]);
const consoleSpy = jest.spyOn(console, 'log').mockImplementation();
await executeArchiveSessions(effect, testDeps);
expect(mockRenameSync).not.toHaveBeenCalled();
consoleSpy.mockRestore();
});
test('should compress jsonl files already in archive', async () => {
const effect = {
type: 'archive-sessions' as const,
sessionsDir: '/test/sessions',
};
mockReaddirSync.mockReturnValueOnce([]);
mockReaddirSync.mockReturnValueOnce(['old-checkpoint.jsonl', 'old2.jsonl']);
const consoleSpy = jest.spyOn(console, 'log').mockImplementation();
await executeArchiveSessions(effect, testDeps);
expect(mockExecSync).toHaveBeenCalledWith(
expect.stringMatching(
/^cd \/test\/sessions\/archive && tar czf sessions-.*\.tar\.gz \*\.jsonl && rm -f \*\.jsonl$/,
),
{ timeout: 30000 },
);
consoleSpy.mockRestore();
});
test('should handle missing sessions dir gracefully', async () => {
const effect = {
type: 'archive-sessions' as const,
sessionsDir: '/missing',
};
mockExistsSync.mockReturnValue(false);
const consoleSpy = jest.spyOn(console, 'log').mockImplementation();
await executeArchiveSessions(effect, testDeps);
expect(consoleSpy).toHaveBeenCalledWith(
'[survival] sessions dir not found: /missing',
);
consoleSpy.mockRestore();
});
test('should handle archive-sessions errors gracefully', async () => {
const effect = {
type: 'archive-sessions' as const,
sessionsDir: '/test/sessions',
};
mockReaddirSync.mockImplementationOnce(() => {
throw new Error('Permission denied');
});
const consoleSpy = jest.spyOn(console, 'error').mockImplementation();
await executeArchiveSessions(effect, testDeps);
expect(consoleSpy).toHaveBeenCalledWith(
'[survival] Failed to archive sessions:',
expect.any(Error),
);
consoleSpy.mockRestore();
});
});
@@ -0,0 +1,93 @@
/**
* Archive sessions executor — OpenClaw specific
*
* Handles archiving stale/zombie session files from OpenClaw Gateway's sessions directory.
* Moved from core @uncaged/pulse to @uncaged/pulse-openclaw adapter.
*/
import { execSync as defaultExecSync } from 'node:child_process';
import * as defaultFs from 'node:fs';
import * as os from 'node:os';
import * as path from 'node:path';
export interface ArchiveSessionsEffect {
type: 'archive-sessions';
sessionsDir?: string;
}
export interface ArchiveSessionsDeps {
fs?: typeof defaultFs;
execSyncFn?: typeof defaultExecSync;
}
/**
* Execute archive-sessions effect — archive stale checkpoints, deleted/reset files, and compress.
*/
export async function executeArchiveSessions(
effect: ArchiveSessionsEffect,
deps?: ArchiveSessionsDeps,
): Promise<void> {
const fs = deps?.fs ?? defaultFs;
const execSync = deps?.execSyncFn ?? defaultExecSync;
const sessionsDir =
(effect.sessionsDir as string) ||
path.join(os.homedir(), '.openclaw/agents/main/sessions');
const archiveDir = path.join(sessionsDir, 'archive');
try {
if (!fs.existsSync(sessionsDir)) {
console.log(`[survival] sessions dir not found: ${sessionsDir}`);
return;
}
// 1. Ensure archive directory exists
fs.mkdirSync(archiveDir, { recursive: true });
// 2. Find current active sessions (most recently modified .lock files)
const allFiles = fs.readdirSync(sessionsDir) as string[];
const lockFiles = allFiles.filter((f) => f.endsWith('.jsonl.lock'));
const activeSessions = new Set(
lockFiles.map((f) => f.replace('.jsonl.lock', '')),
);
// 3. Archive checkpoint files (non-active session's)
const checkpoints = allFiles.filter(
(f) => f.includes('.checkpoint.') && f.endsWith('.jsonl'),
);
let archivedCheckpoints = 0;
for (const cp of checkpoints) {
const sessionId = cp.split('.checkpoint.')[0];
if (!activeSessions.has(sessionId)) {
fs.renameSync(path.join(sessionsDir, cp), path.join(archiveDir, cp));
archivedCheckpoints++;
}
}
// 4. Archive .deleted and .reset files
const staleFiles = allFiles.filter(
(f) => f.includes('.deleted.') || f.includes('.reset.'),
);
for (const sf of staleFiles) {
fs.renameSync(path.join(sessionsDir, sf), path.join(archiveDir, sf));
}
// 5. Compress archive (if there are .jsonl files)
const archiveFiles = (fs.readdirSync(archiveDir) as string[]).filter((f) =>
f.endsWith('.jsonl'),
);
if (archiveFiles.length > 0) {
const timestamp = new Date().toISOString().replace(/[:.]/g, '-');
execSync(
`cd ${archiveDir} && tar czf sessions-${timestamp}.tar.gz *.jsonl && rm -f *.jsonl`,
{ timeout: 30000 },
);
}
console.log(
`[survival] Archived ${archivedCheckpoints} checkpoints + ${staleFiles.length} stale files`,
);
} catch (err) {
console.error('[survival] Failed to archive sessions:', err);
}
}
@@ -0,0 +1,5 @@
export {
type ArchiveSessionsDeps,
type ArchiveSessionsEffect,
executeArchiveSessions,
} from './archive-sessions.js';
+14
View File
@@ -0,0 +1,14 @@
/**
* @uncaged/pulse-openclaw — OpenClaw adapter for Pulse
*
* Provides:
* - Watchers: gateway-health, llm-health
* - Rules: gatewayHealthGuard
* - Executors: archive-sessions
* - Constants: OC-specific essential processes
*/
export { OC_ESSENTIAL_PROCESSES } from './constants.js';
export * from './executors/index.js';
export * from './rules/index.js';
export * from './watchers/index.js';
@@ -0,0 +1,122 @@
/**
* Gateway health guard rule tests
*
* Extracted from core @uncaged/pulse survival.test.ts
*/
import { beforeEach, describe, expect, jest, test } from 'bun:test';
import { gatewayHealthGuard } from './gateway-health-guard.js';
const mockInner = jest.fn(async () => [[], 15000]);
describe('gatewayHealthGuard', () => {
beforeEach(() => {
mockInner.mockClear();
});
test('should archive-sessions when zombies detected', async () => {
const curr = {
gatewayHealth: {
data: {
memoryMb: 800,
totalSessions: 10,
staleCheckpoints: 5,
sessionsDirMb: 50,
zombieSessions: ['z1', 'z2'],
},
},
};
mockInner.mockResolvedValueOnce([[], 15000]);
const [effects, tickMs] = await gatewayHealthGuard({}, curr, mockInner);
expect(effects).toContainEqual({ type: 'archive-sessions' });
expect(tickMs).toBe(15000);
expect(mockInner).toHaveBeenCalledWith({}, curr);
});
test('should archive when staleCheckpoints > 10', async () => {
const curr = {
gatewayHealth: {
data: {
memoryMb: 500,
totalSessions: 10,
staleCheckpoints: 15,
sessionsDirMb: 50,
zombieSessions: [],
},
},
};
mockInner.mockResolvedValueOnce([[], 15000]);
const [effects] = await gatewayHealthGuard({}, curr, mockInner);
expect(effects).toContainEqual({ type: 'archive-sessions' });
expect(mockInner).toHaveBeenCalled();
});
test('should archive when sessionsDirMb > 100', async () => {
const curr = {
gatewayHealth: {
data: {
memoryMb: 500,
totalSessions: 10,
staleCheckpoints: 3,
sessionsDirMb: 200,
zombieSessions: [],
},
},
};
mockInner.mockResolvedValueOnce([[], 15000]);
const [effects] = await gatewayHealthGuard({}, curr, mockInner);
expect(effects).toContainEqual({ type: 'archive-sessions' });
});
test('should bypass inner when memory > 2000 MB (crisis)', async () => {
const curr = {
gatewayHealth: {
data: {
memoryMb: 2200,
totalSessions: 20,
staleCheckpoints: 30,
sessionsDirMb: 300,
zombieSessions: ['z1'],
},
},
};
const [effects, tickMs] = await gatewayHealthGuard({}, curr, mockInner);
expect(effects).toEqual(
expect.arrayContaining([
{ type: 'archive-sessions' },
{ type: 'restart-service', service: 'openclaw' },
]),
);
expect(tickMs).toBe(30000);
expect(mockInner).not.toHaveBeenCalled();
});
test('should pass through when gateway is healthy', async () => {
const curr = {
gatewayHealth: {
data: {
memoryMb: 500,
totalSessions: 5,
staleCheckpoints: 3,
sessionsDirMb: 50,
zombieSessions: [],
},
},
};
await gatewayHealthGuard({}, curr, mockInner);
expect(mockInner).toHaveBeenCalledWith({}, curr);
});
test('should pass through when no gateway data', async () => {
const curr = {};
await gatewayHealthGuard({}, curr, mockInner);
expect(mockInner).toHaveBeenCalledWith({}, curr);
});
});
@@ -0,0 +1,60 @@
/**
* Gateway health guard rule — OpenClaw specific
*
* Handles Gateway memory bloat and session accumulation.
* Moved from core @uncaged/pulse to @uncaged/pulse-openclaw adapter.
*/
import type { Rule, Sensed } from '@uncaged/pulse';
import type { GatewayHealthData } from '../watchers/gateway-health.js';
export interface GatewayHealthSnapshot {
gatewayHealth?: Sensed<GatewayHealthData>;
}
export interface GatewayHealthEffect {
type: string;
[key: string]: unknown;
}
/**
* Gateway health guard - handle Gateway memory bloat and session accumulation
*/
export const gatewayHealthGuard: Rule<
GatewayHealthSnapshot,
GatewayHealthEffect
> = async (prev, curr, inner) => {
const gateway = curr.gatewayHealth?.data;
if (!gateway) {
return await inner(prev, curr);
}
const effects: GatewayHealthEffect[] = [];
// Check for session problems
if (
gateway.zombieSessions?.length > 0 ||
gateway.staleCheckpoints > 10 ||
gateway.sessionsDirMb > 100
) {
effects.push({ type: 'archive-sessions' });
}
// Gateway process memory > 2 GB: archive + restart, bypass inner
if (gateway.memoryMb > 2000) {
effects.push(
{ type: 'archive-sessions' },
{ type: 'restart-service', service: 'openclaw' },
);
return [effects, 30000]; // bypass during memory crisis
}
if (effects.length > 0) {
// Session problems but not critical — don't bypass
const [innerEffects, tickMs] = await inner(prev, curr);
return [effects.concat(innerEffects), tickMs];
}
// Pass through to inner layers
return await inner(prev, curr);
};
@@ -0,0 +1,5 @@
export {
type GatewayHealthEffect,
type GatewayHealthSnapshot,
gatewayHealthGuard,
} from './gateway-health-guard.js';
@@ -1,5 +1,5 @@
import { beforeEach, describe, expect, it, mock } from 'bun:test';
import type { VitalWithData } from '../watcher.js';
import type { VitalWithData } from '@uncaged/pulse';
import type { GatewayHealthData } from './gateway-health.js';
import { gatewayHealthWatcher } from './gateway-health.js';
@@ -1,7 +1,7 @@
import { execSync } from 'node:child_process';
import * as fs from 'node:fs';
import * as path from 'node:path';
import type { WatcherDef } from '../watcher.js';
import type { WatcherDef } from '@uncaged/pulse';
export interface GatewayHealthData {
memoryMb: number; // Gateway 进程 RSS(通过 ps 读取),0 if not found
@@ -0,0 +1,13 @@
export {
collectGatewayMemory,
collectSessionStats,
type GatewayHealthConfig,
type GatewayHealthData,
type GatewayHealthOptions,
gatewayHealthWatcher,
} from './gateway-health.js';
export {
type LlmHealthData,
type LlmHealthOptions,
llmHealthWatcher,
} from './llm-health.js';
@@ -1,5 +1,5 @@
import { beforeEach, describe, expect, it, mock } from 'bun:test';
import type { VitalWithData } from '../watcher.js';
import type { VitalWithData } from '@uncaged/pulse';
import type { LlmHealthData } from './llm-health.js';
import { llmHealthWatcher } from './llm-health.js';
@@ -1,4 +1,4 @@
import type { WatcherDef } from '../watcher.js';
import type { WatcherDef } from '@uncaged/pulse';
export interface LlmHealthData {
processOk: boolean; // 轻量探针:LiteLLM 进程响应
+17
View File
@@ -0,0 +1,17 @@
{
"compilerOptions": {
"target": "ES2022",
"module": "ES2022",
"moduleResolution": "bundler",
"declaration": true,
"outDir": "dist",
"rootDir": "src",
"strict": true,
"esModuleInterop": true,
"skipLibCheck": true,
"forceConsistentCasingInFileNames": true,
"types": ["bun-types"]
},
"include": ["src"],
"exclude": ["src/**/*.test.ts"]
}
+6 -10
View File
@@ -76,9 +76,7 @@ describe('E2E survival chain', () => {
test('scenario 1: process crash → processWatchdog → restart effect', async () => {
const processData = {
processes: {
'openclaw-service': false, // dead essential process
'litellm-proxy': true,
'upulse-daemon': true,
'upulse-daemon': false, // dead essential process
sshd: true,
systemd: true,
},
@@ -93,10 +91,10 @@ describe('E2E survival chain', () => {
>([processWatchdog]);
const [effects, tickMs] = await pulse(prev, curr);
// Must produce restart-service for openclaw
// Must produce restart-service for upulse
expect(
effects.some(
(e) => e.type === 'restart-service' && e.service === 'openclaw',
(e) => e.type === 'restart-service' && e.service === 'upulse',
),
).toBe(true);
// Fast retry interval
@@ -259,13 +257,13 @@ describe('E2E survival chain', () => {
store.appendEvent({
occurredAt: now - (MAX_RESTART_COUNT - i) * 10_000,
kind: 'effect',
meta: JSON.stringify({ type: 'restart-service', service: 'openclaw' }),
meta: JSON.stringify({ type: 'restart-service', service: 'upulse' }),
});
}
const processData = {
processes: {
'openclaw-service': false, // still dead after restarts
'upulse-daemon': false, // still dead after restarts
},
};
@@ -290,7 +288,7 @@ describe('E2E survival chain', () => {
// Should NOT emit another restart
expect(
effects.some(
(e) => e.type === 'restart-service' && e.service === 'openclaw',
(e) => e.type === 'restart-service' && e.service === 'upulse',
),
).toBe(false);
});
@@ -400,8 +398,6 @@ describe('E2E survival chain', () => {
const systemData = { diskPct: 40, memoryPct: 60, cpuPct: 20, swapPct: 0 };
const processData = {
processes: {
'openclaw-gateway': true,
'litellm-proxy': true,
'upulse-daemon': true,
sshd: true,
systemd: true,
+5 -169
View File
@@ -7,28 +7,10 @@ import * as os from 'node:os';
import * as path from 'node:path';
import { executeSurvivalEffect, type SurvivalExecDeps } from './survival.js';
// All mocks via dependency injection — NO global jest.mock to avoid cross-file pollution
const mockExecSync = jest.fn();
const mockExecFileSync = jest.fn();
const mockFetch = jest.fn();
const mockExistsSync = jest.fn(() => true);
const mockMkdirSync = jest.fn();
const mockReaddirSync = jest.fn(() => [] as string[]);
const mockStatSync = jest.fn();
const mockRenameSync = jest.fn();
const mockFs = {
existsSync: mockExistsSync,
mkdirSync: mockMkdirSync,
readdirSync: mockReaddirSync,
statSync: mockStatSync,
renameSync: mockRenameSync,
} as unknown as SurvivalExecDeps['fs'];
const testDeps: SurvivalExecDeps = {
fs: mockFs,
execSyncFn: mockExecSync,
execFileSyncFn: mockExecFileSync,
};
@@ -38,24 +20,22 @@ global.fetch = mockFetch;
describe('executeSurvivalEffect', () => {
beforeEach(() => {
jest.clearAllMocks();
mockExistsSync.mockReturnValue(true);
mockReaddirSync.mockReturnValue([]);
});
test('should restart service using execFileSync', async () => {
const effect = { type: 'restart-service', service: 'openclaw' };
const effect = { type: 'restart-service', service: 'upulse' };
await executeSurvivalEffect(effect, testDeps);
expect(mockExecFileSync).toHaveBeenCalledWith(
'systemctl',
['restart', 'openclaw'],
['restart', 'upulse'],
{ timeout: 30000 },
);
});
test('should reject unsafe service name', async () => {
const effect = { type: 'restart-service', service: 'openclaw; rm -rf /' };
const effect = { type: 'restart-service', service: 'svc; rm -rf /' };
const consoleSpy = jest.spyOn(console, 'error').mockImplementation();
await executeSurvivalEffect(effect, testDeps);
@@ -69,7 +49,7 @@ describe('executeSurvivalEffect', () => {
});
test('should handle restart service failure gracefully', async () => {
const effect = { type: 'restart-service', service: 'openclaw' };
const effect = { type: 'restart-service', service: 'upulse' };
const consoleSpy = jest.spyOn(console, 'error').mockImplementation();
mockExecFileSync.mockImplementationOnce(() => {
@@ -79,155 +59,13 @@ describe('executeSurvivalEffect', () => {
await executeSurvivalEffect(effect, testDeps);
expect(consoleSpy).toHaveBeenCalledWith(
'[survival] Failed to restart openclaw:',
'[survival] Failed to restart upulse:',
expect.any(Error),
);
consoleSpy.mockRestore();
});
// ── archive-sessions ──────────────────────────────────────
test('should archive checkpoint and stale files', async () => {
const effect = {
type: 'archive-sessions',
sessionsDir: '/test/sessions',
};
// First readdirSync → sessions dir
mockReaddirSync.mockReturnValueOnce([
'sess-active.jsonl',
'sess-active.jsonl.lock',
'sess-old.checkpoint.1.jsonl',
'sess-old.checkpoint.2.jsonl',
'sess-gone.deleted.1.jsonl',
'sess-gone.reset.1.jsonl',
]);
// Second readdirSync → archive dir (for compression check)
mockReaddirSync.mockReturnValueOnce([]);
const consoleSpy = jest.spyOn(console, 'log').mockImplementation();
await executeSurvivalEffect(effect, testDeps);
// Should create archive dir
expect(mockMkdirSync).toHaveBeenCalledWith('/test/sessions/archive', {
recursive: true,
});
// Should move 2 stale checkpoints (sess-old is not active)
expect(mockRenameSync).toHaveBeenCalledWith(
'/test/sessions/sess-old.checkpoint.1.jsonl',
'/test/sessions/archive/sess-old.checkpoint.1.jsonl',
);
expect(mockRenameSync).toHaveBeenCalledWith(
'/test/sessions/sess-old.checkpoint.2.jsonl',
'/test/sessions/archive/sess-old.checkpoint.2.jsonl',
);
// Should move deleted and reset files
expect(mockRenameSync).toHaveBeenCalledWith(
'/test/sessions/sess-gone.deleted.1.jsonl',
'/test/sessions/archive/sess-gone.deleted.1.jsonl',
);
expect(mockRenameSync).toHaveBeenCalledWith(
'/test/sessions/sess-gone.reset.1.jsonl',
'/test/sessions/archive/sess-gone.reset.1.jsonl',
);
expect(consoleSpy).toHaveBeenCalledWith(
'[survival] Archived 2 checkpoints + 2 stale files',
);
consoleSpy.mockRestore();
});
test('should NOT archive active session checkpoints', async () => {
const effect = {
type: 'archive-sessions',
sessionsDir: '/test/sessions',
};
mockReaddirSync.mockReturnValueOnce([
'sess-active.jsonl',
'sess-active.jsonl.lock',
'sess-active.checkpoint.1.jsonl', // belongs to active → skip
]);
mockReaddirSync.mockReturnValueOnce([]); // archive dir
const consoleSpy = jest.spyOn(console, 'log').mockImplementation();
await executeSurvivalEffect(effect, testDeps);
// renameSync should NOT be called for active session's checkpoint
expect(mockRenameSync).not.toHaveBeenCalled();
consoleSpy.mockRestore();
});
test('should compress jsonl files already in archive', async () => {
const effect = {
type: 'archive-sessions',
sessionsDir: '/test/sessions',
};
// sessions dir: nothing to move
mockReaddirSync.mockReturnValueOnce([]);
// archive dir: has jsonl files → compress
mockReaddirSync.mockReturnValueOnce(['old-checkpoint.jsonl', 'old2.jsonl']);
const consoleSpy = jest.spyOn(console, 'log').mockImplementation();
await executeSurvivalEffect(effect, testDeps);
// Should run tar + rm via execSync (archive compression uses shell)
expect(mockExecSync).toHaveBeenCalledWith(
expect.stringMatching(
/^cd \/test\/sessions\/archive && tar czf sessions-.*\.tar\.gz \*\.jsonl && rm -f \*\.jsonl$/,
),
{ timeout: 30000 },
);
consoleSpy.mockRestore();
});
test('should handle missing sessions dir gracefully', async () => {
const effect = {
type: 'archive-sessions',
sessionsDir: '/missing',
};
mockExistsSync.mockReturnValue(false);
const consoleSpy = jest.spyOn(console, 'log').mockImplementation();
await executeSurvivalEffect(effect, testDeps);
expect(consoleSpy).toHaveBeenCalledWith(
'[survival] sessions dir not found: /missing',
);
consoleSpy.mockRestore();
});
test('should handle archive-sessions errors gracefully', async () => {
const effect = {
type: 'archive-sessions',
sessionsDir: '/test/sessions',
};
mockReaddirSync.mockImplementationOnce(() => {
throw new Error('Permission denied');
});
const consoleSpy = jest.spyOn(console, 'error').mockImplementation();
await executeSurvivalEffect(effect, testDeps);
expect(consoleSpy).toHaveBeenCalledWith(
'[survival] Failed to archive sessions:',
expect.any(Error),
);
consoleSpy.mockRestore();
});
// ── other effects ─────────────────────────────────────────
test('should handle gc-vitals as no-op', async () => {
@@ -235,7 +73,6 @@ describe('executeSurvivalEffect', () => {
await executeSurvivalEffect(effect, testDeps);
expect(mockExecSync).not.toHaveBeenCalled();
expect(mockExecFileSync).not.toHaveBeenCalled();
});
@@ -354,7 +191,6 @@ describe('executeSurvivalEffect', () => {
await expect(
executeSurvivalEffect(effect, testDeps),
).resolves.toBeUndefined();
expect(mockExecSync).not.toHaveBeenCalled();
expect(mockExecFileSync).not.toHaveBeenCalled();
expect(mockFetch).not.toHaveBeenCalled();
});
+4 -73
View File
@@ -6,9 +6,9 @@
import {
execFileSync as defaultExecFileSync,
execSync as defaultExecSync,
type execSync,
} from 'node:child_process';
import * as defaultFs from 'node:fs';
import type * as fs from 'node:fs';
import * as os from 'node:os';
import * as path from 'node:path';
@@ -22,8 +22,8 @@ export interface SurvivalEffect {
/** Dependencies that can be injected for testing */
export interface SurvivalExecDeps {
fs?: typeof defaultFs;
execSyncFn?: typeof defaultExecSync;
fs?: typeof fs;
execSyncFn?: typeof execSync;
execFileSyncFn?: typeof defaultExecFileSync;
}
@@ -34,8 +34,6 @@ export async function executeSurvivalEffect(
effect: SurvivalEffect,
deps?: SurvivalExecDeps,
): Promise<void> {
const fs = deps?.fs ?? defaultFs;
const execSync = deps?.execSyncFn ?? defaultExecSync;
const execFileSync = deps?.execFileSyncFn ?? defaultExecFileSync;
switch (effect.type) {
@@ -53,73 +51,6 @@ export async function executeSurvivalEffect(
break;
}
case 'archive-sessions': {
const sessionsDir =
(effect.sessionsDir as string) ||
path.join(os.homedir(), '.openclaw/agents/main/sessions');
const archiveDir = path.join(sessionsDir, 'archive');
try {
if (!fs.existsSync(sessionsDir)) {
console.log(`[survival] sessions dir not found: ${sessionsDir}`);
break;
}
// 1. Ensure archive directory exists
fs.mkdirSync(archiveDir, { recursive: true });
// 2. Find current active sessions (most recently modified .lock files)
const allFiles = fs.readdirSync(sessionsDir) as string[];
const lockFiles = allFiles.filter((f) => f.endsWith('.jsonl.lock'));
const activeSessions = new Set(
lockFiles.map((f) => f.replace('.jsonl.lock', '')),
);
// 3. Archive checkpoint files (non-active session's)
const checkpoints = allFiles.filter(
(f) => f.includes('.checkpoint.') && f.endsWith('.jsonl'),
);
let archivedCheckpoints = 0;
for (const cp of checkpoints) {
const sessionId = cp.split('.checkpoint.')[0];
if (!activeSessions.has(sessionId)) {
fs.renameSync(
path.join(sessionsDir, cp),
path.join(archiveDir, cp),
);
archivedCheckpoints++;
}
}
// 4. Archive .deleted and .reset files
const staleFiles = allFiles.filter(
(f) => f.includes('.deleted.') || f.includes('.reset.'),
);
for (const sf of staleFiles) {
fs.renameSync(path.join(sessionsDir, sf), path.join(archiveDir, sf));
}
// 5. Compress archive (if there are .jsonl files)
const archiveFiles = (fs.readdirSync(archiveDir) as string[]).filter(
(f) => f.endsWith('.jsonl'),
);
if (archiveFiles.length > 0) {
const timestamp = new Date().toISOString().replace(/[:.]/g, '-');
execSync(
`cd ${archiveDir} && tar czf sessions-${timestamp}.tar.gz *.jsonl && rm -f *.jsonl`,
{ timeout: 30000 },
);
}
console.log(
`[survival] Archived ${archivedCheckpoints} checkpoints + ${staleFiles.length} stale files`,
);
} catch (err) {
console.error('[survival] Failed to archive sessions:', err);
}
break;
}
case 'gc-vitals': {
// Handled at runPulse level with store access
// This effect is processed in runPulse layer, not here
-2
View File
@@ -6,8 +6,6 @@
/** Critical process whitelist — hardcoded, agent cannot change */
export const ESSENTIAL_PROCESSES = new Set([
'openclaw', // OC Gateway
'litellm', // LLM proxy
'upulse', // Pulse itself
'sshd', // SSH rescue channel
'systemd', // System
+4 -116
View File
@@ -7,7 +7,6 @@ import { MAX_RESTART_COUNT, ROLLBACK_ERROR_THRESHOLD } from './constants.js';
import {
autoRollback,
errorEscalate,
gatewayHealthGuard,
llmWatchdog,
networkWatchdog,
panicRollback,
@@ -114,9 +113,8 @@ describe('Survival Rules', () => {
const curr = {
processes: {
data: {
// Record<string, boolean>: name → alive
processes: {
'openclaw-service': false,
'upulse-daemon': false,
'other-service': true,
},
},
@@ -130,7 +128,7 @@ describe('Survival Rules', () => {
expect(effects).toContainEqual({
type: 'restart-service',
service: 'openclaw',
service: 'upulse',
});
expect(tickMs).toBe(10000);
expect(mockInner).not.toHaveBeenCalled(); // bypassed due to dead process
@@ -141,13 +139,13 @@ describe('Survival Rules', () => {
processes: {
data: {
processes: {
'openclaw-service': false,
'upulse-daemon': false,
},
},
},
health: {
lastRestart: {
openclaw: { count: MAX_RESTART_COUNT, ts: Date.now() },
upulse: { count: MAX_RESTART_COUNT, ts: Date.now() },
},
},
};
@@ -166,8 +164,6 @@ describe('Survival Rules', () => {
processes: {
data: {
processes: {
'openclaw-service': true,
'litellm-service': true,
'upulse-daemon': true,
sshd: true,
systemd: true,
@@ -404,112 +400,4 @@ describe('Survival Rules', () => {
expect(mockInner).toHaveBeenCalledWith({}, curr);
});
});
describe('gatewayHealthGuard', () => {
test('should archive-sessions when zombies detected', async () => {
const curr = {
gatewayHealth: {
data: {
memoryMb: 800,
totalSessions: 10,
staleCheckpoints: 5,
sessionsDirMb: 50,
zombieSessions: ['z1', 'z2'],
},
},
};
mockInner.mockResolvedValueOnce([[], 15000]);
const [effects, tickMs] = await gatewayHealthGuard({}, curr, mockInner);
expect(effects).toContainEqual({ type: 'archive-sessions' });
expect(tickMs).toBe(15000);
expect(mockInner).toHaveBeenCalledWith({}, curr);
});
test('should archive when staleCheckpoints > 10', async () => {
const curr = {
gatewayHealth: {
data: {
memoryMb: 500,
totalSessions: 10,
staleCheckpoints: 15,
sessionsDirMb: 50,
zombieSessions: [],
},
},
};
mockInner.mockResolvedValueOnce([[], 15000]);
const [effects] = await gatewayHealthGuard({}, curr, mockInner);
expect(effects).toContainEqual({ type: 'archive-sessions' });
expect(mockInner).toHaveBeenCalled();
});
test('should archive when sessionsDirMb > 100', async () => {
const curr = {
gatewayHealth: {
data: {
memoryMb: 500,
totalSessions: 10,
staleCheckpoints: 3,
sessionsDirMb: 200,
zombieSessions: [],
},
},
};
mockInner.mockResolvedValueOnce([[], 15000]);
const [effects] = await gatewayHealthGuard({}, curr, mockInner);
expect(effects).toContainEqual({ type: 'archive-sessions' });
});
test('should bypass inner when memory > 2000 MB (crisis)', async () => {
const curr = {
gatewayHealth: {
data: {
memoryMb: 2200,
totalSessions: 20,
staleCheckpoints: 30,
sessionsDirMb: 300,
zombieSessions: ['z1'],
},
},
};
const [effects, tickMs] = await gatewayHealthGuard({}, curr, mockInner);
expect(effects).toEqual(
expect.arrayContaining([
{ type: 'archive-sessions' },
{ type: 'restart-service', service: 'openclaw' },
]),
);
expect(tickMs).toBe(30000);
expect(mockInner).not.toHaveBeenCalled();
});
test('should pass through when gateway is healthy', async () => {
const curr = {
gatewayHealth: {
data: {
memoryMb: 500,
totalSessions: 5,
staleCheckpoints: 3,
sessionsDirMb: 50,
zombieSessions: [],
},
},
};
await gatewayHealthGuard({}, curr, mockInner);
expect(mockInner).toHaveBeenCalledWith({}, curr);
});
test('should pass through when no gateway data', async () => {
const curr = {};
await gatewayHealthGuard({}, curr, mockInner);
expect(mockInner).toHaveBeenCalledWith({}, curr);
});
});
});
+7 -47
View File
@@ -8,8 +8,6 @@
import type { SurvivalEffect } from '../executors/survival.js';
import type { Rule, Sensed } from '../index.js';
import type { ErrorLogData } from '../watchers/error-log.js';
import type { GatewayHealthData } from '../watchers/gateway-health.js';
import type { LlmHealthData } from '../watchers/llm-health.js';
import type { NetworkData } from '../watchers/network.js';
import type { ProcessAliveData } from '../watchers/process-alive.js';
import type { SystemResourceData } from '../watchers/system-resource.js';
@@ -21,6 +19,12 @@ import {
} from './constants.js';
import type { HealthSnapshot } from './health.js';
/** Minimal LLM health shape used by llmWatchdog (full type lives in @uncaged/pulse-openclaw) */
interface LlmHealthLike {
processOk: boolean;
completionOk?: boolean;
}
// Define proper type for survival snapshot
export interface SurvivalSnapshot {
timestamp: number;
@@ -28,8 +32,7 @@ export interface SurvivalSnapshot {
processes?: Sensed<ProcessAliveData>;
network?: Sensed<NetworkData>;
errorLog?: Sensed<ErrorLogData>;
llm?: Sensed<LlmHealthData>;
gatewayHealth?: Sensed<GatewayHealthData>;
llm?: Sensed<LlmHealthLike>;
health?: HealthSnapshot;
}
@@ -189,48 +192,6 @@ export const resourceGuard: Rule<SurvivalSnapshot, SurvivalEffect> = async (
return await inner(prev, curr);
};
/**
* Gateway health guard - handle Gateway memory bloat and session accumulation
*/
export const gatewayHealthGuard: Rule<
SurvivalSnapshot,
SurvivalEffect
> = async (prev, curr, inner) => {
const gateway = curr.gatewayHealth?.data;
if (!gateway) {
return await inner(prev, curr);
}
const effects: SurvivalEffect[] = [];
// Check for session problems
if (
gateway.zombieSessions?.length > 0 ||
gateway.staleCheckpoints > 10 ||
gateway.sessionsDirMb > 100
) {
effects.push({ type: 'archive-sessions' });
}
// Gateway process memory > 2 GB: archive + restart, bypass inner
if (gateway.memoryMb > 2000) {
effects.push(
{ type: 'archive-sessions' },
{ type: 'restart-service', service: 'openclaw' },
);
return [effects, 30000]; // bypass during memory crisis
}
if (effects.length > 0) {
// Session problems but not critical — don't bypass
const [innerEffects, tickMs] = await inner(prev, curr);
return [effects.concat(innerEffects), tickMs];
}
// Pass through to inner layers
return await inner(prev, curr);
};
/**
* LLM watchdog - monitor LLM service health
*/
@@ -331,7 +292,6 @@ export const survivalRules = [
autoRollback,
processWatchdog,
resourceGuard,
gatewayHealthGuard,
llmWatchdog,
networkWatchdog,
errorEscalate,
-10
View File
@@ -3,16 +3,6 @@ export {
type ErrorLogOptions,
errorLogWatcher,
} from './error-log.js';
export {
type GatewayHealthData,
type GatewayHealthOptions,
gatewayHealthWatcher,
} from './gateway-health.js';
export {
type LlmHealthData,
type LlmHealthOptions,
llmHealthWatcher,
} from './llm-health.js';
export {
type NetworkData,
type NetworkOptions,