fix: resolve lint errors in test files for async refactor

- Make non-async helper functions async (triggerCoding, triggerWorkflow, writeVitalAndRebuild, writeEffectEvent, cleanup, trigger) - Fix bad await placement in object literals (await hash: -> hash: await) - Fix await in non-async arrow callbacks and await data: object property - Convert expect(async).toThrow() to rejects.toThrow() for async functions - Fix Promise.all pattern for async getObject calls in map - Remove stray await before return statements - Fix db.close() (sync bun:sqlite API) wrongly awaited - Auto-fix formatting issues via biome

Made-with: Cursor
This commit is contained in:
2026-04-18 03:01:24 +00:00
parent 6f753ae68e
commit 3313f29954
31 changed files with 438 additions and 189 deletions
+18 -11
View File
@@ -11,16 +11,21 @@
import { mkdirSync, unlinkSync, writeFileSync } from 'node:fs';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import type { PulseStore, TaskState, ProjectState, TaskType } from '@uncaged/pulse';
import type {
ProjectState,
PulseStore,
TaskState,
TaskType,
} from '@uncaged/pulse';
// ── Types ──────────────────────────────────────────────────────
/** New thin CursorCall interface */
export interface CursorCall {
repoDir: string; // 工作目录
prompt: string; // 前置 LLM 生成的简短指令
timeoutMs?: number; // 超时(默认 600_000)
readOnly?: boolean; // review/debug 场景不改代码(未来扩展,先预留)
repoDir: string; // 工作目录
prompt: string; // 前置 LLM 生成的简短指令
timeoutMs?: number; // 超时(默认 600_000)
readOnly?: boolean; // review/debug 场景不改代码(未来扩展,先预留)
}
/** Legacy CursorEffect for backward compatibility */
@@ -42,7 +47,7 @@ export interface CodingExecutorOptions {
apiKey?: string;
defaultTimeoutMs?: number;
tmpDir?: string;
workflowStore?: PulseStore; // Made optional for new thin executor
workflowStore?: PulseStore; // Made optional for new thin executor
}
// Legacy types kept for backward compatibility
@@ -171,7 +176,7 @@ export function writeTaskResponded(
/**
* @deprecated Legacy effect-based executor. Use executeCursorTask() directly.
*
*
* Creates a legacy executor that handles CursorEffect objects.
* Maintained for backward compatibility.
*/
@@ -179,7 +184,9 @@ export function createCursorExecutor(opts: CodingExecutorOptions) {
const { workflowStore } = opts;
if (!workflowStore) {
throw new Error('workflowStore is required for legacy createCursorExecutor');
throw new Error(
'workflowStore is required for legacy createCursorExecutor',
);
}
return async function executeLegacyCursorTask(
@@ -215,10 +222,10 @@ export function createCursorExecutor(opts: CodingExecutorOptions) {
};
const result = await executeCursorTask(call, opts);
// 5. Write task-responded event (legacy format)
writeLegacyTaskResponded(workflowStore, effect, result);
return result;
};
}
@@ -307,4 +314,4 @@ async function resolveApiKey(command: string): Promise<string> {
} catch {
throw new Error(`Failed to resolve CURSOR_API_KEY via: ${command}`);
}
}
}
+3 -3
View File
@@ -12,10 +12,10 @@ export {
type CodingExecutorOptions,
type CodingResult,
type CodingScenario,
type CursorCall, // New thin interface
type CursorCall, // New thin interface
type CursorEffect,
createCursorExecutor, // Legacy
executeCursorTask, // New thin executor
createCursorExecutor, // Legacy
executeCursorTask, // New thin executor
writeTaskResponded,
} from './cursor-agent.js';
// Rule
+11 -5
View File
@@ -31,8 +31,8 @@ const REPO_DIR = join(import.meta.dir, '../../../..');
const DATA_DIR = join(homedir(), '.upulse/scopes');
const ENGINE_DIR = join(homedir(), '.upulse/engine');
// Adaptive tick config
const BASE_TICK_MS = 5_000; // 5s when active
const MAX_TICK_MS = 300_000; // 5min when idle
const BASE_TICK_MS = 5_000; // 5s when active
const MAX_TICK_MS = 300_000; // 5min when idle
const BACKOFF_FACTOR = 2;
// Ensure dirs
@@ -99,7 +99,9 @@ console.log('🍊 Pulse Workflow Daemon started');
console.log(` Repo: ${REPO_DIR}`);
console.log(` Engine: ${ENGINE_DIR}`);
console.log(` Store: ${DATA_DIR}/workflows.db`);
console.log(` Tick: adaptive ${BASE_TICK_MS / 1000}s → ${MAX_TICK_MS / 1000}s (×${BACKOFF_FACTOR})`);
console.log(
` Tick: adaptive ${BASE_TICK_MS / 1000}s → ${MAX_TICK_MS / 1000}s (×${BACKOFF_FACTOR})`,
);
console.log(` Workflows: coding, meta`);
console.log('');
@@ -120,10 +122,14 @@ const logTick = async () => {
if (nowCount > lastEventCount) {
currentTickMs = BASE_TICK_MS; // reset to fast
lastEventCount = nowCount;
console.log(`[${ts}] tick done (${elapsed}ms) — active, next in ${currentTickMs / 1000}s`);
console.log(
`[${ts}] tick done (${elapsed}ms) — active, next in ${currentTickMs / 1000}s`,
);
} else {
currentTickMs = Math.min(currentTickMs * BACKOFF_FACTOR, MAX_TICK_MS);
console.log(`[${ts}] tick done (${elapsed}ms) — idle, next in ${currentTickMs / 1000}s`);
console.log(
`[${ts}] tick done (${elapsed}ms) — idle, next in ${currentTickMs / 1000}s`,
);
}
};
+3 -1
View File
@@ -342,7 +342,9 @@ describe('Definition Layer', () => {
// Verify all definitions can be retrieved
expect(await getObjectDef(db, 'Invoice', 'v1.2.0')).toEqual(objDef);
expect(await getEventDef(db, 'InvoiceCreated', 'v1.2.0')).toEqual(eventDef);
expect(await getProjectionDef(db, 'InvoiceSummary', 'v1.2.0')).toEqual(projDef);
expect(await getProjectionDef(db, 'InvoiceSummary', 'v1.2.0')).toEqual(
projDef,
);
// Verify listing works
const events = await listEventDefs(db, { codeRev: 'v1.2.0' });
+1 -1
View File
@@ -152,7 +152,7 @@ while (tickNum < 5) {
const allEvents = await reportStore.getAfter(0);
const rendererEvt = allEvents.find((e) => e.kind === 'report.renderer');
if (rendererEvt?.hash) {
const html = await reportStore.getObject(rendererEvt.hash) as string;
const html = (await reportStore.getObject(rendererEvt.hash)) as string;
const outPath = join(tmpDir, `report-${workflowKey}.html`);
writeFileSync(outPath, html, 'utf-8');
console.log(
@@ -31,7 +31,7 @@ describe('Council v2 E2E', () => {
if (tmpDir) rmSync(tmpDir, { recursive: true, force: true });
});
function triggerCoding(
async function triggerCoding(
topicId: string,
title: string,
description: string,
+26 -22
View File
@@ -48,11 +48,11 @@ function makeStores(tmpDir: string): {
const SENSE_KEYS = ['system', 'processes', 'network', 'errorLog', 'llm'];
function writeVitalAndRebuild(
async function writeVitalAndRebuild(
stores: { system: PulseStore; vitals: PulseStore },
key: string,
data: unknown,
): SurvivalSnapshot {
): Promise<SurvivalSnapshot> {
const hash = await stores.vitals.putObject(data);
await stores.vitals.appendEvent({
occurredAt: Date.now(),
@@ -60,8 +60,8 @@ function writeVitalAndRebuild(
key,
hash,
});
const snap = rebuildSnapshot<SurvivalSnapshot>(stores, SENSE_KEYS);
snap.health = rebuildHealth(stores.system);
const snap = await rebuildSnapshot<SurvivalSnapshot>(stores, SENSE_KEYS);
snap.health = await rebuildHealth(stores.system);
return snap;
}
@@ -93,7 +93,7 @@ describe('E2E survival chain', () => {
},
};
const curr = writeVitalAndRebuild(stores, 'processes', processData);
const curr = await writeVitalAndRebuild(stores, 'processes', processData);
const prev: SurvivalSnapshot = { timestamp: curr.timestamp - 1000 };
const pulse = composeRules<
@@ -128,7 +128,7 @@ describe('E2E survival chain', () => {
swapPct: 10,
};
const curr = writeVitalAndRebuild(stores, 'system', systemData);
const curr = await writeVitalAndRebuild(stores, 'system', systemData);
const prev: SurvivalSnapshot = { timestamp: curr.timestamp - 1000 };
const pulse = composeRules<
@@ -157,7 +157,7 @@ describe('E2E survival chain', () => {
swapPct: 0,
};
const curr = writeVitalAndRebuild(stores, 'system', systemData);
const curr = await writeVitalAndRebuild(stores, 'system', systemData);
const prev: SurvivalSnapshot = { timestamp: curr.timestamp - 1000 };
const pulse = composeRules<
@@ -185,7 +185,7 @@ describe('E2E survival chain', () => {
completionOk: false,
};
const curr = writeVitalAndRebuild(stores, 'llm', llmData);
const curr = await writeVitalAndRebuild(stores, 'llm', llmData);
const prev: SurvivalSnapshot = { timestamp: curr.timestamp - 1000 };
// Track whether inner was called
@@ -243,7 +243,7 @@ describe('E2E survival chain', () => {
const curr: SurvivalSnapshot = {
timestamp: now,
health: rebuildHealth(stores.system),
health: await rebuildHealth(stores.system),
};
const prev: SurvivalSnapshot = { timestamp: now - 1000 };
@@ -278,8 +278,8 @@ describe('E2E survival chain', () => {
},
};
const curr = writeVitalAndRebuild(stores, 'processes', processData);
curr.health = rebuildHealth(stores.system);
const curr = await writeVitalAndRebuild(stores, 'processes', processData);
curr.health = await rebuildHealth(stores.system);
const prev: SurvivalSnapshot = { timestamp: curr.timestamp - 1000 };
const pulse = composeRules<
@@ -362,8 +362,8 @@ describe('E2E survival chain', () => {
hash: h0,
});
const prev = rebuildSnapshot<SurvivalSnapshot>(stores, SENSE_KEYS);
prev.health = rebuildHealth(stores.system);
const prev = await rebuildSnapshot<SurvivalSnapshot>(stores, SENSE_KEYS);
prev.health = await rebuildHealth(stores.system);
let wakeTriggered = false;
const wakeTick = () => {
@@ -378,12 +378,16 @@ describe('E2E survival chain', () => {
hash: h1,
});
const latestVitals = stores.vitals
.queryByKind('vital', { key: 'system', limit: 12 })
.map((v) => ({
const vitalsRaw = await stores.vitals.queryByKind('vital', {
key: 'system',
limit: 12,
});
const latestVitals = await Promise.all(
vitalsRaw.map(async (v) => ({
...v,
await data: v.hash ? stores.vitals.getObject(v.hash) : null,
}));
data: v.hash ? await stores.vitals.getObject(v.hash) : null,
})),
);
const shouldWakeResult = latestVitals.some(
(v) => v.data && (v.data as { diskPct?: number }).diskPct! > 95,
);
@@ -391,8 +395,8 @@ describe('E2E survival chain', () => {
expect(wakeTriggered).toBe(true);
const curr = rebuildSnapshot<SurvivalSnapshot>(stores, SENSE_KEYS);
curr.health = rebuildHealth(stores.system);
const curr = await rebuildSnapshot<SurvivalSnapshot>(stores, SENSE_KEYS);
curr.health = await rebuildHealth(stores.system);
// Run survival rules
const pulse = composeRules<
@@ -435,8 +439,8 @@ describe('E2E survival chain', () => {
});
}
const curr = rebuildSnapshot<SurvivalSnapshot>(stores, SENSE_KEYS);
curr.health = rebuildHealth(stores.system);
const curr = await rebuildSnapshot<SurvivalSnapshot>(stores, SENSE_KEYS);
curr.health = await rebuildHealth(stores.system);
const prev: SurvivalSnapshot = { timestamp: curr.timestamp - 1000 };
// Business rule that should execute (reached only when survival passes through)
+11 -3
View File
@@ -212,7 +212,7 @@ describe('E2E T9: INT ID lifecycle', () => {
occurredAt: 1000 + i * 100,
kind: 'vital',
key: 'cpu',
await hash: store.putObject({ cpu: i * 10 }),
hash: await store.putObject({ cpu: i * 10 }),
});
}
@@ -511,7 +511,11 @@ describe('E2E T9: INT ID through rebuildSnapshot + rules', () => {
});
test('16: rollback epoch references int IDs', async () => {
await store.appendEvent({ occurredAt: 1000, kind: 'promote', codeRev: 'v1' });
await store.appendEvent({
occurredAt: 1000,
kind: 'promote',
codeRev: 'v1',
});
const h1 = await store.putObject({ value: 'v1-state' });
await store.appendEvent({
occurredAt: 1100,
@@ -521,7 +525,11 @@ describe('E2E T9: INT ID through rebuildSnapshot + rules', () => {
codeRev: 'v1',
});
await store.appendEvent({ occurredAt: 2000, kind: 'promote', codeRev: 'v2' });
await store.appendEvent({
occurredAt: 2000,
kind: 'promote',
codeRev: 'v2',
});
const h2 = await store.putObject({ value: 'v2-state' });
await store.appendEvent({
occurredAt: 2100,
+6 -3
View File
@@ -24,7 +24,10 @@ function makeStore(tmpDir: string, name = 'events'): PulseStore {
});
}
function writeEffectEvent(store: PulseStore, effect: TestEffect): string {
async function writeEffectEvent(
store: PulseStore,
effect: TestEffect,
): Promise<string> {
const hash = await store.putObject(effect);
const event = await store.appendEvent({
occurredAt: Date.now(),
@@ -148,7 +151,7 @@ describe('executorLoop', () => {
});
const effect: TestEffect = { type: 'notify' };
const effectEventId = writeEffectEvent(store, effect);
const effectEventId = await writeEffectEvent(store, effect);
// Wait until execute has returned (so .then() has written effect-acked)
await execDone;
@@ -182,7 +185,7 @@ describe('executorLoop', () => {
});
const effect: TestEffect = { type: 'risky-op' };
const effectEventId = writeEffectEvent(store, effect);
const effectEventId = await writeEffectEvent(store, effect);
await new Promise((r) => setTimeout(r, 150));
ac.abort();
+5 -5
View File
@@ -307,23 +307,23 @@ describe('createGcTrigger', () => {
// Tick 1, 2: no GC
trigger();
trigger();
await new Promise(r => setTimeout(r, 50));
await new Promise((r) => setTimeout(r, 50));
expect((await system.queryByKind('gc', {})).length).toBe(0);
// Tick 3: GC fires
trigger();
await new Promise(r => setTimeout(r, 200));
await new Promise((r) => setTimeout(r, 200));
expect((await system.queryByKind('gc', {})).length).toBe(1);
// Tick 4, 5: no GC again
trigger();
trigger();
await new Promise(r => setTimeout(r, 50));
await new Promise((r) => setTimeout(r, 50));
expect((await system.queryByKind('gc', {})).length).toBe(1);
// Tick 6: GC fires again
trigger();
await new Promise(r => setTimeout(r, 200));
await new Promise((r) => setTimeout(r, 200));
expect((await system.queryByKind('gc', {})).length).toBe(2);
});
@@ -345,7 +345,7 @@ describe('createGcTrigger', () => {
});
for (let i = 0; i < 300; i++) trigger();
await new Promise(r => setTimeout(r, 50));
await new Promise((r) => setTimeout(r, 50));
expect((await system.queryByKind('gc', {})).length).toBe(0);
// Old event should still be there
expect((await vitals.queryByKind('vital', {})).length).toBe(1);
+102 -21
View File
@@ -499,23 +499,43 @@ describe('findEffectiveEpoch', () => {
});
it('with promote → returns it', async () => {
await store.appendEvent({ occurredAt: 1000, kind: 'promote', codeRev: 'v1' });
await store.appendEvent({
occurredAt: 1000,
kind: 'promote',
codeRev: 'v1',
});
const epoch = await findEffectiveEpoch(store);
expect(epoch).toBeTruthy();
expect(epoch?.codeRev).toBe('v1');
});
it('multiple promotes → returns latest', async () => {
await store.appendEvent({ occurredAt: 1000, kind: 'promote', codeRev: 'v1' });
await store.appendEvent({ occurredAt: 2000, kind: 'promote', codeRev: 'v2' });
await store.appendEvent({
occurredAt: 1000,
kind: 'promote',
codeRev: 'v1',
});
await store.appendEvent({
occurredAt: 2000,
kind: 'promote',
codeRev: 'v2',
});
const epoch = await findEffectiveEpoch(store);
expect(epoch).toBeTruthy();
expect(epoch?.codeRev).toBe('v2');
});
it('with rollback → returns rolled-back-to promote', async () => {
await store.appendEvent({ occurredAt: 1000, kind: 'promote', codeRev: 'v1' });
await store.appendEvent({ occurredAt: 2000, kind: 'promote', codeRev: 'v2' });
await store.appendEvent({
occurredAt: 1000,
kind: 'promote',
codeRev: 'v1',
});
await store.appendEvent({
occurredAt: 2000,
kind: 'promote',
codeRev: 'v2',
});
await store.appendEvent({
occurredAt: 3000,
kind: 'rollback',
@@ -528,8 +548,16 @@ describe('findEffectiveEpoch', () => {
});
it('rollback with only codeRev (no meta.to) → uses codeRev', async () => {
await store.appendEvent({ occurredAt: 1000, kind: 'promote', codeRev: 'v1' });
await store.appendEvent({ occurredAt: 2000, kind: 'promote', codeRev: 'v2' });
await store.appendEvent({
occurredAt: 1000,
kind: 'promote',
codeRev: 'v1',
});
await store.appendEvent({
occurredAt: 2000,
kind: 'promote',
codeRev: 'v2',
});
await store.appendEvent({
occurredAt: 3000,
kind: 'rollback',
@@ -565,7 +593,11 @@ describe('rebuildSnapshot with epoch', () => {
const h1 = await store.putObject({ value: 'v1-data' });
const h2 = await store.putObject({ value: 'v2-data' });
await store.appendEvent({ occurredAt: 1000, kind: 'promote', codeRev: 'v1' });
await store.appendEvent({
occurredAt: 1000,
kind: 'promote',
codeRev: 'v1',
});
await store.appendEvent({
occurredAt: 1100,
kind: 'collect',
@@ -573,7 +605,11 @@ describe('rebuildSnapshot with epoch', () => {
hash: h1,
codeRev: 'v1',
});
await store.appendEvent({ occurredAt: 2000, kind: 'promote', codeRev: 'v2' });
await store.appendEvent({
occurredAt: 2000,
kind: 'promote',
codeRev: 'v2',
});
await store.appendEvent({
occurredAt: 2100,
kind: 'collect',
@@ -583,7 +619,10 @@ describe('rebuildSnapshot with epoch', () => {
});
// Using v1 epoch should get v1 data
const epochV1 = await store.getLatestWhere({ kind: 'promote', codeRev: 'v1' });
const epochV1 = await store.getLatestWhere({
kind: 'promote',
codeRev: 'v1',
});
const snapshotV1 = rebuildSnapshot<{
timestamp: number;
system: Sensed<{ value: string }>;
@@ -591,7 +630,10 @@ describe('rebuildSnapshot with epoch', () => {
expect(snapshotV1.system.data.value).toBe('v1-data');
// Using v2 epoch should get v2 data
const epochV2 = await store.getLatestWhere({ kind: 'promote', codeRev: 'v2' });
const epochV2 = await store.getLatestWhere({
kind: 'promote',
codeRev: 'v2',
});
const snapshotV2 = rebuildSnapshot<{
timestamp: number;
system: Sensed<{ value: string }>;
@@ -603,7 +645,11 @@ describe('rebuildSnapshot with epoch', () => {
const h1 = await store.putObject({ value: 'v1-data' });
const h2 = await store.putObject({ value: 'v2-data' });
await store.appendEvent({ occurredAt: 1000, kind: 'promote', codeRev: 'v1' });
await store.appendEvent({
occurredAt: 1000,
kind: 'promote',
codeRev: 'v1',
});
await store.appendEvent({
occurredAt: 1100,
kind: 'collect',
@@ -611,7 +657,11 @@ describe('rebuildSnapshot with epoch', () => {
hash: h1,
codeRev: 'v1',
});
await store.appendEvent({ occurredAt: 2000, kind: 'promote', codeRev: 'v2' });
await store.appendEvent({
occurredAt: 2000,
kind: 'promote',
codeRev: 'v2',
});
await store.appendEvent({
occurredAt: 2100,
kind: 'collect',
@@ -621,7 +671,10 @@ describe('rebuildSnapshot with epoch', () => {
});
// v1 epoch should NOT see v2 collect (different code_rev)
const epochV1 = await store.getLatestWhere({ kind: 'promote', codeRev: 'v1' });
const epochV1 = await store.getLatestWhere({
kind: 'promote',
codeRev: 'v1',
});
const snapshot = rebuildSnapshot<{
timestamp: number;
system: Sensed<{ value: string }>;
@@ -632,7 +685,11 @@ describe('rebuildSnapshot with epoch', () => {
it('migrate events used for initial snapshot after promote', async () => {
const hMigrated = await store.putObject({ value: 'migrated' });
await store.appendEvent({ occurredAt: 1000, kind: 'promote', codeRev: 'v2' });
await store.appendEvent({
occurredAt: 1000,
kind: 'promote',
codeRev: 'v2',
});
await store.appendEvent({
occurredAt: 1001,
kind: 'migrate',
@@ -652,7 +709,11 @@ describe('rebuildSnapshot with epoch', () => {
it('init events used when no collect or migrate exist', async () => {
const hInit = await store.putObject({ value: 'initial' });
await store.appendEvent({ occurredAt: 1000, kind: 'promote', codeRev: 'v1' });
await store.appendEvent({
occurredAt: 1000,
kind: 'promote',
codeRev: 'v1',
});
await store.appendEvent({
occurredAt: 1001,
kind: 'init',
@@ -674,7 +735,11 @@ describe('rebuildSnapshot with epoch', () => {
const hMigrate = await store.putObject({ value: 'migrated' });
const hCollect = await store.putObject({ value: 'collected' });
await store.appendEvent({ occurredAt: 1000, kind: 'promote', codeRev: 'v2' });
await store.appendEvent({
occurredAt: 1000,
kind: 'promote',
codeRev: 'v2',
});
await store.appendEvent({
occurredAt: 1001,
kind: 'init',
@@ -732,7 +797,11 @@ describe('rebuildSnapshot with epoch', () => {
const h1 = await store.putObject({ value: 'v1-data' });
const h2 = await store.putObject({ value: 'v2-data' });
await store.appendEvent({ occurredAt: 1000, kind: 'promote', codeRev: 'v1' });
await store.appendEvent({
occurredAt: 1000,
kind: 'promote',
codeRev: 'v1',
});
await store.appendEvent({
occurredAt: 1100,
kind: 'collect',
@@ -740,7 +809,11 @@ describe('rebuildSnapshot with epoch', () => {
hash: h1,
codeRev: 'v1',
});
await store.appendEvent({ occurredAt: 2000, kind: 'promote', codeRev: 'v2' });
await store.appendEvent({
occurredAt: 2000,
kind: 'promote',
codeRev: 'v2',
});
await store.appendEvent({
occurredAt: 2100,
kind: 'collect',
@@ -838,7 +911,11 @@ describe('rebuildSnapshot vitals priority', () => {
const hVital = await vitalsStore.putObject({ value: 'vital-data' });
const hEvent = await store.putObject({ value: 'event-data' });
await store.appendEvent({ occurredAt: 1000, kind: 'promote', codeRev: 'v1' });
await store.appendEvent({
occurredAt: 1000,
kind: 'promote',
codeRev: 'v1',
});
await store.appendEvent({
occurredAt: 1100,
kind: 'collect',
@@ -864,7 +941,11 @@ describe('rebuildSnapshot vitals priority', () => {
it('falls back to migrate events when no vitals', async () => {
const hMigrate = await store.putObject({ value: 'migrated' });
await store.appendEvent({ occurredAt: 1000, kind: 'promote', codeRev: 'v2' });
await store.appendEvent({
occurredAt: 1000,
kind: 'promote',
codeRev: 'v2',
});
await store.appendEvent({
occurredAt: 1001,
kind: 'migrate',
+18 -7
View File
@@ -17,7 +17,6 @@ import { foldAllProjections, getProjectionState } from './projection-engine.js';
import type { EventRecord, PulseStore, ScopedStore } from './store.js';
import { startWatcher, type WatcherDef } from './watcher.js';
// ── Sensed Types ───────────────────────────────────────────────
/**
@@ -162,7 +161,9 @@ export function composeRules<S, E>(
* If no rollback, use the latest promote event.
* If no promote at all, return null (cold start).
*/
export async function findEffectiveEpoch(store: PulseStore): Promise<EventRecord | null> {
export async function findEffectiveEpoch(
store: PulseStore,
): Promise<EventRecord | null> {
const rollback = await store.getLatest('rollback');
if (rollback) {
// rollback.meta should contain { to: 'v1' } — the code_rev to roll back to
@@ -175,7 +176,10 @@ export async function findEffectiveEpoch(store: PulseStore): Promise<EventRecord
}
const targetRev = (meta.to as string | undefined) || rollback.codeRev;
if (targetRev) {
return await store.getLatestWhere({ kind: 'promote', codeRev: targetRev });
return await store.getLatestWhere({
kind: 'promote',
codeRev: targetRev,
});
}
}
return await store.getLatest('promote');
@@ -319,7 +323,9 @@ export async function rebuildSnapshot<S extends { timestamp: number }>(
* Read each declared projection's current value, using "scope/name" as key.
* If projection doesn't exist, value is null (graceful degradation).
*/
export async function buildSnapshotFromProjections<S extends { timestamp: number }>(
export async function buildSnapshotFromProjections<
S extends { timestamp: number },
>(
scopedStore: ScopedStore,
projectionPaths: string[], // ["_vitals/cpu_usage", "neko/session_count"]
): Promise<S> {
@@ -629,7 +635,10 @@ export async function runPulseV2<S extends { timestamp: number }, E>(options: {
const pulse = composeRules(rules, defaultTickMs);
// Build initial snapshot
let prev = await buildSnapshotFromProjections<S>(scopedStore, projectionPaths);
let prev = await buildSnapshotFromProjections<S>(
scopedStore,
projectionPaths,
);
let tickMs = defaultTickMs;
// ── Wake mechanism ─────────────────────────────────────────────
@@ -698,7 +707,10 @@ export async function runPulseV2<S extends { timestamp: number }, E>(options: {
}
// Build current snapshot from projections
const curr = await buildSnapshotFromProjections<S>(scopedStore, projectionPaths);
const curr = await buildSnapshotFromProjections<S>(
scopedStore,
projectionPaths,
);
const tickStart = Date.now();
const [effects, nextTickMs] = await pulse(prev, curr);
@@ -1025,4 +1037,3 @@ export type {
ToolResponseMeta,
TraceMessage,
} from './task-events.js';
+18 -4
View File
@@ -60,8 +60,16 @@ describe('objects table (createStore)', () => {
});
it('3. queryObjectsByType filters by type', async () => {
await store.createObject({ objectType: 'user', externalId: 'a', codeRev: 'v1' });
await store.createObject({ objectType: 'user', externalId: 'b', codeRev: 'v1' });
await store.createObject({
objectType: 'user',
externalId: 'a',
codeRev: 'v1',
});
await store.createObject({
objectType: 'user',
externalId: 'b',
codeRev: 'v1',
});
await store.createObject({
objectType: 'order',
externalId: 'o1',
@@ -136,8 +144,14 @@ describe('objects table (createStore)', () => {
});
it('createObject without externalId creates distinct rows', async () => {
const id1 = await store.createObject({ objectType: 'ephemeral', codeRev: 'v1' });
const id2 = await store.createObject({ objectType: 'ephemeral', codeRev: 'v1' });
const id1 = await store.createObject({
objectType: 'ephemeral',
codeRev: 'v1',
});
const id2 = await store.createObject({
objectType: 'ephemeral',
codeRev: 'v1',
});
expect(id1).not.toBe(id2);
});
});
+6 -2
View File
@@ -19,12 +19,16 @@ export interface HealthSnapshot {
* Rebuild health field from events table.
* This function is in core package, agent cannot change.
*/
export async function rebuildHealth(store: PulseStore): Promise<HealthSnapshot> {
export async function rebuildHealth(
store: PulseStore,
): Promise<HealthSnapshot> {
const now = Date.now();
const windowStart = now - 5 * 60 * 1000; // 5 minute window
// Query recent events from events table
const recentEvents = await store.queryByKind('effect', { since: windowStart });
const recentEvents = await store.queryByKind('effect', {
since: windowStart,
});
// Count restarts
const lastRestart: Record<string, { ts: number; count: number }> = {};
+10 -7
View File
@@ -185,12 +185,12 @@ describe('createScopedStore', () => {
await scopedStore.close();
// After close, accessing the db should throw
expect(() =>
await system.appendEvent({ occurredAt: 3000, kind: 'tick' }),
).toThrow();
expect(() =>
await neko.appendEvent({ occurredAt: 3000, kind: 'tick' }),
).toThrow();
await expect(
async () => await system.appendEvent({ occurredAt: 3000, kind: 'tick' }),
).rejects.toThrow();
await expect(
async () => await neko.appendEvent({ occurredAt: 3000, kind: 'tick' }),
).rejects.toThrow();
});
// ── 15. full PulseStore API on scope ────────────────────────
@@ -252,7 +252,10 @@ describe('createScopedStore', () => {
codeRev: 'v2',
});
const latest = await store.getLatestWhere({ kind: 'promote', codeRev: 'v1' });
const latest = await store.getLatestWhere({
kind: 'promote',
codeRev: 'v1',
});
expect(latest).toBeTruthy();
expect(latest?.codeRev).toBe('v1');
expect(latest?.occurredAt).toBe(1000);
+32 -7
View File
@@ -256,9 +256,21 @@ describe('createStore (events table + CAS)', () => {
it('12. queryByKind with codeRev filter', async () => {
const store = createStore({ eventsDbPath, objectsDir });
await store.appendEvent({ occurredAt: 1000, kind: 'tick', codeRev: 'abc123' });
await store.appendEvent({ occurredAt: 2000, kind: 'tick', codeRev: 'def456' });
await store.appendEvent({ occurredAt: 3000, kind: 'tick', codeRev: 'abc123' });
await store.appendEvent({
occurredAt: 1000,
kind: 'tick',
codeRev: 'abc123',
});
await store.appendEvent({
occurredAt: 2000,
kind: 'tick',
codeRev: 'def456',
});
await store.appendEvent({
occurredAt: 3000,
kind: 'tick',
codeRev: 'abc123',
});
const result = await store.queryByKind('tick', { codeRev: 'abc123' });
expect(result.length).toBe(2);
@@ -346,8 +358,16 @@ describe('createStore (events table + CAS)', () => {
await store.appendEvent({ occurredAt: 1000, kind: 'tick', key: 'sys' });
await store.appendEvent({ occurredAt: 1000, kind: 'collect', key: 'cpu' });
await store.appendEvent({ occurredAt: 1000, kind: 'effect', key: 'notify' });
await store.appendEvent({ occurredAt: 1000, kind: 'error', key: 'timeout' });
await store.appendEvent({
occurredAt: 1000,
kind: 'effect',
key: 'notify',
});
await store.appendEvent({
occurredAt: 1000,
kind: 'error',
key: 'timeout',
});
expect((await store.queryByKind('tick')).length).toBe(1);
expect((await store.queryByKind('collect')).length).toBe(1);
@@ -421,7 +441,9 @@ describe('createStore (events table + CAS)', () => {
it('getLatestWhere returns null when no match', async () => {
const store = createStore({ eventsDbPath, objectsDir });
await store.appendEvent({ occurredAt: 1000, kind: 'tick', codeRev: 'v1' });
expect(await store.getLatestWhere({ kind: 'tick', codeRev: 'v99' })).toBeNull();
expect(
await store.getLatestWhere({ kind: 'tick', codeRev: 'v99' }),
).toBeNull();
await store.close();
});
@@ -532,7 +554,10 @@ describe('archiveEvents and downsampleEvents', () => {
const deleted = await store.downsampleEvents('vital', 'cpu', 1000, 3000);
expect(deleted).toBe(4);
const remaining = await store.queryByKind('vital', { key: 'cpu', limit: 10 });
const remaining = await store.queryByKind('vital', {
key: 'cpu',
limit: 10,
});
expect(remaining.length).toBe(3);
const hashes = remaining.map((v) => v.hash);
expect(hashes.includes('w0_3')).toBeTruthy();
+3 -1
View File
@@ -288,7 +288,9 @@ export function createStore(options: CreateStoreOptions): PulseStore {
return doAppendEvent(event);
},
async appendEvents(events: Omit<EventRecord, 'id'>[]): Promise<EventRecord[]> {
async appendEvents(
events: Omit<EventRecord, 'id'>[],
): Promise<EventRecord[]> {
return appendManyTx(events);
},
+12 -12
View File
@@ -25,14 +25,14 @@ describe('coding-tdd WorkflowType', () => {
});
}
function cleanup() {
async function cleanup() {
try {
await store?.close();
} catch {}
if (tmpDir) rmSync(tmpDir, { recursive: true, force: true });
}
function trigger(topicId: string, title: string, description: string) {
async function trigger(topicId: string, title: string, description: string) {
const hash = await store.putObject(description);
await store.appendEvent({
occurredAt: Date.now(),
@@ -63,7 +63,7 @@ describe('coding-tdd WorkflowType', () => {
try {
const wf = createTddCodingWorkflow();
const rule = createWorkflowRule(wf, store, undefined, ruleOpts);
trigger('t1', 'Feature', 'desc');
await trigger('t1', 'Feature', 'desc');
const order: string[] = [];
for (let i = 0; i < 30; i++) {
@@ -86,7 +86,7 @@ describe('coding-tdd WorkflowType', () => {
const r = await rule.tick();
expect(r.executed).toEqual([]);
} finally {
cleanup();
await cleanup();
}
});
@@ -354,7 +354,7 @@ describe('coding-tdd WorkflowType', () => {
},
});
const rule = createWorkflowRule(wf, store, undefined, ruleOpts);
trigger('loop-tr', 'L', 'd');
await trigger('loop-tr', 'L', 'd');
const roles: string[] = [];
for (let i = 0; i < 40; i++) {
@@ -372,7 +372,7 @@ describe('coding-tdd WorkflowType', () => {
expect(roles).toContain('test-coder');
expect(roles).toContain('closer');
} finally {
cleanup();
await cleanup();
}
});
@@ -401,7 +401,7 @@ describe('coding-tdd WorkflowType', () => {
},
});
const rule = createWorkflowRule(wf, store, undefined, ruleOpts);
trigger('loop-auto', 'L', 'd');
await trigger('loop-auto', 'L', 'd');
const roles: string[] = [];
for (let i = 0; i < 40; i++) {
@@ -417,7 +417,7 @@ describe('coding-tdd WorkflowType', () => {
expect(secondCoder).toBeGreaterThan(firstAuto);
expect(roles).toContain('closer');
} finally {
cleanup();
await cleanup();
}
});
@@ -438,7 +438,7 @@ describe('coding-tdd WorkflowType', () => {
},
});
const rule = createWorkflowRule(wf, store, undefined, ruleOpts);
trigger('loop-man', 'L', 'd');
await trigger('loop-man', 'L', 'd');
const roles: string[] = [];
for (let i = 0; i < 40; i++) {
@@ -454,7 +454,7 @@ describe('coding-tdd WorkflowType', () => {
);
expect(roles).toContain('closer');
} finally {
cleanup();
await cleanup();
}
});
@@ -485,7 +485,7 @@ describe('coding-tdd WorkflowType', () => {
},
});
const rule = createWorkflowRule(wf, store, undefined, ruleOpts);
trigger('loop-rev', 'L', 'd');
await trigger('loop-rev', 'L', 'd');
const roles: string[] = [];
for (let i = 0; i < 40; i++) {
@@ -497,7 +497,7 @@ describe('coding-tdd WorkflowType', () => {
expect(roles.filter((r) => r === 'reviewer').length).toBe(2);
expect(roles).toContain('closer');
} finally {
cleanup();
await cleanup();
}
});
});
+14 -7
View File
@@ -31,7 +31,7 @@ describe('CodingTask WorkflowType', () => {
if (tmpDir) rmSync(tmpDir, { recursive: true, force: true });
});
function triggerCoding(
async function triggerCoding(
topicId: string,
title: string,
description: string,
@@ -56,7 +56,7 @@ describe('CodingTask WorkflowType', () => {
cooldownMs: 0,
});
triggerCoding(
await triggerCoding(
'task-1',
'Fix login bug',
'Users cannot log in with SSO',
@@ -111,7 +111,7 @@ describe('CodingTask WorkflowType', () => {
cooldownMs: 0,
});
triggerCoding('task-2', 'Add feature X', 'New feature', '/tmp/repo');
await triggerCoding('task-2', 'Add feature X', 'New feature', '/tmp/repo');
await rule.tick(); // architect
await rule.tick(); // coder
@@ -159,7 +159,7 @@ describe('CodingTask WorkflowType', () => {
cooldownMs: 0,
});
triggerCoding(
await triggerCoding(
'task-retry',
'Retry limit test',
'Test max 3 retries',
@@ -195,7 +195,12 @@ describe('CodingTask WorkflowType', () => {
cooldownMs: 0,
});
triggerCoding('task-3', 'CAS test', 'Verify CAS storage', '/tmp/repo');
await triggerCoding(
'task-3',
'CAS test',
'Verify CAS storage',
'/tmp/repo',
);
await rule.tick(); // architect
const events = await store.queryByKind('coding.architect');
@@ -219,7 +224,7 @@ describe('CodingTask WorkflowType', () => {
const codingTask = createCodingWorkflow();
const rule = createWorkflowRule(codingTask, store, logStore);
triggerCoding(
await triggerCoding(
'task-4',
'Test log isolation',
'Role logs go to logStore',
@@ -228,7 +233,9 @@ describe('CodingTask WorkflowType', () => {
await rule.tick();
await expect(logStore.queryByKind('coding.role-started').length).toBe(1);
await expect(logStore.queryByKind('coding.role-completed').length).toBe(1);
await expect(logStore.queryByKind('coding.role-completed').length).toBe(
1,
);
await expect(store.queryByKind('coding.role-started').length).toBe(0);
} finally {
await logStore.close();
@@ -34,7 +34,7 @@ describe('cursor-health', () => {
stmt.run(baseTime + offset);
}
await db.close();
db.close();
return tempDbPath;
}
+1 -1
View File
@@ -16,7 +16,7 @@ function tmpStore(): { store: Store; cleanup: () => void } {
eventsDbPath: `${dir}/events.db`,
objectsDir: `${dir}/objects`,
});
await return { store, cleanup: () => store.close() };
return { store, cleanup: () => store.close() };
}
describe('Report Workflow', () => {
@@ -82,7 +82,7 @@ describe('coder-cursor role', () => {
expect(coded).toBeDefined();
expect(coded!.hash).toBeTruthy();
const content = await store.getObject(coded!.hash!) as any;
const content = (await store.getObject(coded!.hash!)) as any;
expect(content.content).toBe('Mock implementation done');
expect(content.artifacts.filesChanged).toEqual(['src/utils.ts']);
expect(content.artifacts.testsPassed).toBe(true);
@@ -38,7 +38,9 @@ describe('reviewer-cursor role', () => {
topicId: string,
s: PulseStore,
) => {
const hash = await s.putObject({ content: 'LGTM, code looks good. APPROVED.' });
const hash = await s.putObject({
content: 'LGTM, code looks good. APPROVED.',
});
await s.appendEvent({
occurredAt: Date.now(),
kind: 'coding.reviewer',
@@ -84,7 +86,7 @@ describe('reviewer-cursor role', () => {
const meta = JSON.parse(reviewed!.meta!);
expect(meta.verdict).toBe('approved');
const content = await store.getObject(reviewed!.hash!) as any;
const content = (await store.getObject(reviewed!.hash!)) as any;
expect(content.content).toContain('APPROVED');
});
@@ -39,7 +39,7 @@ describe('createWorkflowRule', () => {
if (tmpDir) rmSync(tmpDir, { recursive: true, force: true });
});
function triggerWorkflow(
async function triggerWorkflow(
name: string,
topicId: string,
content: string,
@@ -81,7 +81,7 @@ describe('createWorkflowRule', () => {
const rule = createWorkflowRule(echoType, store, logStore);
triggerWorkflow('echo', 't1', 'hello');
await triggerWorkflow('echo', 't1', 'hello');
const r1 = await rule.tick();
expect(r1.executed).toMatchObject([{ topicId: 't1', role: 'echo' }]);
@@ -123,7 +123,7 @@ describe('createWorkflowRule', () => {
};
const rule = createWorkflowRule(failType, store, logStore);
triggerWorkflow('fail', 't1', 'will fail');
await triggerWorkflow('fail', 't1', 'will fail');
const r1 = await rule.tick();
expect(r1.executed).toEqual([]);
@@ -150,7 +150,7 @@ describe('createWorkflowRule', () => {
};
const rule = createWorkflowRule(echoType, store);
triggerWorkflow('echo', 't1', 'no log');
await triggerWorkflow('echo', 't1', 'no log');
const r1 = await rule.tick();
expect(r1.executed).toMatchObject([{ topicId: 't1', role: 'echo' }]);
@@ -178,7 +178,7 @@ describe('createWorkflowRule', () => {
};
const rule = createWorkflowRule(wf, store);
triggerWorkflow('sticky', 't1', 'go');
await triggerWorkflow('sticky', 't1', 'go');
await rule.tick();
expect(execCount).toBe(1);
@@ -213,7 +213,7 @@ describe('createWorkflowRule', () => {
};
const rule = createWorkflowRule(slowType, store);
triggerWorkflow('slow', 't1', 'go');
await triggerWorkflow('slow', 't1', 'go');
const tick1 = rule.tick();
await new Promise((r) => setTimeout(r, 50));
@@ -250,7 +250,7 @@ describe('createWorkflowRule', () => {
};
const rule = createWorkflowRule(chainType, store);
triggerWorkflow('chain', 't1', 'hello world');
await triggerWorkflow('chain', 't1', 'hello world');
await rule.tick();
@@ -276,7 +276,7 @@ describe('createWorkflowRule', () => {
};
const rule = createWorkflowRule(wf, store);
triggerWorkflow('nm', 't1', 'go');
await triggerWorkflow('nm', 't1', 'go');
await rule.tick();
@@ -321,7 +321,7 @@ describe('createWorkflowRule', () => {
};
const rule = createWorkflowRule(loopType, store, logStore);
triggerWorkflow('loop', 't1', 'start loop');
await triggerWorkflow('loop', 't1', 'start loop');
// Each tick advances one role
await rule.tick(); // loop 1
@@ -358,9 +358,9 @@ describe('createWorkflowRule', () => {
const rule = createWorkflowRule(workType, store, logStore);
triggerWorkflow('work', 't1', 'work1');
triggerWorkflow('work', 't2', 'work2');
triggerWorkflow('work', 't3', 'work3');
await triggerWorkflow('work', 't1', 'work1');
await triggerWorkflow('work', 't2', 'work2');
await triggerWorkflow('work', 't3', 'work3');
const r1 = await rule.tick();
expect(r1.executed.length).toBe(3);
@@ -400,20 +400,20 @@ describe('createWorkflowRule', () => {
});
// 触发第一个 workflow
triggerWorkflow('cool', 't1', 'work1');
await triggerWorkflow('cool', 't1', 'work1');
const r1 = await rule.tick();
expect(execCount).toBe(1);
expect(r1.executed.length).toBe(1);
// 立即触发第二个 workflow,应该被 cooldown 阻止
triggerWorkflow('cool', 't1', 'work2');
await triggerWorkflow('cool', 't1', 'work2');
const r2 = await rule.tick();
expect(r2.executed).toEqual([]);
expect(execCount).toBe(1);
// 等待 cooldown 结束后再次触发
await new Promise((resolve) => setTimeout(resolve, 600));
triggerWorkflow('cool', 't1', 'work3');
await triggerWorkflow('cool', 't1', 'work3');
const r3 = await rule.tick();
expect(execCount).toBe(2);
expect(r3.executed.length).toBe(1);
@@ -471,7 +471,7 @@ describe('createWorkflowRule', () => {
// 不传 options,应该使用默认值
const rule = createWorkflowRule(testType, store, logStore);
triggerWorkflow('test', 't1', 'start');
await triggerWorkflow('test', 't1', 'start');
const r1 = await rule.tick();
expect(r1.executed).toMatchObject([{ topicId: 't1', role: 'test' }]);
@@ -201,7 +201,10 @@ export function createWorkflowRule(
// Update checkpoint with the newly written event
if (checkpoint && written) {
checkpoint.lastEventId = written.id;
const parsedMeta = result.meta != null ? (result.meta as Record<string, unknown>) : null;
const parsedMeta =
result.meta != null
? (result.meta as Record<string, unknown>)
: null;
checkpoint.topicSummaries.set(action.topicId, {
lastRole: action.role,
meta: parsedMeta,
+5 -5
View File
@@ -5,9 +5,9 @@
import type { Command } from 'commander';
import { loadConfig, resolveDir } from '../config.js';
import {
type EventRecord,
openOrCreateWorkflowsStore,
openWorkflowsStore,
type EventRecord,
} from '../store.js';
export function registerInspectCommand(program: Command): void {
@@ -27,7 +27,9 @@ export function registerInspectCommand(program: Command): void {
const limit = Math.max(1, parseInt(opts.limit, 10) || 20);
if (!store) {
console.log('No workflows.db yet. Run the workflow daemon or `upulse workflow submit`.');
console.log(
'No workflows.db yet. Run the workflow daemon or `upulse workflow submit`.',
);
return;
}
@@ -87,8 +89,6 @@ export function registerInspectCommand(program: Command): void {
process.exit(1);
}
console.log(
typeof obj === 'string' ? obj : JSON.stringify(obj, null, 2),
);
console.log(typeof obj === 'string' ? obj : JSON.stringify(obj, null, 2));
});
}
+39 -13
View File
@@ -4,16 +4,20 @@
* 🍊 (NEKO Team)
*/
import { describe, test, expect, beforeEach, afterEach } from 'bun:test';
import { execSync } from 'node:child_process';
import { mkdirSync, existsSync, rmSync } from 'node:fs';
import { join } from 'node:path';
import { tmpdir } from 'node:os';
import { Database } from 'bun:sqlite';
import { afterEach, beforeEach, describe, expect, test } from 'bun:test';
import { execSync } from 'node:child_process';
import { existsSync, mkdirSync, rmSync } from 'node:fs';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { performRollback } from './rollback.js';
function git(cmd: string, cwd: string): string {
return execSync(`git ${cmd}`, { cwd, encoding: 'utf-8', timeout: 10_000 }).trim();
return execSync(`git ${cmd}`, {
cwd,
encoding: 'utf-8',
timeout: 10_000,
}).trim();
}
interface TestEngine {
@@ -81,13 +85,23 @@ function setupTestEngine(tmpDir: string): TestEngine {
insert.run(v2Timestamp + 1000, 'task-started', 'b', 'h5', v2Commit, null);
db.close();
return { engineDir, eventsPath, v1Commit, v2Commit, v1Timestamp, v2Timestamp };
return {
engineDir,
eventsPath,
v1Commit,
v2Commit,
v1Timestamp,
v2Timestamp,
};
}
let tmpDir: string;
beforeEach(() => {
tmpDir = join(tmpdir(), `rollback-test-${Date.now()}-${Math.random().toString(36).slice(2)}`);
tmpDir = join(
tmpdir(),
`rollback-test-${Date.now()}-${Math.random().toString(36).slice(2)}`,
);
mkdirSync(tmpDir, { recursive: true });
});
@@ -99,7 +113,9 @@ describe('performRollback', () => {
test('dry run 不修改任何东西', () => {
const t = setupTestEngine(tmpDir);
const db = new Database(t.eventsPath, { readonly: true });
const beforeCount = (db.prepare('SELECT COUNT(*) as c FROM events').get() as any).c;
const beforeCount = (
db.prepare('SELECT COUNT(*) as c FROM events').get() as any
).c;
db.close();
const beforeHead = git('rev-parse HEAD', t.engineDir);
@@ -115,7 +131,9 @@ describe('performRollback', () => {
// Nothing changed
const db2 = new Database(t.eventsPath, { readonly: true });
const afterCount = (db2.prepare('SELECT COUNT(*) as c FROM events').get() as any).c;
const afterCount = (
db2.prepare('SELECT COUNT(*) as c FROM events').get() as any
).c;
db2.close();
expect(afterCount).toBe(beforeCount);
expect(git('rev-parse HEAD', t.engineDir)).toBe(beforeHead);
@@ -135,14 +153,18 @@ describe('performRollback', () => {
// Main db should have 3 events left (v1 ones)
const db = new Database(t.eventsPath, { readonly: true });
const remaining = (db.prepare('SELECT COUNT(*) as c FROM events').get() as any).c;
const remaining = (
db.prepare('SELECT COUNT(*) as c FROM events').get() as any
).c;
db.close();
expect(remaining).toBe(3);
// Dump should have 2 events
expect(result.dumpPath).toBeTruthy();
const dumpDb = new Database(result.dumpPath!, { readonly: true });
const dumped = (dumpDb.prepare('SELECT COUNT(*) as c FROM events').get() as any).c;
const dumped = (
dumpDb.prepare('SELECT COUNT(*) as c FROM events').get() as any
).c;
dumpDb.close();
expect(dumped).toBe(2);
});
@@ -200,7 +222,11 @@ describe('performRollback', () => {
const dumpDb = new Database(result.dumpPath!, { readonly: true });
const getInfo = (key: string) =>
(dumpDb.prepare('SELECT value FROM rollback_info WHERE key = ?').get(key) as any)?.value;
(
dumpDb
.prepare('SELECT value FROM rollback_info WHERE key = ?')
.get(key) as any
)?.value;
expect(getInfo('from_commit')).toBe(result.fromCommit);
expect(getInfo('to_commit')).toBe(result.toCommit);
+46 -13
View File
@@ -16,10 +16,10 @@
* 🍊 (NEKO Team)
*/
import { Database } from 'bun:sqlite';
import { execSync } from 'node:child_process';
import { existsSync, mkdirSync, unlinkSync } from 'node:fs';
import { join } from 'node:path';
import { Database } from 'bun:sqlite';
import type { Command } from 'commander';
import { loadConfig, resolveDir } from '../config.js';
@@ -66,7 +66,12 @@ export function performRollback(opts: RollbackOptions): RollbackResult {
const currentCommit = run('git rev-parse --short HEAD', engineDir);
if (targetShort === currentCommit) {
return { status: 'noop', fromCommit: currentCommit, toCommit: targetShort, eventsRemoved: 0 };
return {
status: 'noop',
fromCommit: currentCommit,
toCommit: targetShort,
eventsRemoved: 0,
};
}
// 2. Find cutoff by target commit author date
@@ -85,7 +90,9 @@ export function performRollback(opts: RollbackOptions): RollbackResult {
};
}
const db = dryRun ? new Database(eventsPath, { readonly: true }) : new Database(eventsPath);
const db = dryRun
? new Database(eventsPath, { readonly: true })
: new Database(eventsPath);
const cutoffRow = db
.prepare('SELECT MAX(id) as maxId FROM events WHERE occurred_at <= ?')
@@ -145,19 +152,31 @@ export function performRollback(opts: RollbackOptions): RollbackResult {
);
`);
const events = db.prepare('SELECT * FROM events WHERE id > ?').all(cutoffId) as any[];
const events = db
.prepare('SELECT * FROM events WHERE id > ?')
.all(cutoffId) as any[];
const insertStmt = dumpDb.prepare(
'INSERT INTO events (id, occurred_at, kind, key, hash, code_rev, meta) VALUES (?, ?, ?, ?, ?, ?, ?)',
);
const insertTx = dumpDb.transaction(() => {
for (const e of events) {
insertStmt.run(e.id, e.occurred_at, e.kind, e.key, e.hash, e.code_rev, e.meta);
insertStmt.run(
e.id,
e.occurred_at,
e.kind,
e.key,
e.hash,
e.code_rev,
e.meta,
);
}
});
insertTx();
const infoStmt = dumpDb.prepare('INSERT INTO rollback_info (key, value) VALUES (?, ?)');
const infoStmt = dumpDb.prepare(
'INSERT INTO rollback_info (key, value) VALUES (?, ?)',
);
infoStmt.run('from_commit', currentCommit);
infoStmt.run('to_commit', targetShort);
infoStmt.run('cutoff_event_id', String(cutoffId));
@@ -177,7 +196,9 @@ export function performRollback(opts: RollbackOptions): RollbackResult {
// Clean projections table if present
try {
const hasTable = db
.prepare("SELECT name FROM sqlite_master WHERE type='table' AND name='projections'")
.prepare(
"SELECT name FROM sqlite_master WHERE type='table' AND name='projections'",
)
.get();
if (hasTable) {
db.prepare('DELETE FROM projections').run();
@@ -213,10 +234,14 @@ export function registerRollbackCommand(program: Command): void {
const baseDir = resolveDir(program.opts().dir);
const config = loadConfig(baseDir);
const engineDir = opts.engine || config?.engine?.path || join(baseDir, 'engine');
const engineDir =
opts.engine || config?.engine?.path || join(baseDir, 'engine');
const eventsPath =
opts.events ||
join(config?.store?.scopesDir || join(baseDir, 'scopes'), 'workflows.db');
join(
config?.store?.scopesDir || join(baseDir, 'scopes'),
'workflows.db',
);
try {
const result = performRollback({
@@ -231,18 +256,26 @@ export function registerRollbackCommand(program: Command): void {
console.log('ℹ️ Already at target commit, nothing to do.');
break;
case 'dry-run':
console.log(`🔄 Rollback: ${result.fromCommit}${result.toCommit}`);
console.log(
`🔄 Rollback: ${result.fromCommit}${result.toCommit}`,
);
console.log(
`🏜️ Dry run — would remove ${result.eventsRemoved} events and revert to ${result.toCommit}`,
);
break;
case 'rolled-back':
console.log(`🔄 Rollback: ${result.fromCommit}${result.toCommit}`);
console.log(
`🔄 Rollback: ${result.fromCommit}${result.toCommit}`,
);
if (result.dumpPath) {
console.log(`💾 Dumped ${result.eventsRemoved} events → ${result.dumpPath}`);
console.log(
`💾 Dumped ${result.eventsRemoved} events → ${result.dumpPath}`,
);
}
console.log(`✅ Engine reverted to ${result.toCommit}`);
console.log('\n🎯 Rollback complete! Restart the daemon to apply changes.');
console.log(
'\n🎯 Rollback complete! Restart the daemon to apply changes.',
);
console.log(' sudo systemctl restart pulse-workflow.service');
break;
}
+2 -4
View File
@@ -2,8 +2,8 @@
* commands/status.ts systemd workflow service + workflows.db summary
*/
import { execSync } from 'node:child_process';
import { Database } from 'bun:sqlite';
import { execSync } from 'node:child_process';
import type { Command } from 'commander';
import { loadConfig, resolveDir } from '../config.js';
import { workflowDataPaths } from '../store.js';
@@ -48,9 +48,7 @@ export function registerStatusCommand(program: Command): void {
const db = new Database(eventsDbPath, { readonly: true });
try {
const row = db
.query(
'SELECT COUNT(*) AS c, MAX(occurred_at) AS m FROM events',
)
.query('SELECT COUNT(*) AS c, MAX(occurred_at) AS m FROM events')
.get() as { c: number; m: number | null };
console.log(`Event count: ${row.c}`);
if (row.m != null) {
@@ -8,9 +8,7 @@ import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { createStore, type PulseStore } from '@uncaged/pulse';
function parseWorkflowKind(
kind: string,
): { wf: string; role: string } | null {
function parseWorkflowKind(kind: string): { wf: string; role: string } | null {
const i = kind.lastIndexOf('.');
if (i <= 0) return null;
return { wf: kind.slice(0, i), role: kind.slice(i + 1) };
+18 -6
View File
@@ -136,14 +136,26 @@ export function initUpulse(dir?: string): void {
console.log(`Staging: ${config.staging.path}`);
if (process.platform === 'linux') {
console.log('\nSystemd user units: pulse.service, pulse-workflow.service');
console.log(' systemctl --user start pulse.service # engine daemon');
console.log(' systemctl --user start pulse-workflow.service # workflow daemon (if installed)');
console.log(' systemctl --user status pulse.service # check engine');
console.log(' upulse status # workflow DB + units');
console.log(
' systemctl --user start pulse.service # engine daemon',
);
console.log(
' systemctl --user start pulse-workflow.service # workflow daemon (if installed)',
);
console.log(
' systemctl --user status pulse.service # check engine',
);
console.log(
' upulse status # workflow DB + units',
);
}
console.log('\nCLI:');
console.log(' upulse workflow submit <wf> <topicKey> <file> # enqueue workflow task');
console.log(' upulse inspect events --limit 5 # recent workflow events');
console.log(
' upulse workflow submit <wf> <topicKey> <file> # enqueue workflow task',
);
console.log(
' upulse inspect events --limit 5 # recent workflow events',
);
}
// ── Helpers ────────────────────────────────────────────────────