fix: address review BLOCKER + 4 MAJORs from 凌瞰
BLOCKER:
- survival/executors.ts: replace execSync string interpolation with
execFileSync for restart-service, clear-cache, rollback-code,
rollback-config. Add SAFE_ARG whitelist validation. Preserve DI
interface by adding execFileSyncFn to SurvivalExecDeps.
MAJOR fixes:
1. store.ts: hashObject 64→128 bits (birthday collision safe)
2. index.ts/findEffectiveEpoch: try-catch on JSON.parse
3. index.ts/rebuildSnapshot: track CAS misses as _error:cas_miss
4. watcher.ts: write _error:{key} vital on collection failure
Tests: 235 pass, 0 fail (including new unsafe input rejection tests)
Ref: oc-xiaoju/pulse#11
This commit is contained in:
@@ -100,7 +100,13 @@ export function findEffectiveEpoch(store: PulseStore): EventRecord | null {
|
||||
const rollback = store.getLatest('rollback');
|
||||
if (rollback) {
|
||||
// rollback.meta should contain { to: 'v1' } — the code_rev to roll back to
|
||||
const meta = rollback.meta ? JSON.parse(rollback.meta) : {};
|
||||
let meta: Record<string, unknown> = {};
|
||||
try {
|
||||
meta = rollback.meta ? JSON.parse(rollback.meta) : {};
|
||||
} catch {
|
||||
// Corrupted meta — skip this rollback event, fall through to latest promote
|
||||
return store.getLatest('promote');
|
||||
}
|
||||
const targetRev = meta.to || rollback.codeRev;
|
||||
if (targetRev) {
|
||||
return store.getLatestWhere({ kind: 'promote', codeRev: targetRev });
|
||||
@@ -122,6 +128,7 @@ export function rebuildSnapshot<S extends { timestamp: number }>(
|
||||
epoch?: EventRecord | null,
|
||||
): S {
|
||||
const snapshot: Record<string, unknown> = { timestamp: Date.now() };
|
||||
const casMisses: string[] = [];
|
||||
for (const key of senseKeys) {
|
||||
// Priority 1: read latest from vitals table
|
||||
const latestVital = store.getLatestVital(key);
|
||||
@@ -134,6 +141,8 @@ export function rebuildSnapshot<S extends { timestamp: number }>(
|
||||
} as Sensed<unknown>;
|
||||
continue;
|
||||
}
|
||||
// CAS object missing — track it
|
||||
casMisses.push(key);
|
||||
}
|
||||
|
||||
// Priority 2: fallback to events table (migrate/init events)
|
||||
@@ -208,6 +217,10 @@ export function rebuildSnapshot<S extends { timestamp: number }>(
|
||||
}
|
||||
}
|
||||
}
|
||||
// Surface CAS misses so rules can detect data integrity issues
|
||||
if (casMisses.length > 0) {
|
||||
snapshot['_error:cas_miss'] = { keys: casMisses, count: casMisses.length };
|
||||
}
|
||||
return snapshot as S;
|
||||
}
|
||||
|
||||
|
||||
@@ -310,7 +310,7 @@ describe('createStore (events table + CAS)', () => {
|
||||
const hash = store.putObject(data);
|
||||
|
||||
expect(typeof hash === 'string').toBeTruthy();
|
||||
expect(hash.length).toBe(16);
|
||||
expect(hash.length).toBe(32);
|
||||
|
||||
// File should exist on disk
|
||||
const filePath = join(objectsDir, `${hash}.json`);
|
||||
|
||||
@@ -128,7 +128,7 @@ function hashObject(data: unknown): string {
|
||||
return createHash('sha256')
|
||||
.update(JSON.stringify(data))
|
||||
.digest('hex')
|
||||
.slice(0, 16); // 16 hex chars = 64 bits, sufficient for dedup
|
||||
.slice(0, 32); // 32 hex chars = 128 bits, safe against birthday collisions
|
||||
}
|
||||
|
||||
// ── Schema ─────────────────────────────────────────────────────
|
||||
|
||||
@@ -3,10 +3,13 @@
|
||||
*/
|
||||
|
||||
import { beforeEach, describe, expect, jest, test } from 'bun:test';
|
||||
import * as os from 'node:os';
|
||||
import * as path from 'node:path';
|
||||
import { executeSurvivalEffect, type SurvivalExecDeps } from './executors';
|
||||
|
||||
// All mocks via dependency injection — NO global jest.mock to avoid cross-file pollution
|
||||
const mockExecSync = jest.fn();
|
||||
const mockExecFileSync = jest.fn();
|
||||
const mockFetch = jest.fn();
|
||||
|
||||
const mockExistsSync = jest.fn(() => true);
|
||||
@@ -26,6 +29,7 @@ const mockFs = {
|
||||
const testDeps: SurvivalExecDeps = {
|
||||
fs: mockFs,
|
||||
execSyncFn: mockExecSync,
|
||||
execFileSyncFn: mockExecFileSync,
|
||||
};
|
||||
|
||||
// Mock global fetch
|
||||
@@ -38,21 +42,37 @@ describe('executeSurvivalEffect', () => {
|
||||
mockReaddirSync.mockReturnValue([]);
|
||||
});
|
||||
|
||||
test('should restart service', async () => {
|
||||
test('should restart service using execFileSync', async () => {
|
||||
const effect = { type: 'restart-service', service: 'openclaw' };
|
||||
|
||||
await executeSurvivalEffect(effect, testDeps);
|
||||
|
||||
expect(mockExecSync).toHaveBeenCalledWith('systemctl restart openclaw', {
|
||||
timeout: 30000,
|
||||
});
|
||||
expect(mockExecFileSync).toHaveBeenCalledWith(
|
||||
'systemctl',
|
||||
['restart', 'openclaw'],
|
||||
{ timeout: 30000 },
|
||||
);
|
||||
});
|
||||
|
||||
test('should reject unsafe service name', async () => {
|
||||
const effect = { type: 'restart-service', service: 'openclaw; rm -rf /' };
|
||||
const consoleSpy = jest.spyOn(console, 'error').mockImplementation();
|
||||
|
||||
await executeSurvivalEffect(effect, testDeps);
|
||||
|
||||
expect(mockExecFileSync).not.toHaveBeenCalled();
|
||||
expect(consoleSpy).toHaveBeenCalledWith(
|
||||
expect.stringContaining('Rejected unsafe service name'),
|
||||
);
|
||||
|
||||
consoleSpy.mockRestore();
|
||||
});
|
||||
|
||||
test('should handle restart service failure gracefully', async () => {
|
||||
const effect = { type: 'restart-service', service: 'openclaw' };
|
||||
const consoleSpy = jest.spyOn(console, 'error').mockImplementation();
|
||||
|
||||
mockExecSync.mockImplementationOnce(() => {
|
||||
mockExecFileSync.mockImplementationOnce(() => {
|
||||
throw new Error('Service not found');
|
||||
});
|
||||
|
||||
@@ -159,7 +179,7 @@ describe('executeSurvivalEffect', () => {
|
||||
|
||||
await executeSurvivalEffect(effect, testDeps);
|
||||
|
||||
// Should run tar + rm
|
||||
// Should run tar + rm via execSync (archive compression uses shell)
|
||||
expect(mockExecSync).toHaveBeenCalledWith(
|
||||
expect.stringMatching(
|
||||
/^cd \/test\/sessions\/archive && tar czf sessions-.*\.tar\.gz \*\.jsonl && rm -f \*\.jsonl$/,
|
||||
@@ -208,7 +228,7 @@ describe('executeSurvivalEffect', () => {
|
||||
consoleSpy.mockRestore();
|
||||
});
|
||||
|
||||
// ── other effects (unchanged from original) ───────────────
|
||||
// ── other effects ─────────────────────────────────────────
|
||||
|
||||
test('should handle gc-vitals as no-op', async () => {
|
||||
const effect = { type: 'gc-vitals' };
|
||||
@@ -216,55 +236,81 @@ describe('executeSurvivalEffect', () => {
|
||||
await executeSurvivalEffect(effect, testDeps);
|
||||
|
||||
expect(mockExecSync).not.toHaveBeenCalled();
|
||||
expect(mockExecFileSync).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
test('should clear journal cache', async () => {
|
||||
test('should clear journal cache using execFileSync', async () => {
|
||||
const effect = { type: 'clear-cache', target: 'journal' };
|
||||
|
||||
await executeSurvivalEffect(effect, testDeps);
|
||||
|
||||
expect(mockExecSync).toHaveBeenCalledWith('journalctl --vacuum-size=100M', {
|
||||
timeout: 30000,
|
||||
});
|
||||
expect(mockExecFileSync).toHaveBeenCalledWith(
|
||||
'journalctl',
|
||||
['--vacuum-size=100M'],
|
||||
{ timeout: 30000 },
|
||||
);
|
||||
});
|
||||
|
||||
test('should clear tmp cache', async () => {
|
||||
test('should clear tmp cache using execFileSync', async () => {
|
||||
const effect = { type: 'clear-cache', target: 'tmp' };
|
||||
|
||||
await executeSurvivalEffect(effect, testDeps);
|
||||
|
||||
expect(mockExecSync).toHaveBeenCalledWith(
|
||||
'find /tmp -type f -mtime +1 -delete 2>/dev/null || true',
|
||||
{ timeout: 30000 },
|
||||
expect(mockExecFileSync).toHaveBeenCalledWith(
|
||||
'find',
|
||||
['/tmp', '-type', 'f', '-mtime', '+1', '-delete'],
|
||||
{ timeout: 30000, stdio: 'ignore' },
|
||||
);
|
||||
});
|
||||
|
||||
test('should rollback code', async () => {
|
||||
test('should rollback code using execFileSync', async () => {
|
||||
const effect = { type: 'rollback-code', to: 'abc123' };
|
||||
|
||||
await executeSurvivalEffect(effect, testDeps);
|
||||
|
||||
expect(mockExecSync).toHaveBeenCalledWith('upulse rollback abc123', {
|
||||
timeout: 30000,
|
||||
});
|
||||
expect(mockExecFileSync).toHaveBeenCalledWith(
|
||||
'git',
|
||||
['checkout', 'abc123'],
|
||||
{
|
||||
cwd: path.join(os.homedir(), '.upulse', 'engine'),
|
||||
timeout: 30000,
|
||||
},
|
||||
);
|
||||
});
|
||||
|
||||
test('should rollback config for all services', async () => {
|
||||
test('should reject unsafe git ref', async () => {
|
||||
const effect = { type: 'rollback-code', to: 'abc; rm -rf /' };
|
||||
const consoleSpy = jest.spyOn(console, 'error').mockImplementation();
|
||||
|
||||
await executeSurvivalEffect(effect, testDeps);
|
||||
|
||||
expect(mockExecFileSync).not.toHaveBeenCalled();
|
||||
expect(consoleSpy).toHaveBeenCalledWith(
|
||||
expect.stringContaining('Rejected unsafe git ref'),
|
||||
);
|
||||
|
||||
consoleSpy.mockRestore();
|
||||
});
|
||||
|
||||
test('should rollback config for all services using execFileSync', async () => {
|
||||
const effect = { type: 'rollback-config' };
|
||||
|
||||
await executeSurvivalEffect(effect, testDeps);
|
||||
|
||||
expect(mockExecSync).toHaveBeenCalledTimes(3);
|
||||
expect(mockExecSync).toHaveBeenCalledWith(
|
||||
'cp ~/.upulse/config.json.bak ~/.upulse/config.json 2>/dev/null && systemctl restart pulse',
|
||||
{ timeout: 30000 },
|
||||
// 3 configs × 2 calls each (cp + systemctl restart) = 6 calls
|
||||
expect(mockExecFileSync).toHaveBeenCalledTimes(6);
|
||||
// Check pulse config
|
||||
expect(mockExecFileSync).toHaveBeenCalledWith(
|
||||
'cp',
|
||||
[
|
||||
`${path.join(os.homedir(), '.upulse', 'config.json')}.bak`,
|
||||
path.join(os.homedir(), '.upulse', 'config.json'),
|
||||
],
|
||||
{ timeout: 10000, stdio: 'ignore' },
|
||||
);
|
||||
expect(mockExecSync).toHaveBeenCalledWith(
|
||||
'cp /etc/litellm/config.yaml.bak /etc/litellm/config.yaml 2>/dev/null && systemctl restart litellm',
|
||||
{ timeout: 30000 },
|
||||
);
|
||||
expect(mockExecSync).toHaveBeenCalledWith(
|
||||
'cp ~/.openclaw/openclaw.json.bak ~/.openclaw/openclaw.json 2>/dev/null && systemctl restart openclaw',
|
||||
expect(mockExecFileSync).toHaveBeenCalledWith(
|
||||
'systemctl',
|
||||
['restart', 'pulse'],
|
||||
{ timeout: 30000 },
|
||||
);
|
||||
});
|
||||
@@ -309,6 +355,7 @@ describe('executeSurvivalEffect', () => {
|
||||
executeSurvivalEffect(effect, testDeps),
|
||||
).resolves.toBeUndefined();
|
||||
expect(mockExecSync).not.toHaveBeenCalled();
|
||||
expect(mockExecFileSync).not.toHaveBeenCalled();
|
||||
expect(mockFetch).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -4,11 +4,17 @@
|
||||
* Execute survival effects - all deterministic local commands.
|
||||
*/
|
||||
|
||||
import { execSync as defaultExecSync } from 'node:child_process';
|
||||
import {
|
||||
execFileSync as defaultExecFileSync,
|
||||
execSync as defaultExecSync,
|
||||
} from 'node:child_process';
|
||||
import * as defaultFs from 'node:fs';
|
||||
import * as os from 'node:os';
|
||||
import * as path from 'node:path';
|
||||
|
||||
/** Allowlist pattern for safe shell arguments (service names, git refs, etc.) */
|
||||
const SAFE_ARG = /^[a-zA-Z0-9._\/@:-]+$/;
|
||||
|
||||
export interface SurvivalEffect {
|
||||
type: string;
|
||||
[key: string]: unknown;
|
||||
@@ -18,6 +24,7 @@ export interface SurvivalEffect {
|
||||
export interface SurvivalExecDeps {
|
||||
fs?: typeof defaultFs;
|
||||
execSyncFn?: typeof defaultExecSync;
|
||||
execFileSyncFn?: typeof defaultExecFileSync;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -29,12 +36,17 @@ export async function executeSurvivalEffect(
|
||||
): Promise<void> {
|
||||
const fs = deps?.fs ?? defaultFs;
|
||||
const execSync = deps?.execSyncFn ?? defaultExecSync;
|
||||
const execFileSync = deps?.execFileSyncFn ?? defaultExecFileSync;
|
||||
|
||||
switch (effect.type) {
|
||||
case 'restart-service': {
|
||||
const service = effect.service as string;
|
||||
if (!SAFE_ARG.test(service)) {
|
||||
console.error(`[survival] Rejected unsafe service name: ${service}`);
|
||||
break;
|
||||
}
|
||||
try {
|
||||
execSync(`systemctl restart ${service}`, { timeout: 30000 });
|
||||
execFileSync('systemctl', ['restart', service], { timeout: 30000 });
|
||||
} catch (err) {
|
||||
console.error(`[survival] Failed to restart ${service}:`, err);
|
||||
}
|
||||
@@ -118,11 +130,15 @@ export async function executeSurvivalEffect(
|
||||
const target = effect.target as string;
|
||||
try {
|
||||
if (target === 'journal') {
|
||||
execSync('journalctl --vacuum-size=100M', { timeout: 30000 });
|
||||
} else if (target === 'tmp') {
|
||||
execSync('find /tmp -type f -mtime +1 -delete 2>/dev/null || true', {
|
||||
execFileSync('journalctl', ['--vacuum-size=100M'], {
|
||||
timeout: 30000,
|
||||
});
|
||||
} else if (target === 'tmp') {
|
||||
execFileSync(
|
||||
'find',
|
||||
['/tmp', '-type', 'f', '-mtime', '+1', '-delete'],
|
||||
{ timeout: 30000, stdio: 'ignore' },
|
||||
);
|
||||
}
|
||||
} catch (err) {
|
||||
console.error(`[survival] Failed to clear ${target}:`, err);
|
||||
@@ -132,8 +148,15 @@ export async function executeSurvivalEffect(
|
||||
|
||||
case 'rollback-code': {
|
||||
const to = effect.to as string;
|
||||
if (!SAFE_ARG.test(to)) {
|
||||
console.error(`[survival] Rejected unsafe git ref: ${to}`);
|
||||
break;
|
||||
}
|
||||
try {
|
||||
execSync(`upulse rollback ${to}`, { timeout: 30000 });
|
||||
execFileSync('git', ['checkout', to], {
|
||||
cwd: path.join(os.homedir(), '.upulse', 'engine'),
|
||||
timeout: 30000,
|
||||
});
|
||||
} catch (err) {
|
||||
console.error(`[survival] Failed to rollback to ${to}:`, err);
|
||||
}
|
||||
@@ -142,16 +165,18 @@ export async function executeSurvivalEffect(
|
||||
|
||||
case 'rollback-config': {
|
||||
// Three-layer rollback: Pulse → LiteLLM → OC Gateway
|
||||
for (const [name, bakPath] of [
|
||||
['pulse', '~/.upulse/config.json'],
|
||||
const configs: Array<[string, string]> = [
|
||||
['pulse', path.join(os.homedir(), '.upulse', 'config.json')],
|
||||
['litellm', '/etc/litellm/config.yaml'],
|
||||
['openclaw', '~/.openclaw/openclaw.json'],
|
||||
]) {
|
||||
['openclaw', path.join(os.homedir(), '.openclaw', 'openclaw.json')],
|
||||
];
|
||||
for (const [name, configPath] of configs) {
|
||||
try {
|
||||
execSync(
|
||||
`cp ${bakPath}.bak ${bakPath} 2>/dev/null && systemctl restart ${name}`,
|
||||
{ timeout: 30000 },
|
||||
);
|
||||
execFileSync('cp', [`${configPath}.bak`, configPath], {
|
||||
timeout: 10000,
|
||||
stdio: 'ignore',
|
||||
});
|
||||
execFileSync('systemctl', ['restart', name], { timeout: 30000 });
|
||||
} catch {}
|
||||
}
|
||||
break;
|
||||
|
||||
@@ -122,6 +122,19 @@ export function startWatcher(
|
||||
}
|
||||
} catch (err) {
|
||||
console.error(`[watcher:${def.name}] error during tick:`, err);
|
||||
// Surface collection failure as a vital so rules can detect it
|
||||
try {
|
||||
store.appendVital({
|
||||
occurredAt: Date.now(),
|
||||
key: `_error:${def.key}`,
|
||||
hash: store.putObject({
|
||||
error: err instanceof Error ? err.message : String(err),
|
||||
watcher: def.name,
|
||||
}),
|
||||
});
|
||||
} catch {
|
||||
// Best-effort: if even error recording fails, just log
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user