diff --git a/packages/pulse/src/index.ts b/packages/pulse/src/index.ts index 10f074b..c346853 100644 --- a/packages/pulse/src/index.ts +++ b/packages/pulse/src/index.ts @@ -100,7 +100,13 @@ export function findEffectiveEpoch(store: PulseStore): EventRecord | null { const rollback = store.getLatest('rollback'); if (rollback) { // rollback.meta should contain { to: 'v1' } — the code_rev to roll back to - const meta = rollback.meta ? JSON.parse(rollback.meta) : {}; + let meta: Record = {}; + try { + meta = rollback.meta ? JSON.parse(rollback.meta) : {}; + } catch { + // Corrupted meta — skip this rollback event, fall through to latest promote + return store.getLatest('promote'); + } const targetRev = meta.to || rollback.codeRev; if (targetRev) { return store.getLatestWhere({ kind: 'promote', codeRev: targetRev }); @@ -122,6 +128,7 @@ export function rebuildSnapshot( epoch?: EventRecord | null, ): S { const snapshot: Record = { timestamp: Date.now() }; + const casMisses: string[] = []; for (const key of senseKeys) { // Priority 1: read latest from vitals table const latestVital = store.getLatestVital(key); @@ -134,6 +141,8 @@ export function rebuildSnapshot( } as Sensed; continue; } + // CAS object missing — track it + casMisses.push(key); } // Priority 2: fallback to events table (migrate/init events) @@ -208,6 +217,10 @@ export function rebuildSnapshot( } } } + // Surface CAS misses so rules can detect data integrity issues + if (casMisses.length > 0) { + snapshot['_error:cas_miss'] = { keys: casMisses, count: casMisses.length }; + } return snapshot as S; } diff --git a/packages/pulse/src/store.test.ts b/packages/pulse/src/store.test.ts index b729e24..e199304 100644 --- a/packages/pulse/src/store.test.ts +++ b/packages/pulse/src/store.test.ts @@ -310,7 +310,7 @@ describe('createStore (events table + CAS)', () => { const hash = store.putObject(data); expect(typeof hash === 'string').toBeTruthy(); - expect(hash.length).toBe(16); + expect(hash.length).toBe(32); // File should exist on disk const filePath = join(objectsDir, `${hash}.json`); diff --git a/packages/pulse/src/store.ts b/packages/pulse/src/store.ts index 37686db..e0cae68 100644 --- a/packages/pulse/src/store.ts +++ b/packages/pulse/src/store.ts @@ -128,7 +128,7 @@ function hashObject(data: unknown): string { return createHash('sha256') .update(JSON.stringify(data)) .digest('hex') - .slice(0, 16); // 16 hex chars = 64 bits, sufficient for dedup + .slice(0, 32); // 32 hex chars = 128 bits, safe against birthday collisions } // ── Schema ───────────────────────────────────────────────────── diff --git a/packages/pulse/src/survival/executors.test.ts b/packages/pulse/src/survival/executors.test.ts index fffcb44..2c4ae43 100644 --- a/packages/pulse/src/survival/executors.test.ts +++ b/packages/pulse/src/survival/executors.test.ts @@ -3,10 +3,13 @@ */ import { beforeEach, describe, expect, jest, test } from 'bun:test'; +import * as os from 'node:os'; +import * as path from 'node:path'; import { executeSurvivalEffect, type SurvivalExecDeps } from './executors'; // All mocks via dependency injection — NO global jest.mock to avoid cross-file pollution const mockExecSync = jest.fn(); +const mockExecFileSync = jest.fn(); const mockFetch = jest.fn(); const mockExistsSync = jest.fn(() => true); @@ -26,6 +29,7 @@ const mockFs = { const testDeps: SurvivalExecDeps = { fs: mockFs, execSyncFn: mockExecSync, + execFileSyncFn: mockExecFileSync, }; // Mock global fetch @@ -38,21 +42,37 @@ describe('executeSurvivalEffect', () => { mockReaddirSync.mockReturnValue([]); }); - test('should restart service', async () => { + test('should restart service using execFileSync', async () => { const effect = { type: 'restart-service', service: 'openclaw' }; await executeSurvivalEffect(effect, testDeps); - expect(mockExecSync).toHaveBeenCalledWith('systemctl restart openclaw', { - timeout: 30000, - }); + expect(mockExecFileSync).toHaveBeenCalledWith( + 'systemctl', + ['restart', 'openclaw'], + { timeout: 30000 }, + ); + }); + + test('should reject unsafe service name', async () => { + const effect = { type: 'restart-service', service: 'openclaw; rm -rf /' }; + const consoleSpy = jest.spyOn(console, 'error').mockImplementation(); + + await executeSurvivalEffect(effect, testDeps); + + expect(mockExecFileSync).not.toHaveBeenCalled(); + expect(consoleSpy).toHaveBeenCalledWith( + expect.stringContaining('Rejected unsafe service name'), + ); + + consoleSpy.mockRestore(); }); test('should handle restart service failure gracefully', async () => { const effect = { type: 'restart-service', service: 'openclaw' }; const consoleSpy = jest.spyOn(console, 'error').mockImplementation(); - mockExecSync.mockImplementationOnce(() => { + mockExecFileSync.mockImplementationOnce(() => { throw new Error('Service not found'); }); @@ -159,7 +179,7 @@ describe('executeSurvivalEffect', () => { await executeSurvivalEffect(effect, testDeps); - // Should run tar + rm + // Should run tar + rm via execSync (archive compression uses shell) expect(mockExecSync).toHaveBeenCalledWith( expect.stringMatching( /^cd \/test\/sessions\/archive && tar czf sessions-.*\.tar\.gz \*\.jsonl && rm -f \*\.jsonl$/, @@ -208,7 +228,7 @@ describe('executeSurvivalEffect', () => { consoleSpy.mockRestore(); }); - // ── other effects (unchanged from original) ─────────────── + // ── other effects ───────────────────────────────────────── test('should handle gc-vitals as no-op', async () => { const effect = { type: 'gc-vitals' }; @@ -216,55 +236,81 @@ describe('executeSurvivalEffect', () => { await executeSurvivalEffect(effect, testDeps); expect(mockExecSync).not.toHaveBeenCalled(); + expect(mockExecFileSync).not.toHaveBeenCalled(); }); - test('should clear journal cache', async () => { + test('should clear journal cache using execFileSync', async () => { const effect = { type: 'clear-cache', target: 'journal' }; await executeSurvivalEffect(effect, testDeps); - expect(mockExecSync).toHaveBeenCalledWith('journalctl --vacuum-size=100M', { - timeout: 30000, - }); + expect(mockExecFileSync).toHaveBeenCalledWith( + 'journalctl', + ['--vacuum-size=100M'], + { timeout: 30000 }, + ); }); - test('should clear tmp cache', async () => { + test('should clear tmp cache using execFileSync', async () => { const effect = { type: 'clear-cache', target: 'tmp' }; await executeSurvivalEffect(effect, testDeps); - expect(mockExecSync).toHaveBeenCalledWith( - 'find /tmp -type f -mtime +1 -delete 2>/dev/null || true', - { timeout: 30000 }, + expect(mockExecFileSync).toHaveBeenCalledWith( + 'find', + ['/tmp', '-type', 'f', '-mtime', '+1', '-delete'], + { timeout: 30000, stdio: 'ignore' }, ); }); - test('should rollback code', async () => { + test('should rollback code using execFileSync', async () => { const effect = { type: 'rollback-code', to: 'abc123' }; await executeSurvivalEffect(effect, testDeps); - expect(mockExecSync).toHaveBeenCalledWith('upulse rollback abc123', { - timeout: 30000, - }); + expect(mockExecFileSync).toHaveBeenCalledWith( + 'git', + ['checkout', 'abc123'], + { + cwd: path.join(os.homedir(), '.upulse', 'engine'), + timeout: 30000, + }, + ); }); - test('should rollback config for all services', async () => { + test('should reject unsafe git ref', async () => { + const effect = { type: 'rollback-code', to: 'abc; rm -rf /' }; + const consoleSpy = jest.spyOn(console, 'error').mockImplementation(); + + await executeSurvivalEffect(effect, testDeps); + + expect(mockExecFileSync).not.toHaveBeenCalled(); + expect(consoleSpy).toHaveBeenCalledWith( + expect.stringContaining('Rejected unsafe git ref'), + ); + + consoleSpy.mockRestore(); + }); + + test('should rollback config for all services using execFileSync', async () => { const effect = { type: 'rollback-config' }; await executeSurvivalEffect(effect, testDeps); - expect(mockExecSync).toHaveBeenCalledTimes(3); - expect(mockExecSync).toHaveBeenCalledWith( - 'cp ~/.upulse/config.json.bak ~/.upulse/config.json 2>/dev/null && systemctl restart pulse', - { timeout: 30000 }, + // 3 configs × 2 calls each (cp + systemctl restart) = 6 calls + expect(mockExecFileSync).toHaveBeenCalledTimes(6); + // Check pulse config + expect(mockExecFileSync).toHaveBeenCalledWith( + 'cp', + [ + `${path.join(os.homedir(), '.upulse', 'config.json')}.bak`, + path.join(os.homedir(), '.upulse', 'config.json'), + ], + { timeout: 10000, stdio: 'ignore' }, ); - expect(mockExecSync).toHaveBeenCalledWith( - 'cp /etc/litellm/config.yaml.bak /etc/litellm/config.yaml 2>/dev/null && systemctl restart litellm', - { timeout: 30000 }, - ); - expect(mockExecSync).toHaveBeenCalledWith( - 'cp ~/.openclaw/openclaw.json.bak ~/.openclaw/openclaw.json 2>/dev/null && systemctl restart openclaw', + expect(mockExecFileSync).toHaveBeenCalledWith( + 'systemctl', + ['restart', 'pulse'], { timeout: 30000 }, ); }); @@ -309,6 +355,7 @@ describe('executeSurvivalEffect', () => { executeSurvivalEffect(effect, testDeps), ).resolves.toBeUndefined(); expect(mockExecSync).not.toHaveBeenCalled(); + expect(mockExecFileSync).not.toHaveBeenCalled(); expect(mockFetch).not.toHaveBeenCalled(); }); }); diff --git a/packages/pulse/src/survival/executors.ts b/packages/pulse/src/survival/executors.ts index be1a134..03f4806 100644 --- a/packages/pulse/src/survival/executors.ts +++ b/packages/pulse/src/survival/executors.ts @@ -4,11 +4,17 @@ * Execute survival effects - all deterministic local commands. */ -import { execSync as defaultExecSync } from 'node:child_process'; +import { + execFileSync as defaultExecFileSync, + execSync as defaultExecSync, +} from 'node:child_process'; import * as defaultFs from 'node:fs'; import * as os from 'node:os'; import * as path from 'node:path'; +/** Allowlist pattern for safe shell arguments (service names, git refs, etc.) */ +const SAFE_ARG = /^[a-zA-Z0-9._\/@:-]+$/; + export interface SurvivalEffect { type: string; [key: string]: unknown; @@ -18,6 +24,7 @@ export interface SurvivalEffect { export interface SurvivalExecDeps { fs?: typeof defaultFs; execSyncFn?: typeof defaultExecSync; + execFileSyncFn?: typeof defaultExecFileSync; } /** @@ -29,12 +36,17 @@ export async function executeSurvivalEffect( ): Promise { const fs = deps?.fs ?? defaultFs; const execSync = deps?.execSyncFn ?? defaultExecSync; + const execFileSync = deps?.execFileSyncFn ?? defaultExecFileSync; switch (effect.type) { case 'restart-service': { const service = effect.service as string; + if (!SAFE_ARG.test(service)) { + console.error(`[survival] Rejected unsafe service name: ${service}`); + break; + } try { - execSync(`systemctl restart ${service}`, { timeout: 30000 }); + execFileSync('systemctl', ['restart', service], { timeout: 30000 }); } catch (err) { console.error(`[survival] Failed to restart ${service}:`, err); } @@ -118,11 +130,15 @@ export async function executeSurvivalEffect( const target = effect.target as string; try { if (target === 'journal') { - execSync('journalctl --vacuum-size=100M', { timeout: 30000 }); - } else if (target === 'tmp') { - execSync('find /tmp -type f -mtime +1 -delete 2>/dev/null || true', { + execFileSync('journalctl', ['--vacuum-size=100M'], { timeout: 30000, }); + } else if (target === 'tmp') { + execFileSync( + 'find', + ['/tmp', '-type', 'f', '-mtime', '+1', '-delete'], + { timeout: 30000, stdio: 'ignore' }, + ); } } catch (err) { console.error(`[survival] Failed to clear ${target}:`, err); @@ -132,8 +148,15 @@ export async function executeSurvivalEffect( case 'rollback-code': { const to = effect.to as string; + if (!SAFE_ARG.test(to)) { + console.error(`[survival] Rejected unsafe git ref: ${to}`); + break; + } try { - execSync(`upulse rollback ${to}`, { timeout: 30000 }); + execFileSync('git', ['checkout', to], { + cwd: path.join(os.homedir(), '.upulse', 'engine'), + timeout: 30000, + }); } catch (err) { console.error(`[survival] Failed to rollback to ${to}:`, err); } @@ -142,16 +165,18 @@ export async function executeSurvivalEffect( case 'rollback-config': { // Three-layer rollback: Pulse → LiteLLM → OC Gateway - for (const [name, bakPath] of [ - ['pulse', '~/.upulse/config.json'], + const configs: Array<[string, string]> = [ + ['pulse', path.join(os.homedir(), '.upulse', 'config.json')], ['litellm', '/etc/litellm/config.yaml'], - ['openclaw', '~/.openclaw/openclaw.json'], - ]) { + ['openclaw', path.join(os.homedir(), '.openclaw', 'openclaw.json')], + ]; + for (const [name, configPath] of configs) { try { - execSync( - `cp ${bakPath}.bak ${bakPath} 2>/dev/null && systemctl restart ${name}`, - { timeout: 30000 }, - ); + execFileSync('cp', [`${configPath}.bak`, configPath], { + timeout: 10000, + stdio: 'ignore', + }); + execFileSync('systemctl', ['restart', name], { timeout: 30000 }); } catch {} } break; diff --git a/packages/pulse/src/watcher.ts b/packages/pulse/src/watcher.ts index ba1409b..0041a38 100644 --- a/packages/pulse/src/watcher.ts +++ b/packages/pulse/src/watcher.ts @@ -122,6 +122,19 @@ export function startWatcher( } } catch (err) { console.error(`[watcher:${def.name}] error during tick:`, err); + // Surface collection failure as a vital so rules can detect it + try { + store.appendVital({ + occurredAt: Date.now(), + key: `_error:${def.key}`, + hash: store.putObject({ + error: err instanceof Error ? err.message : String(err), + watcher: def.name, + }), + }); + } catch { + // Best-effort: if even error recording fails, just log + } } } }