diff --git a/packages/pulse/src/workflows/coding-tdd.test.ts b/packages/pulse/src/workflows/coding-tdd.test.ts index 79a5660..263f47a 100644 --- a/packages/pulse/src/workflows/coding-tdd.test.ts +++ b/packages/pulse/src/workflows/coding-tdd.test.ts @@ -95,6 +95,22 @@ describe('coding-tdd WorkflowType', () => { expect(wf.moderator({ role: START, meta: null }, 'x')).toBe('test-planner'); }); + it('moderator: test-planner → test-reviewer', () => { + const wf = createTddCodingWorkflow(); + expect( + wf.moderator( + { + role: 'test-planner', + meta: { + testPlan: '# p', + scenarios: ['a'], + }, + }, + 'x', + ), + ).toBe('test-reviewer'); + }); + it('moderator: test-reviewer approved → test-coder', () => { const wf = createTddCodingWorkflow(); expect( @@ -146,6 +162,22 @@ describe('coding-tdd WorkflowType', () => { ).toBe('coder'); }); + it('moderator: coder → auto-tester', () => { + const wf = createTddCodingWorkflow(); + expect( + wf.moderator( + { + role: 'coder', + meta: { + filesChanged: ['src/foo.ts'], + deploymentGuide: 'bun test', + }, + }, + 'x', + ), + ).toBe('auto-tester'); + }); + it('moderator: auto-tester pass → manual-tester', () => { const wf = createTddCodingWorkflow(); expect( @@ -243,6 +275,25 @@ describe('coding-tdd WorkflowType', () => { ).toBe('closer'); }); + it('moderator: reviewer approved + emergency → closer', () => { + const wf = createTddCodingWorkflow(); + expect( + wf.moderator( + { + role: 'reviewer', + meta: { + verdict: 'approved', + comments: '', + codeQuality: 'a', + testQuality: 'a', + }, + }, + 'x', + 1, + ), + ).toBe('closer'); + }); + it('moderator: reviewer rejected → coder', () => { const wf = createTddCodingWorkflow(); expect( diff --git a/packages/pulse/src/workflows/coding-tdd.ts b/packages/pulse/src/workflows/coding-tdd.ts index df48a38..b819ac3 100644 --- a/packages/pulse/src/workflows/coding-tdd.ts +++ b/packages/pulse/src/workflows/coding-tdd.ts @@ -3,6 +3,9 @@ * * Pure roles + START/END automaton. Trigger: coding-tdd.__start__ * + * 设计里的 role `type`(llm / code / agent)在实现层统一为 `Role<…>`;默认全部为可注入的 + * mock,生产环境通过 `CreateTddCodingWorkflowOpts` 传入 `createLlmRole` / agent 工厂等。 + * * 小橘 🍊 (NEKO Team) */ diff --git a/packages/upulse/bunfig.toml b/packages/upulse/bunfig.toml index cd45629..020197b 100644 --- a/packages/upulse/bunfig.toml +++ b/packages/upulse/bunfig.toml @@ -1,2 +1,2 @@ [test] -root = "src" +pathIgnorePatterns = ["**/src/e2e/**"] diff --git a/packages/upulse/package.json b/packages/upulse/package.json index ddddca1..e064432 100644 --- a/packages/upulse/package.json +++ b/packages/upulse/package.json @@ -13,8 +13,7 @@ "scripts": { "build": "tsc", "dev": "bun run src/cli.ts", - "test": "bun test --timeout 60000", - "test:e2e": "bun test --timeout 120000 src/e2e/" + "test": "bun test --timeout 60000 ./src" }, "dependencies": { "@uncaged/pulse": "workspace:*", diff --git a/packages/upulse/src/cli.ts b/packages/upulse/src/cli.ts index 41ba60a..73ec0ca 100644 --- a/packages/upulse/src/cli.ts +++ b/packages/upulse/src/cli.ts @@ -7,15 +7,9 @@ */ import { Command } from 'commander'; -import { registerDaemonCommand } from './commands/daemon.js'; -import { registerDeployCommand } from './commands/deploy.js'; -import { registerDevCommand } from './commands/dev.js'; -import { registerGcCommand } from './commands/gc.js'; import { registerInitCommand } from './commands/init.js'; import { registerInspectCommand } from './commands/inspect.js'; -import { registerListCommand } from './commands/list.js'; -import { registerTickCommand } from './commands/tick.js'; -import { registerUICommand } from './commands/ui.js'; +import { registerStatusCommand } from './commands/status.js'; import { registerWorkflowCommand } from './commands/workflow.js'; const program = new Command(); @@ -30,14 +24,8 @@ program ); registerInitCommand(program); -registerDaemonCommand(program); -registerTickCommand(program); -registerListCommand(program); registerInspectCommand(program); -registerDevCommand(program); -registerDeployCommand(program); -registerGcCommand(program); registerWorkflowCommand(program); -registerUICommand(program); +registerStatusCommand(program); program.parse(); diff --git a/packages/upulse/src/commands/daemon.ts b/packages/upulse/src/commands/daemon.ts deleted file mode 100644 index 27abbc4..0000000 --- a/packages/upulse/src/commands/daemon.ts +++ /dev/null @@ -1,47 +0,0 @@ -/** - * commands/daemon.ts — upulse daemon start/stop/status - */ - -import type { Command } from 'commander'; -import { loadConfig, resolveDir } from '../config.js'; -import { daemonStart, daemonStatus, daemonStop } from '../daemon.js'; - -export function registerDaemonCommand(program: Command): void { - const daemon = program - .command('daemon') - .description('Daemon lifecycle management'); - - daemon - .command('start') - .description('Start the Pulse daemon') - .option('--foreground', 'Run in foreground (no detach)') - .action((opts: { foreground?: boolean }) => { - const config = loadConfig(resolveDir(program.opts().dir)); - daemonStart(config, opts.foreground ?? false); - }); - - daemon - .command('stop') - .description('Stop the Pulse daemon') - .action(() => { - const config = loadConfig(resolveDir(program.opts().dir)); - daemonStop(config); - }); - - daemon - .command('restart') - .description('Restart the Pulse daemon') - .action(() => { - const config = loadConfig(resolveDir(program.opts().dir)); - daemonStop(config); - daemonStart(config, false); - }); - - daemon - .command('status') - .description('Show daemon status') - .action(() => { - const config = loadConfig(resolveDir(program.opts().dir)); - daemonStatus(config); - }); -} diff --git a/packages/upulse/src/commands/deploy.ts b/packages/upulse/src/commands/deploy.ts deleted file mode 100644 index 4733421..0000000 --- a/packages/upulse/src/commands/deploy.ts +++ /dev/null @@ -1,538 +0,0 @@ -/** - * commands/deploy.ts — upulse deploy promote/rollback - * - * Phase 4: Version-aware promote/rollback with event-sourced versioning. - * All state changes are append-only events. - */ - -import { execSync } from 'node:child_process'; -import { existsSync } from 'node:fs'; -import { join } from 'node:path'; -import type { Command } from 'commander'; -import { loadConfig, resolveDir } from '../config.js'; -import { daemonStart, daemonStop, isDaemonRunning } from '../daemon.js'; -import { gitExec, gitMerge, gitResetHard, gitRevert } from '../git.js'; -import { migrateToScoped } from '../migrate.js'; -import { - type EventRecord, - openOrCreateStore, - openStore, - type PulseStore, -} from '../store.js'; - -// ── Helpers ──────────────────────────────────────────────────── - -function typeCheck(cwd: string): boolean { - try { - // Pure type checking — no emit, no dist/ needed - execSync('npx tsc --noEmit', { - cwd, - encoding: 'utf-8', - stdio: ['pipe', 'pipe', 'pipe'], - }); - return true; - } catch (err: unknown) { - const msg = - err instanceof Error - ? ((err as { stderr?: string }).stderr ?? err.message) - : String(err); - console.error(`Type check failed:\n${msg}`); - return false; - } -} - -function getGitHash(cwd: string): string { - return gitExec(cwd, 'rev-parse --short HEAD'); -} - -function getCurrentCodeRev(store: PulseStore): string | undefined { - const promote = store.getLatest('promote'); - return promote?.codeRev ?? undefined; -} - -/** - * Find the previous promote event with a different code_rev than the current one. - * queryByKind returns newest-first, so find the first with a different codeRev. - */ -function findPreviousPromote( - store: PulseStore, - currentCodeRev: string | undefined, -): EventRecord | null { - const allPromotes = store.queryByKind('promote'); - for (const promote of allPromotes) { - if (promote.codeRev !== currentCodeRev) { - return promote; - } - } - return null; -} - -/** - * Try to run migrate/initSenses from staging source, writing events. - * Best-effort: failures are logged as warnings, not fatal. - */ -async function tryWriteMigrateEvents( - store: PulseStore, - config: { scopesDir: string; objectsDir: string }, - stagingSrcPath: string, - newCodeRev: string, -): Promise { - const migrateFile = join(stagingSrcPath, 'rules', 'migrate.ts'); - if (!existsSync(migrateFile)) return; - - try { - const mod = (await import(migrateFile)) as Record; - const migrateFn = (mod.default ?? mod.migrate) as - | ((snapshot: Record) => Record) - | undefined; - if (!migrateFn) return; - - const { createScopedStore, findEffectiveEpoch, rebuildSnapshot } = - await import('@uncaged/pulse'); - - const scopedStore = createScopedStore({ - basePath: config.scopesDir, - objectsDir: config.objectsDir, - }); - const systemStore = scopedStore.scope('_system'); - const vitalsStore = scopedStore.scope('_vitals'); - - const epoch = findEffectiveEpoch(systemStore); - // Discover sense keys from vitals scope, then fall back to collect events - let senseKeys: string[] = []; - const vitalEvents = vitalsStore.queryByKind('vital', { limit: 200 }); - senseKeys = [ - ...new Set(vitalEvents.map((e) => e.key).filter(Boolean) as string[]), - ]; - if (senseKeys.length === 0) { - const collects = systemStore.queryByKind('collect', { limit: 100 }); - senseKeys = [ - ...new Set(collects.map((e) => e.key).filter(Boolean) as string[]), - ]; - } - const currentSnapshot = rebuildSnapshot( - { system: systemStore, vitals: vitalsStore }, - senseKeys, - epoch, - ); - scopedStore.close(); - const migrated = migrateFn(currentSnapshot); - - for (const [key, data] of Object.entries(migrated)) { - if (key === 'timestamp') continue; - const value = (data as { data?: unknown })?.data ?? data; - store.appendEvent({ - occurredAt: Date.now(), - kind: 'migrate', - key, - hash: store.putObject(value), - codeRev: newCodeRev, - }); - } - console.log(' ✓ migrate events written'); - } catch (err: unknown) { - console.error( - ` ✗ migrate failed: ${err instanceof Error ? err.message : String(err)}`, - ); - console.error(' Promote aborted. Fix migrate.ts and try again.'); - process.exit(1); - } -} - -async function tryWriteInitEvents( - store: PulseStore, - stagingSrcPath: string, - newCodeRev: string, -): Promise { - const initFile = join(stagingSrcPath, 'rules', 'initSenses.ts'); - if (!existsSync(initFile)) return; - - try { - const mod = (await import(initFile)) as Record; - const initSenses = (mod.default ?? mod.initSenses) as - | Record - | undefined; - if (!initSenses || typeof initSenses !== 'object') return; - - for (const [key, data] of Object.entries(initSenses)) { - store.appendEvent({ - occurredAt: Date.now(), - kind: 'init', - key, - hash: store.putObject(data), - codeRev: newCodeRev, - }); - } - console.log(' ✓ init events written'); - } catch (err: unknown) { - console.warn( - ` ⚠ initSenses skipped: ${err instanceof Error ? err.message : String(err)}`, - ); - } -} - -// ── Command Registration ─────────────────────────────────────── - -export function registerDeployCommand(program: Command): void { - const deploy = program - .command('deploy') - .description('Deploy staging changes to engine'); - - deploy - .command('promote') - .description( - 'Merge staging → engine (tsc verify + bun test + version events + daemon reload)', - ) - .action(async () => { - const config = loadConfig(resolveDir(program.opts().dir)); - migrateToScoped(config); - const running = isDaemonRunning(config); - - // Step 1: Type-check staging - console.log('Step 1: Type-checking staging...'); - if (!typeCheck(config.staging.path)) { - console.error('Error: staging type check failed. Aborting promote.'); - process.exit(1); - } - console.log(' ✓ staging type-checks'); - - // Step 2: Run bun test in staging before merge - console.log('Step 2: Running bun test in staging...'); - try { - execSync('bun test', { - cwd: config.staging.path, - stdio: ['pipe', 'pipe', 'pipe'], - encoding: 'utf-8', - }); - console.log(' ✓ staging tests pass'); - } catch (testErr: unknown) { - const errObj = testErr as { stderr?: string; stdout?: string }; - const stderr = errObj.stderr ?? ''; - // "No tests found" is OK — staging may not have test files - if (stderr.includes('No tests found')) { - console.log(' ✓ no tests in staging (skipped)'); - } else { - console.error('Error: staging tests failed. Aborting promote.'); - if (stderr) console.error(stderr); - process.exit(1); - } - } - - // Note staging source path before merge (Bun runs .ts directly) - const stagingSrcPath = config.staging.path; - - // Step 3: Stop daemon before merge (avoid running stale code during merge) - if (running) { - console.log('Step 3: Stopping daemon before merge...'); - try { - daemonStop(config); - console.log(' ✓ daemon stopped'); - } catch (err: unknown) { - console.error( - `Warning: daemon stop failed: ${err instanceof Error ? err.message : String(err)}`, - ); - } - } - - // Step 4: Merge staging → engine - console.log('Step 4: Merging staging → engine...'); - try { - gitMerge(config.engine.path, 'staging'); - console.log(' ✓ merge successful'); - } catch (err: unknown) { - console.error( - `Error: merge failed: ${err instanceof Error ? err.message : String(err)}`, - ); - console.error('Resolve conflicts manually in engine/ and retry.'); - // Restart daemon with old code if it was running - if (running) { - try { - daemonStart(config, false); - console.log(' ✓ daemon restarted with old code'); - } catch {} - } - process.exit(1); - } - - // Step 5: Run bun install after merge (picks up new deps) - console.log('Step 5: Running bun install after merge...'); - try { - // Try --frozen-lockfile first (atomic, reproducible) - try { - execSync('bun install --frozen-lockfile', { - cwd: config.engine.path, - stdio: ['pipe', 'pipe', 'pipe'], - }); - console.log(' ✓ bun install --frozen-lockfile complete'); - } catch { - // Fallback: no lockfile or deps changed — regular install - execSync('bun install', { - cwd: config.engine.path, - stdio: 'inherit', - }); - console.log(' ✓ bun install complete (lockfile updated)'); - } - } catch { - console.error('Error: bun install failed after merge. Rolling back...'); - // Rollback: reset engine to pre-merge state - try { - gitResetHard(config.engine.path, 'HEAD~1'); - console.error( - 'Rolled back engine to previous commit. Restarting with old code...', - ); - // Write error event for monitoring - const errorStore = openOrCreateStore( - config.store.scopesDir, - config.store.objectsDir, - ); - errorStore.appendEvent({ - occurredAt: Date.now(), - kind: 'error', - meta: JSON.stringify({ - type: 'promote_rollback', - reason: 'bun_install_failed', - }), - }); - errorStore.close(); - - execSync('bun install --no-cache', { - cwd: config.engine.path, - stdio: 'inherit', - }); - } catch { - console.error( - 'Warning: rollback bun install also failed. Manual intervention required.', - ); - } - if (running) { - try { - daemonStart(config, false); - console.log(' ✓ daemon restarted with old code'); - } catch {} - } - process.exit(1); - } - - // Step 6: Type-check engine (second verification) - console.log('Step 6: Type-checking engine...'); - if (!typeCheck(config.engine.path)) { - console.error( - 'Error: engine type check failed after merge. Rolling back...', - ); - gitResetHard(config.engine.path, 'HEAD~1'); - console.error('Rolled back to previous state.'); - if (running) { - try { - daemonStart(config, false); - } catch {} - } - process.exit(1); - } - console.log(' ✓ engine type-checks'); - - // Step 7: Write version events - console.log('Step 7: Writing version events...'); - const store = openOrCreateStore( - config.store.scopesDir, - config.store.objectsDir, - ); - { - const currentCodeRev = getCurrentCodeRev(store); - const newCodeRev = getGitHash(config.engine.path); - - // Write migrate events (best-effort) - await tryWriteMigrateEvents( - store, - config.store, - stagingSrcPath, - newCodeRev, - ); - - // Write init events (best-effort) - await tryWriteInitEvents(store, stagingSrcPath, newCodeRev); - - // Write promote event (always) - store.appendEvent({ - occurredAt: Date.now(), - kind: 'promote', - codeRev: newCodeRev, - meta: JSON.stringify({ from: currentCodeRev ?? null }), - }); - console.log( - ` ✓ promote event: ${currentCodeRev ?? '(cold start)'} → ${newCodeRev}`, - ); - - store.close(); - } - - // Step 8: Restart daemon after successful promote - if (running) { - console.log('Step 8: Restarting daemon...'); - try { - daemonStart(config, false); - console.log(' ✓ daemon restarted'); - } catch (err: unknown) { - console.error( - `Warning: daemon restart failed: ${err instanceof Error ? err.message : String(err)}`, - ); - } - } - - console.log('\n✅ Promote complete!'); - }); - - deploy - .command('rollback') - .description( - 'Roll back to previous version (append-only: writes rollback event, reverts engine)', - ) - .action(() => { - const config = loadConfig(resolveDir(program.opts().dir)); - migrateToScoped(config); - const running = isDaemonRunning(config); - - // Step 1: Open store and find versions - console.log('Step 1: Finding versions...'); - const store = openStore(config.store.scopesDir, config.store.objectsDir); - if (!store) { - console.error( - 'Error: no events.db found. Cannot determine versions for rollback.', - ); - process.exit(1); - } - - const currentPromote = store.getLatest('promote'); - if (!currentPromote) { - console.error('Error: no promote event found. Nothing to roll back.'); - store.close(); - process.exit(1); - } - - const previousPromote = findPreviousPromote( - store, - currentPromote.codeRev, - ); - if (!previousPromote) { - console.error('Error: no previous version to roll back to.'); - store.close(); - process.exit(1); - } - - console.log(` Current: ${currentPromote.codeRev}`); - console.log(` Rolling back to: ${previousPromote.codeRev}`); - - // Step 2: Write rollback event (append-only, before git changes) - console.log('Step 2: Writing rollback event...'); - store.appendEvent({ - occurredAt: Date.now(), - kind: 'rollback', - codeRev: previousPromote.codeRev, - meta: JSON.stringify({ - from: currentPromote.codeRev, - to: previousPromote.codeRev, - }), - }); - console.log(' ✓ rollback event written'); - store.close(); - - // Step 3: Stop daemon before revert - if (running) { - console.log('Step 3: Stopping daemon before revert...'); - try { - daemonStop(config); - console.log(' ✓ daemon stopped'); - } catch (err: unknown) { - console.error( - `Warning: daemon stop failed: ${err instanceof Error ? err.message : String(err)}`, - ); - } - } - - // Step 4: Revert engine git commit - console.log('Step 4: Reverting engine commit...'); - try { - gitRevert(config.engine.path, 'HEAD'); - console.log(' ✓ reverted'); - } catch (err: unknown) { - console.error( - `Error: revert failed: ${err instanceof Error ? err.message : String(err)}`, - ); - // Restart daemon if it was running - if (running) { - try { - daemonStart(config, false); - console.log(' ✓ daemon restarted'); - } catch {} - } - process.exit(1); - } - - // Step 5: Run bun install after revert - console.log('Step 5: Running bun install after revert...'); - try { - // Try --frozen-lockfile first (atomic, reproducible) - try { - execSync('bun install --frozen-lockfile', { - cwd: config.engine.path, - stdio: ['pipe', 'pipe', 'pipe'], - }); - console.log(' ✓ bun install --frozen-lockfile complete'); - } catch { - execSync('bun install', { - cwd: config.engine.path, - stdio: 'inherit', - }); - console.log(' ✓ bun install complete (lockfile updated)'); - } - } catch (installErr: unknown) { - console.error( - `Error: bun install failed after revert: ${installErr instanceof Error ? installErr.message : String(installErr)}`, - ); - // Write warning event for monitoring - try { - const warningStore = openOrCreateStore( - config.store.scopesDir, - config.store.objectsDir, - ); - warningStore.appendEvent({ - occurredAt: Date.now(), - kind: 'warn', - meta: JSON.stringify({ - type: 'rollback_install_failed', - reason: - installErr instanceof Error - ? installErr.message - : String(installErr), - }), - }); - warningStore.close(); - } catch { - /* best-effort */ - } - process.exit(1); - } - - // Step 6: Type-check engine - console.log('Step 6: Type-checking engine...'); - if (!typeCheck(config.engine.path)) { - console.error('Error: engine type check failed after revert.'); - process.exit(1); - } - console.log(' ✓ engine type-checks'); - - // Step 7: Restart daemon if it was running - if (running) { - console.log('Step 7: Restarting daemon...'); - try { - daemonStart(config, false); - console.log(' ✓ daemon restarted'); - } catch (err: unknown) { - console.error( - `Warning: daemon restart failed: ${err instanceof Error ? err.message : String(err)}`, - ); - } - } - - console.log('\n✅ Rollback complete!'); - }); -} diff --git a/packages/upulse/src/commands/dev.ts b/packages/upulse/src/commands/dev.ts deleted file mode 100644 index 4fc5149..0000000 --- a/packages/upulse/src/commands/dev.ts +++ /dev/null @@ -1,52 +0,0 @@ -/** - * commands/dev.ts — upulse dev path/build - */ - -import { execSync } from 'node:child_process'; -import { join } from 'node:path'; -import type { Command } from 'commander'; -import { loadConfig, resolveDir } from '../config.js'; - -export function registerDevCommand(program: Command): void { - const dev = program - .command('dev') - .description('Development commands for staging'); - - dev - .command('path') - .description('Output staging/ or engine/ absolute path') - .option('--engine', 'Output engine path instead of staging') - .action((opts: { engine?: boolean }) => { - const config = loadConfig(resolveDir(program.opts().dir)); - if (opts.engine) { - process.stdout.write(`${config.engine.path}\n`); - } else { - process.stdout.write(`${config.staging.path}\n`); - } - }); - - dev - .command('build') - .description('tsc compile staging/') - .action(() => { - const config = loadConfig(resolveDir(program.opts().dir)); - const tscBin = join( - config.engine.path, - 'node_modules', - 'typescript', - 'bin', - 'tsc', - ); - console.log(`Compiling staging (${config.staging.path})...`); - try { - execSync(`node ${tscBin}`, { - cwd: config.staging.path, - stdio: 'inherit', - }); - console.log('✓ Build successful'); - } catch { - console.error('Error: tsc compilation failed.'); - process.exit(1); - } - }); -} diff --git a/packages/upulse/src/commands/gc.ts b/packages/upulse/src/commands/gc.ts deleted file mode 100644 index 1568cd0..0000000 --- a/packages/upulse/src/commands/gc.ts +++ /dev/null @@ -1,126 +0,0 @@ -/** - * commands/gc.ts — upulse gc vitals - * - * Housekeeping for vitals: archive old records, downsample. - * Vitals are stored as events (kind='vital') in the _vitals scope. - */ - -import type { Command } from 'commander'; -import { loadConfig, resolveDir } from '../config.js'; -import { migrateToScoped } from '../migrate.js'; -import { openScopedStore } from '../store.js'; - -function parseDuration(s: string): number { - const match = s.match(/^(\d+)(ms|s|m|h|d)?$/); - if (!match) { - console.error( - `Error: invalid duration "${s}". Use format: 7d, 24h, 30m, 60s, 1000ms`, - ); - process.exit(1); - } - const val = parseInt(match[1], 10); - const unit = match[2] ?? 'd'; - const multiplier: Record = { - ms: 1, - s: 1000, - m: 60_000, - h: 3_600_000, - d: 86_400_000, - }; - return val * (multiplier[unit] ?? 86_400_000); -} - -export function registerGcCommand(program: Command): void { - const gc = program - .command('gc') - .description('Housekeeping: archive and downsample vitals'); - - gc.command('vitals') - .description('Archive old vitals and optionally downsample') - .option( - '--keep ', - 'Delete vitals older than this (default: 7d)', - '7d', - ) - .option( - '--downsample ', - 'Downsample interval (e.g., 1m, 5m). If omitted, no downsampling.', - ) - .option('--dry-run', 'Show what would happen without making changes', false) - .action((opts: { keep: string; downsample?: string; dryRun: boolean }) => { - const config = loadConfig(resolveDir(program.opts().dir)); - migrateToScoped(config); - const scopedStore = openScopedStore( - config.store.scopesDir, - config.store.objectsDir, - ); - - if (!scopedStore) { - console.log('No scopes directory found. Daemon may not have run yet.'); - return; - } - - const vitalsStore = scopedStore.scope('_vitals'); - - const keepMs = parseDuration(opts.keep); - const olderThan = Date.now() - keepMs; - const olderThanStr = new Date(olderThan).toISOString(); - - if (opts.dryRun) { - console.log('Dry run — no changes will be made.\n'); - } - - console.log(`Archive threshold: ${opts.keep} (before ${olderThanStr})`); - - if (opts.downsample) { - const intervalMs = parseDuration(opts.downsample); - console.log( - `Downsample interval: ${opts.downsample} (${intervalMs}ms)`, - ); - - if (!opts.dryRun) { - console.log('\n[1/2] Downsampling vitals...'); - // Discover distinct keys from recent vital events - const recentVitals = vitalsStore.queryByKind('vital', { limit: 200 }); - const keys = [ - ...new Set( - recentVitals.map((e) => e.key).filter(Boolean) as string[], - ), - ]; - - let totalDownsampled = 0; - for (const key of keys) { - const deleted = vitalsStore.downsampleEvents( - 'vital', - key, - intervalMs, - olderThan, - ); - totalDownsampled += deleted; - } - console.log( - ` Downsampled ${totalDownsampled} vital(s) across ${keys.length} key(s).`, - ); - - console.log('\n[2/2] Archiving vitals...'); - const archived = vitalsStore.archiveEvents(olderThan); - console.log( - ` Archived ${archived} vital(s) older than ${olderThanStr}.`, - ); - } else { - console.log('\n(dry-run) Would downsample and archive vitals.'); - } - } else { - if (!opts.dryRun) { - const archived = vitalsStore.archiveEvents(olderThan); - console.log( - `\nArchived ${archived} vital(s) older than ${olderThanStr}.`, - ); - } else { - console.log('\n(dry-run) Would archive vitals.'); - } - } - - scopedStore.close(); - }); -} diff --git a/packages/upulse/src/commands/inspect.ts b/packages/upulse/src/commands/inspect.ts index ce9f3ef..20887bf 100644 --- a/packages/upulse/src/commands/inspect.ts +++ b/packages/upulse/src/commands/inspect.ts @@ -1,124 +1,94 @@ /** - * commands/inspect.ts — upulse inspect errors/ticks - * - * Reads from the unified `events` table via PulseStore.queryByKind(). + * commands/inspect.ts — Inspect workflow daemon store (events + CAS objects) */ import type { Command } from 'commander'; import { loadConfig, resolveDir } from '../config.js'; -import { migrateToScoped } from '../migrate.js'; -import { openStore } from '../store.js'; - -function parseDuration(s: string): number { - const match = s.match(/^(\d+)(m|h|d|s)?$/); - if (!match) { - console.error( - `Error: invalid duration "${s}". Use format: 1h, 30m, 7d, 3600s`, - ); - process.exit(1); - } - const val = parseInt(match[1], 10); - const unit = match[2] ?? 'm'; - const multiplier: Record = { - s: 1000, - m: 60_000, - h: 3_600_000, - d: 86_400_000, - }; - return val * (multiplier[unit] ?? 60_000); -} +import { + openOrCreateWorkflowsStore, + openWorkflowsStore, + type EventRecord, +} from '../store.js'; export function registerInspectCommand(program: Command): void { const inspect = program .command('inspect') - .description('Inspect Pulse runtime state and history'); + .description('Inspect workflow events and CAS objects'); inspect - .command('errors') - .description('Show recent errors') - .option('--since ', 'Time window (e.g., 1h, 30m)', '1h') - .action((opts: { since: string }) => { + .command('events') + .description('List events from workflows.db') + .option('--topic ', 'Filter by topic key') + .option('--kind ', 'Filter by event kind (e.g. meta.architect)') + .option('--limit ', 'Max rows', '20') + .action((opts: { topic?: string; kind?: string; limit: string }) => { const config = loadConfig(resolveDir(program.opts().dir)); - migrateToScoped(config); - const store = openStore(config.store.scopesDir, config.store.objectsDir); + const store = openWorkflowsStore(config); + const limit = Math.max(1, parseInt(opts.limit, 10) || 20); if (!store) { - console.log('No events.db found. Daemon may not have run yet.'); + console.log('No workflows.db yet. Run the workflow daemon or `upulse workflow submit`.'); return; } - const sinceMs = Date.now() - parseDuration(opts.since); - const errors = store.queryByKind('error', { since: sinceMs }); + let rows: EventRecord[]; - if (errors.length === 0) { - console.log(`No errors in the last ${opts.since}.`); + if (opts.kind) { + rows = store.queryByKind(opts.kind, { + key: opts.topic, + limit, + }); + } else if (opts.topic) { + const pool = store + .getAfter(0, { key: opts.topic }) + .sort((a, b) => b.occurredAt - a.occurredAt || b.id - a.id) + .slice(0, limit); + rows = pool; } else { - console.log(`Errors (last ${opts.since}):`); - console.log(''); - for (const err of errors) { - const ts = new Date(err.occurredAt).toISOString(); - const key = err.key ? ` [${err.key}]` : ''; - const detail = err.meta ? ` ${err.meta}` : ''; - console.log(` ${ts}${key}${detail}`); - } - console.log(`\n Total: ${errors.length} error(s)`); + rows = store.getRecent(limit); } - store.close(); - }); - - inspect - .command('ticks') - .description('Show recent ticks') - .option('--limit ', 'Number of ticks to show', '10') - .action((opts: { limit: string }) => { - const config = loadConfig(resolveDir(program.opts().dir)); - migrateToScoped(config); - const store = openStore(config.store.scopesDir, config.store.objectsDir); - - if (!store) { - console.log('No events.db found. Daemon may not have run yet.'); - return; - } - - const limit = parseInt(opts.limit, 10) || 10; - const ticks = store.queryByKind('tick', { limit }); - - if (ticks.length === 0) { - console.log('No ticks recorded yet.'); + if (rows.length === 0) { + console.log('No matching events.'); } else { - console.log(`Recent ticks (last ${limit}):`); - console.log(''); console.log( - ' Timestamp | tickMs | Effects | Duration', + 'id'.padEnd(6) + + 'occurredAt (ISO)'.padEnd(28) + + 'kind'.padEnd(28) + + 'key', ); - console.log( - ' ------------------------------------------------------------------', - ); - for (const tick of ticks) { - const ts = new Date(tick.occurredAt).toISOString(); - let tickMs = '-'; - let effectCount = '-'; - let durationMs = '-'; - if (tick.meta) { - try { - const meta = JSON.parse(tick.meta); - if (meta.tick_ms !== undefined) tickMs = String(meta.tick_ms); - if (meta.effect_count !== undefined) - effectCount = String(meta.effect_count); - if (meta.duration_ms !== undefined) - durationMs = `${meta.duration_ms}ms`; - } catch { - // ignore parse errors - } - } + console.log('-'.repeat(100)); + for (const e of rows) { + const ts = new Date(e.occurredAt).toISOString(); + const k = e.key ?? ''; console.log( - ` ${ts} | ${tickMs.padStart(6)} | ${effectCount.padStart(7)} | ${durationMs}`, + `${String(e.id).padEnd(6)}${ts.padEnd(28)}${e.kind.padEnd(28)}${k}`, ); } - console.log(`\n Total: ${ticks.length} tick(s)`); + console.log(`\nTotal: ${rows.length}`); } store.close(); }); + + inspect + .command('object') + .description('Read a CAS object by hash') + .argument('', 'Object hash (32 hex chars)') + .action((hash: string) => { + const config = loadConfig(resolveDir(program.opts().dir)); + const store = openOrCreateWorkflowsStore(config); + + const obj = store.getObject(hash); + store.close(); + + if (obj === null) { + console.error(`No object for hash: ${hash}`); + process.exit(1); + } + + console.log( + typeof obj === 'string' ? obj : JSON.stringify(obj, null, 2), + ); + }); } diff --git a/packages/upulse/src/commands/list.ts b/packages/upulse/src/commands/list.ts deleted file mode 100644 index cc232ae..0000000 --- a/packages/upulse/src/commands/list.ts +++ /dev/null @@ -1,70 +0,0 @@ -/** - * commands/list.ts — upulse list (show current rule chain) - */ - -import { existsSync, readdirSync, readFileSync } from 'node:fs'; -import { join } from 'node:path'; -import type { Command } from 'commander'; -import { loadConfig, resolveDir } from '../config.js'; - -export function registerListCommand(program: Command): void { - program - .command('list') - .description('Show current rule chain (from engine)') - .action(() => { - const config = loadConfig(resolveDir(program.opts().dir)); - const rulesDir = join(config.engine.path, 'rules'); - - if (!existsSync(rulesDir)) { - console.error('Error: rules/ directory not found in engine.'); - process.exit(1); - } - - // List rule files sorted by name (number prefix = order) - const files = readdirSync(rulesDir) - .filter((f) => f.endsWith('.ts')) - .sort(); - - if (files.length === 0) { - console.log('No rules found.'); - return; - } - - console.log('Rule chain:'); - console.log(''); - - for (let i = 0; i < files.length; i++) { - const file = files[i]; - const filePath = join(rulesDir, file); - - // Try to extract a description from comments - const content = readFileSync(filePath, 'utf-8'); - const commentMatch = content.match(/\/\/\s*(.+)/); - const desc = commentMatch ? commentMatch[1].trim() : '(no description)'; - - console.log(` ${i + 1}. ${file}`); - console.log(` ${desc}`); - } - - // Also check pulse.config.ts for inline rules - const configFile = join(config.engine.path, 'pulse.config.ts'); - if (existsSync(configFile)) { - const configContent = readFileSync(configFile, 'utf-8'); - const rulesMatch = configContent.match( - /const rules\s*=\s*\[([\s\S]*?)\]/, - ); - if (rulesMatch) { - console.log(`\n Inline rules in pulse.config.ts:`); - const inlineRules = rulesMatch[1] - .split('\n') - .map((l) => l.trim()) - .filter((l) => l && !l.startsWith('//') && !l.startsWith(']')); - for (const rule of inlineRules) { - console.log(` ${rule.replace(/,\s*$/, '')}`); - } - } - } - - console.log(`\n Total: ${files.length} rule file(s)`); - }); -} diff --git a/packages/upulse/src/commands/status.ts b/packages/upulse/src/commands/status.ts new file mode 100644 index 0000000..bd8aada --- /dev/null +++ b/packages/upulse/src/commands/status.ts @@ -0,0 +1,99 @@ +/** + * commands/status.ts — systemd workflow service + workflows.db summary + */ + +import { execSync } from 'node:child_process'; +import { Database } from 'bun:sqlite'; +import type { Command } from 'commander'; +import { loadConfig, resolveDir } from '../config.js'; +import { workflowDataPaths } from '../store.js'; + +function systemctlStatus(unit: string): string { + try { + return execSync(`systemctl --user status ${unit} --no-pager 2>&1`, { + encoding: 'utf-8', + maxBuffer: 1024 * 1024, + }); + } catch (err) { + const e = err as { stdout?: Buffer; stderr?: Buffer; message?: string }; + return ( + (e.stdout && e.stdout.toString()) || + (e.stderr && e.stderr.toString()) || + e.message || + String(err) + ); + } +} + +export function registerStatusCommand(program: Command): void { + program + .command('status') + .description('Workflow daemon systemd status and store summary') + .action(() => { + const config = loadConfig(resolveDir(program.opts().dir)); + const { eventsDbPath } = workflowDataPaths(config); + + console.log('=== systemd (user) ===\n'); + const units = ['pulse-workflow.service', 'pulse.service']; + for (const u of units) { + console.log(`--- ${u} ---`); + console.log(systemctlStatus(u)); + console.log(''); + } + + console.log('=== workflows.db ===\n'); + console.log(`Path: ${eventsDbPath}\n`); + + try { + const db = new Database(eventsDbPath, { readonly: true }); + try { + const row = db + .query( + 'SELECT COUNT(*) AS c, MAX(occurred_at) AS m FROM events', + ) + .get() as { c: number; m: number | null }; + console.log(`Event count: ${row.c}`); + if (row.m != null) { + console.log(`Latest event: ${new Date(row.m).toISOString()}`); + } + } finally { + db.close(); + } + } catch { + console.log('(database not readable or missing)'); + } + + console.log('\n=== active topics (recent keys) ===\n'); + try { + const db = new Database(eventsDbPath, { readonly: true }); + try { + const recent = db + .query( + `SELECT key, kind, occurred_at FROM events + WHERE key IS NOT NULL AND key != '' + ORDER BY occurred_at DESC, id DESC + LIMIT 30`, + ) + .all() as { key: string; kind: string; occurred_at: number }[]; + + const seen = new Set(); + const lines: string[] = []; + for (const r of recent) { + if (seen.has(r.key)) continue; + seen.add(r.key); + lines.push( + ` ${r.key} ${r.kind} ${new Date(r.occurred_at).toISOString()}`, + ); + if (lines.length >= 10) break; + } + console.log(lines.length ? lines.join('\n') : ' (none)'); + } finally { + db.close(); + } + } catch { + console.log(' (could not query topics)'); + } + + console.log(''); + }); +} diff --git a/packages/upulse/src/commands/tick.ts b/packages/upulse/src/commands/tick.ts deleted file mode 100644 index ebb1c03..0000000 --- a/packages/upulse/src/commands/tick.ts +++ /dev/null @@ -1,212 +0,0 @@ -/** - * commands/tick.ts — upulse tick [--dry-run] [--verbose] - * - * Manually trigger one tick using snapshots rebuilt from vitals/events. - * - * tick is a pure "read → decide → execute" operation: - * 1. Rebuild snapshot from existing vitals/events (no collection) - * 2. Run rules to produce effects - * 3. Execute effects (unless --dry-run) - * - * Data collection is the watcher's responsibility (via daemon). - * If the store is empty, tick sees an empty snapshot — rules will - * produce collect effects, which are logged but not executed. - */ - -import { existsSync } from 'node:fs'; -import { join } from 'node:path'; -import { pathToFileURL } from 'node:url'; -import type { Command } from 'commander'; -import { loadConfig, resolveDir } from '../config.js'; -import { migrateToScoped } from '../migrate.js'; - -export function registerTickCommand(program: Command): void { - program - .command('tick') - .description('Manually trigger one tick') - .option('--dry-run', 'Calculate effects without executing') - .option('--verbose', 'Show each rule input/output') - .action(async (opts: { dryRun?: boolean; verbose?: boolean }) => { - const config = loadConfig(resolveDir(program.opts().dir)); - migrateToScoped(config); - const enginePath = config.engine.path; - - try { - // Import @uncaged/pulse from engine's node_modules - const pulseModulePath = join( - enginePath, - 'node_modules', - '@uncaged', - 'pulse', - 'dist', - 'index.js', - ); - let rebuildSnapshot: ( - store: unknown, - senseKeys: string[], - epoch?: unknown, - ) => unknown; - let findEffectiveEpoch: (store: unknown) => unknown; - let createScopedStore: (options: { - basePath: string; - objectsDir: string; - }) => { scope(name: string): unknown; close(): void }; - let composeRules: ( - rules: unknown[], - defaultTickMs?: number, - ) => (prev: unknown, curr: unknown) => Promise<[unknown[], number]>; - - if (existsSync(pulseModulePath)) { - const pulseModule = await import(pathToFileURL(pulseModulePath).href); - rebuildSnapshot = pulseModule.rebuildSnapshot; - findEffectiveEpoch = pulseModule.findEffectiveEpoch; - createScopedStore = pulseModule.createScopedStore; - composeRules = pulseModule.composeRules; - } else { - console.error( - 'Error: @uncaged/pulse not found in engine node_modules.', - ); - console.error('Run: cd ~/.upulse/engine && bun install'); - process.exit(1); - } - - // Import rules (dynamically discover .ts rule files) - const rulesDir = join(enginePath, 'rules'); - const ruleFiles: string[] = []; - if (existsSync(rulesDir)) { - const { readdirSync } = await import('node:fs'); - ruleFiles.push( - ...readdirSync(rulesDir) - .filter((f: string) => f.endsWith('.ts')) - .sort(), - ); - } - - const rules: unknown[] = []; - for (const file of ruleFiles) { - const rulePath = pathToFileURL(join(rulesDir, file)).href; - const ruleModule = await import(rulePath); - rules.push(ruleModule.default); - } - - // Open or create store via ScopedStore - const scopedStore = createScopedStore({ - basePath: config.store.scopesDir, - objectsDir: config.store.objectsDir, - }); - const store = scopedStore.scope('_system') as { - hasEvents(): boolean; - appendEvent(e: Record): unknown; - putObject(d: unknown): string; - queryByKind( - kind: string, - opts?: { limit?: number }, - ): Array<{ key?: string }>; - }; - - // Discover sense keys from existing collect events in the store - const collects = store.queryByKind('collect', { limit: 100 }); - const senseKeys = [ - ...new Set(collects.map((e) => e.key).filter(Boolean) as string[]), - ]; - - if (senseKeys.length === 0 && opts.verbose) { - console.log( - 'Note: no collect events found in store. Snapshot will be empty.', - ); - console.log( - 'Start the daemon first to collect data, or rules will emit collect effects.', - ); - } - - // Determine version epoch - const epoch = findEffectiveEpoch(store); - - // Rebuild snapshot from vitals/events — no collection - const snapshot = rebuildSnapshot(store, senseKeys, epoch) as { - timestamp: number; - }; - - console.log('--- Snapshot ---'); - console.log(JSON.stringify(snapshot, null, 2)); - - // Run rule chain (prev = curr for single-tick: no time delta) - const pulse = composeRules(rules, 15000); - const [effects, tickMs] = await pulse(snapshot, snapshot); - - if (opts.verbose) { - console.log('\n--- Rules applied ---'); - console.log(` Rules count: ${rules.length}`); - for (let i = 0; i < ruleFiles.length; i++) { - console.log(` [${i}] ${ruleFiles[i]}`); - } - } - - console.log('\n--- Result ---'); - console.log(`Effects (${effects.length}):`); - if (effects.length === 0) { - console.log(' (none)'); - } else { - for (const e of effects) { - console.log(` ${JSON.stringify(e)}`); - } - } - console.log(`Next tick: ${tickMs}ms`); - - if (opts.dryRun) { - console.log('\n(dry-run: effects not executed)'); - } else { - // Execute effects with built-in handlers - for (const effect of effects) { - const e = effect as { - kind: string; - message?: string; - key?: string; - }; - switch (e.kind) { - case 'collect': - console.log( - ` [COLLECT] ${e.key} (skipped — start daemon for live collection)`, - ); - break; - case 'log': - console.log(` [LOG] ${e.message}`); - break; - default: - console.log(` [EFFECT] ${JSON.stringify(e)}`); - break; - } - } - if (effects.length > 0) { - console.log('\n✓ Effects handled (tick mode)'); - } - - // Record events like runPulse does - if (effects.length > 0) { - const effectsHash = store.putObject(effects); - store.appendEvent({ - occurredAt: Date.now(), - kind: 'effect', - hash: effectsHash, - meta: JSON.stringify({ count: effects.length }), - }); - } - - // Record tick event - store.appendEvent({ - occurredAt: Date.now(), - kind: 'tick', - meta: JSON.stringify({ - tick_ms: tickMs, - effect_count: effects.length, - }), - }); - } - } catch (err: unknown) { - console.error( - `Error during tick: ${err instanceof Error ? err.message : String(err)}`, - ); - process.exit(1); - } - }); -} diff --git a/packages/upulse/src/commands/ui.ts b/packages/upulse/src/commands/ui.ts deleted file mode 100644 index d10ccd5..0000000 --- a/packages/upulse/src/commands/ui.ts +++ /dev/null @@ -1,46 +0,0 @@ -/** - * commands/ui.ts — upulse ui [--port] [--no-open] - * - * Start a local WebUI dashboard for monitoring Pulse. - */ - -import type { Command } from 'commander'; -import { loadConfig, resolveDir } from '../config.js'; -import { createUIServer } from '../ui/server.js'; - -export function registerUICommand(program: Command): void { - program - .command('ui') - .description('Start local WebUI dashboard') - .option('--port ', 'HTTP port', '3140') - .option('--no-open', 'Do not open browser automatically') - .action(async (opts: { port: string; open: boolean }) => { - const config = loadConfig(resolveDir(program.opts().dir)); - const port = parseInt(opts.port, 10); - - const server = createUIServer(config, port); - - console.log(`\n ⚡ Pulse UI running at http://localhost:${port}\n`); - - if (opts.open) { - try { - const { execSync } = await import('node:child_process'); - const cmd = - process.platform === 'darwin' - ? 'open' - : process.platform === 'win32' - ? 'start' - : 'xdg-open'; - execSync(`${cmd} http://localhost:${port}`, { stdio: 'ignore' }); - } catch { - // Ignore — headless environments - } - } - - // Keep alive - process.on('SIGINT', () => { - server.stop(); - process.exit(0); - }); - }); -} diff --git a/packages/upulse/src/commands/workflow.test.ts b/packages/upulse/src/commands/workflow.test.ts index d695fa7..3b6d1f1 100644 --- a/packages/upulse/src/commands/workflow.test.ts +++ b/packages/upulse/src/commands/workflow.test.ts @@ -1,171 +1,140 @@ /** - * workflow.test.ts — Tests for upulse workflow create/list logic (message chain model). + * workflow.test.ts — workflows.db 模型:__start__、按 workflow 分组 list */ import { afterEach, beforeEach, describe, expect, it } from 'bun:test'; -import { mkdtempSync, rmSync } from 'node:fs'; +import { mkdirSync, rmSync } from 'node:fs'; import { tmpdir } from 'node:os'; import { join } from 'node:path'; -import { - createScopedStore, - type PulseStore, - type ScopedStore, -} from '@uncaged/pulse'; +import { createStore, type PulseStore } from '@uncaged/pulse'; -describe('workflow', () => { +function parseWorkflowKind( + kind: string, +): { wf: string; role: string } | null { + const i = kind.lastIndexOf('.'); + if (i <= 0) return null; + return { wf: kind.slice(0, i), role: kind.slice(i + 1) }; +} + +function workflowForTopic( + events: { kind: string; key?: string }[], + topicKey: string, +): string | null { + for (const e of events) { + if (e.key !== topicKey) continue; + const p = parseWorkflowKind(e.kind); + if (p?.role === '__start__') return p.wf; + } + return null; +} + +describe('workflow store (daemon model)', () => { let tmpDir: string; - let scopedStore: ScopedStore; let store: PulseStore; beforeEach(() => { - tmpDir = mkdtempSync(join(tmpdir(), 'pulse-topic-test-')); - scopedStore = createScopedStore({ - basePath: join(tmpDir, 'scopes'), - objectsDir: join(tmpDir, 'objects'), - }); - store = scopedStore.scope('_system'); + tmpDir = join(tmpdir(), `upulse-wf-test-${Date.now()}`); + mkdirSync(tmpDir, { recursive: true }); + const eventsDbPath = join(tmpDir, 'workflows.db'); + const objectsDir = join(tmpDir, 'objects'); + store = createStore({ eventsDbPath, objectsDir }); }); afterEach(() => { - scopedStore.close(); + store.close(); rmSync(tmpDir, { recursive: true, force: true }); }); - it('create topic writes coding.user event with CAS', () => { - const topicId = 'fix-login-bug-a3k'; - const hash = store.putObject({ - content: 'The login page crashes on submit', - artifacts: { title: 'Fix login bug', repoDir: '/tmp/repo' }, - }); + it('__start__ stores task content in CAS', () => { + const topicKey = 'task-a'; + const hash = store.putObject('hello task'); store.appendEvent({ occurredAt: Date.now(), - kind: 'coding.user', - key: topicId, + kind: 'coding.__start__', + key: topicKey, hash, }); - const events = store.queryByKind('coding.user'); - expect(events.length).toBe(1); - expect(events[0].key).toBe(topicId); - expect(events[0].hash).toBeTruthy(); - - const content = store.getObject(events[0].hash!) as any; - expect(content.artifacts.title).toBe('Fix login bug'); - expect(content.content).toBe('The login page crashes on submit'); - expect(content.artifacts.repoDir).toBe('/tmp/repo'); + const ev = store.queryByKind('coding.__start__', { key: topicKey }); + expect(ev.length).toBe(1); + expect(store.getObject(ev[0].hash!)).toBe('hello task'); }); - it('list topics shows created topic', () => { - const topicId = 'add-feature-x-b2c'; - const hash = store.putObject({ - content: 'Implement feature X', - artifacts: { title: 'Add feature X', repoDir: '/tmp/repo' }, + it('list grouping uses __start__ workflow prefix and latest kind', () => { + const k1 = 'topic-1'; + const k2 = 'topic-2'; + const ts = Date.now(); + store.appendEvent({ + occurredAt: ts, + kind: 'coding.__start__', + key: k1, + hash: store.putObject('t1'), }); + store.appendEvent({ + occurredAt: ts + 1, + kind: 'coding.architect', + key: k1, + hash: store.putObject('arch'), + }); + store.appendEvent({ + occurredAt: ts + 2, + kind: 'meta.__start__', + key: k2, + hash: store.putObject('m'), + }); + + const events = store.getAfter(0); + const startByKey = new Map(); + for (const e of events) { + if (!e.key) continue; + const p = parseWorkflowKind(e.kind); + if (p?.role === '__start__') startByKey.set(e.key, p.wf); + } + expect(startByKey.get(k1)).toBe('coding'); + expect(startByKey.get(k2)).toBe('meta'); + + expect(workflowForTopic(events, k1)).toBe('coding'); + expect(workflowForTopic(events, k2)).toBe('meta'); + + const latestByKey = new Map(); + for (const e of events) { + if (!e.key) continue; + const cur = latestByKey.get(e.key); + if ( + !cur || + e.occurredAt > cur.occurredAt || + (e.occurredAt === cur.occurredAt && e.id > cur.id) + ) { + latestByKey.set(e.key, e); + } + } + expect(latestByKey.get(k1)?.kind).toBe('coding.architect'); + expect(latestByKey.get(k2)?.kind).toBe('meta.__start__'); + }); + + it('list without --all omits topics whose latest event is only __start__', () => { + const hash = store.putObject('pending'); store.appendEvent({ occurredAt: Date.now(), - kind: 'coding.user', - key: topicId, + kind: 'meta.__start__', + key: 'stuck-topic', hash, }); - - const allEvents = store - .getAfter(0) - .filter((e) => e.kind.startsWith('coding.')); - expect(allEvents.length).toBe(1); - - const topics = new Map< - string, - { title: string; role: string; lastUpdated: number } - >(); - for (const event of allEvents) { - const tid = event.key; - if (!tid) continue; - const role = event.kind.replace('coding.', ''); - let title = tid; - if (role === 'user' && event.hash) { - try { - const obj = store.getObject(event.hash) as any; - if (obj?.artifacts?.title) title = obj.artifacts.title; - } catch {} - } - topics.set(tid, { title, role, lastUpdated: event.occurredAt }); - } - - expect(topics.size).toBe(1); - const t = topics.get(topicId)!; - expect(t.title).toBe('Add feature X'); - expect(t.role).toBe('user'); - }); - - it('list without --all hides closed topics', () => { - const now = Date.now(); - - // Topic 1: active (user) - const h1 = store.putObject({ - content: 'Active', - artifacts: { title: 'Active Topic' }, - }); - store.appendEvent({ - occurredAt: now, - kind: 'coding.user', - key: 'active-topic-1a2', - hash: h1, - }); - - // Topic 2: closed - const h2 = store.putObject({ - content: 'Closed', - artifacts: { title: 'Closed Topic' }, - }); - store.appendEvent({ - occurredAt: now, - kind: 'coding.user', - key: 'closed-topic-3b4', - hash: h2, - }); - const h3 = store.putObject({ content: 'Done' }); - store.appendEvent({ - occurredAt: now + 1, - kind: 'coding.closer', - key: 'closed-topic-3b4', - hash: h3, - }); - - const allEvents = store - .getAfter(0) - .filter((e) => e.kind.startsWith('coding.')); - - const topics = new Map< - string, - { title: string; role: string; lastUpdated: number } - >(); - for (const event of allEvents) { - const tid = event.key; - if (!tid) continue; - const role = event.kind.replace('coding.', ''); - let title = tid; - if (role === 'user' && event.hash) { - try { - const obj = store.getObject(event.hash) as any; - if (obj?.artifacts?.title) title = obj.artifacts.title; - } catch {} - } - const existing = topics.get(tid); - if (!existing) { - topics.set(tid, { title, role, lastUpdated: event.occurredAt }); - } else { - existing.role = role; - if (event.occurredAt > existing.lastUpdated) - existing.lastUpdated = event.occurredAt; + const events = store.getAfter(0); + const latestByKey = new Map(); + for (const e of events) { + if (!e.key) continue; + const cur = latestByKey.get(e.key); + if ( + !cur || + e.occurredAt > cur.occurredAt || + (e.occurredAt === cur.occurredAt && e.id > cur.id) + ) { + latestByKey.set(e.key, e); } } - - // Without --all: only non-closer - const active = [...topics.entries()].filter(([, t]) => t.role !== 'closer'); - expect(active.length).toBe(1); - expect(active[0][0]).toBe('active-topic-1a2'); - - // With --all: all topics - expect(topics.size).toBe(2); + const last = latestByKey.get('stuck-topic')!; + const p = parseWorkflowKind(last.kind); + expect(p?.role === '__start__').toBe(true); }); }); diff --git a/packages/upulse/src/commands/workflow.ts b/packages/upulse/src/commands/workflow.ts index 63dd194..3898c6c 100644 --- a/packages/upulse/src/commands/workflow.ts +++ b/packages/upulse/src/commands/workflow.ts @@ -1,54 +1,67 @@ /** - * commands/workflow.ts — upulse workflow create/list + * commands/workflow.ts — Workflow daemon CLI (list / timeline / submit / init) * - * Manage coding workflows via the routine scope store. - * Events follow the coding.{role} convention (message chain model). + * Event kinds: `{workflow}.{role}` with `{workflow}.__start__` to begin a topic. */ -import { randomBytes } from 'node:crypto'; +import { createRequire } from 'node:module'; +import { existsSync, readFileSync } from 'node:fs'; +import { dirname, join } from 'node:path'; +import { pathToFileURL } from 'node:url'; import type { Command } from 'commander'; import { loadConfig, resolveDir } from '../config.js'; -import { migrateToScoped } from '../migrate.js'; -import { openOrCreateScopedStore, openScopedStore } from '../store.js'; +import type { EventRecord } from '../store.js'; +import { openOrCreateWorkflowsStore, openWorkflowsStore } from '../store.js'; -function slugify(text: string): string { - return text - .toLowerCase() - .replace(/[^a-z0-9]+/g, '-') - .replace(/^-+|-+$/g, '') - .slice(0, 48); +function parseWorkflowKind( + kind: string, +): { wf: string; role: string } | null { + const i = kind.lastIndexOf('.'); + if (i <= 0) return null; + return { wf: kind.slice(0, i), role: kind.slice(i + 1) }; } -function shortId(): string { - return randomBytes(2).toString('hex').slice(0, 3); -} - -type CodingRole = 'user' | 'architect' | 'coder' | 'reviewer' | 'closer'; - -function kindToRole(kind: string): CodingRole { - return kind.replace('coding.', '') as CodingRole; +function workflowForTopic( + events: EventRecord[], + topicKey: string, +): string | null { + for (const e of events) { + if (e.key !== topicKey) continue; + const p = parseWorkflowKind(e.kind); + if (p?.role === '__start__') return p.wf; + } + return null; } export function registerWorkflowCommand(program: Command): void { const topic = program .command('workflow') - .description('Manage coding workflows'); + .description('Inspect and submit workflow tasks'); topic .command('init ') .description('Scaffold a new workflow (generates skeleton files)') .option('--roles ', 'Comma-separated role names', 'processor') .option('--dir ', 'Workflows directory', '') - .action((name: string, opts: { roles: string; dir: string }) => { - const { - scaffoldWorkflow, - } = require('@uncaged/pulse/workflows/scaffold.js'); - const workflowsDir = - opts.dir || - require('node:path').join( - process.cwd(), - 'packages/pulse/src/workflows', + .action(async (name: string, opts: { roles: string; dir: string }) => { + const require = createRequire(import.meta.url); + const pulseRoot = dirname(require.resolve('@uncaged/pulse/package.json')); + const scaffoldPath = join(pulseRoot, 'dist/workflows/scaffold.js'); + if (!existsSync(scaffoldPath)) { + console.error( + `Missing ${scaffoldPath}. Build @uncaged/pulse first: cd packages/pulse && bun run build`, ); + process.exit(1); + } + const { scaffoldWorkflow } = (await import( + pathToFileURL(scaffoldPath).href + )) as { scaffoldWorkflow: (o: { + name: string; + roles: string[]; + workflowsDir: string; + }) => string[] }; + const workflowsDir = + opts.dir || join(process.cwd(), 'packages/pulse/src/workflows'); const roles = opts.roles .split(',') .map((r: string) => r.trim()) @@ -63,68 +76,60 @@ export function registerWorkflowCommand(program: Command): void { }); topic - .command('create ') - .description('Create a new coding workflow') - .requiredOption('--desc <description>', 'Workflow description') - .requiredOption('--repo <repoDir>', 'Repository directory') - .action((title: string, opts: { desc: string; repo: string }) => { + .command('submit') + .description('Submit a task (append {workflow}.__start__ event)') + .argument('<workflow>', 'Workflow name (e.g. meta, coding)') + .argument('<topicKey>', 'Topic key') + .argument('<taskFile>', 'Path to task file') + .action((workflow: string, topicKey: string, taskFile: string) => { const config = loadConfig(resolveDir(program.opts().dir)); - migrateToScoped(config); - const scopedStore = openOrCreateScopedStore( - config.store.scopesDir, - config.store.objectsDir, - ); - const store = scopedStore.scope('workflows'); - - const topicId = `${slugify(title)}-${shortId()}`; - - // Store full content in CAS - const hash = store.putObject({ - content: opts.desc, - artifacts: { title, repoDir: opts.repo }, - }); - + const store = openOrCreateWorkflowsStore(config); + const content = readFileSync(taskFile, 'utf-8'); + const hash = store.putObject(content); store.appendEvent({ occurredAt: Date.now(), - kind: 'coding.user', - key: topicId, + kind: `${workflow}.__start__`, + key: topicKey, hash, }); - - console.log(topicId); - scopedStore.close(); + console.log(`✅ Submitted: ${workflow}.__start__ [${topicKey}]`); + console.log(` Hash: ${hash}`); + store.close(); }); topic .command('timeline <key>') - .description('Show workflow event timeline') + .description('Show workflow event timeline for a topic key') .option('--json', 'Output as JSON') - .option('--content', 'Include content preview') + .option('--content', 'Include CAS content preview') .action((key: string, opts: { json?: boolean; content?: boolean }) => { const config = loadConfig(resolveDir(program.opts().dir)); - migrateToScoped(config); - const scopedStore = openScopedStore( - config.store.scopesDir, - config.store.objectsDir, - ); - if (!scopedStore) { - console.error('No store found.'); + const store = openWorkflowsStore(config); + + if (!store) { + console.error('workflows.db not found.'); process.exit(1); } - const store = scopedStore.scope('workflows'); - const events = store - .getAfter(0) - .filter((e) => e.kind.startsWith('coding.') && e.key === key); + + const all = store.getAfter(0); + const wf = workflowForTopic(all, key); + const events = all + .filter((e) => e.key === key) + .filter((e) => { + if (!wf) return true; + const p = parseWorkflowKind(e.kind); + return !p || p.wf === wf; + }) + .sort((a, b) => a.occurredAt - b.occurredAt || a.id - b.id); if (events.length === 0) { console.error(`No events found for key: ${key}`); - scopedStore.close(); + store.close(); process.exit(1); } const t0 = events[0].occurredAt; const entries = events.map((e, i) => { - const role = e.kind.replace('coding.', ''); const prevTime = i > 0 ? events[i - 1].occurredAt : t0; const durationMs = e.occurredAt - prevTime; const meta = e.meta ? JSON.parse(e.meta) : null; @@ -135,11 +140,13 @@ export function registerWorkflowCommand(program: Command): void { const text = typeof obj === 'string' ? obj : JSON.stringify(obj); contentPreview = text.slice(0, 200) + (text.length > 200 ? '...' : ''); - } catch {} + } catch { + /* ignore */ + } } return { id: e.id, - role, + kind: e.kind, offsetMs: e.occurredAt - t0, durationMs: i === 0 ? 0 : durationMs, meta, @@ -161,8 +168,9 @@ export function registerWorkflowCommand(program: Command): void { ? ` (${(entry.durationMs / 1000).toFixed(1)}s)` : ''; const metaStr = entry.meta ? ` ${JSON.stringify(entry.meta)}` : ''; + const kindPad = entry.kind.padEnd(28); console.log( - ` [${offset.padStart(8)}] #${String(entry.id).padStart(2)} ${entry.role.padEnd(12)}${dur}${metaStr}`, + ` [${offset.padStart(8)}] #${String(entry.id).padStart(4)} ${kindPad}${dur}${metaStr}`, ); if (entry.contentPreview) { console.log(` ${entry.contentPreview.split('\n')[0]}`); @@ -171,97 +179,85 @@ export function registerWorkflowCommand(program: Command): void { console.log(); } - scopedStore.close(); + store.close(); }); topic .command('list') - .description('List coding topics') - .option('--all', 'Include closed topics') + .description('List topics grouped by workflow (__start__) with latest role') + .option('--all', 'Include topics whose latest event is __start__ only') .action((opts: { all?: boolean }) => { const config = loadConfig(resolveDir(program.opts().dir)); - migrateToScoped(config); - const scopedStore = openScopedStore( - config.store.scopesDir, - config.store.objectsDir, - ); + const store = openWorkflowsStore(config); - if (!scopedStore) { - console.log('No store found. Run "upulse init" first.'); + if (!store) { + console.log('No workflows.db. Submit a task or run the workflow daemon.'); return; } - const store = scopedStore.scope('workflows'); + const events = store.getAfter(0); + const startByKey = new Map<string, string>(); + for (const e of events) { + if (!e.key) continue; + const p = parseWorkflowKind(e.kind); + if (p?.role === '__start__') startByKey.set(e.key, p.wf); + } - const allEvents = store - .getAfter(0) - .filter((e) => e.kind.startsWith('coding.')); + const latestByKey = new Map<string, EventRecord>(); + for (const e of events) { + if (!e.key) continue; + const cur = latestByKey.get(e.key); + if ( + !cur || + e.occurredAt > cur.occurredAt || + (e.occurredAt === cur.occurredAt && e.id > cur.id) + ) { + latestByKey.set(e.key, e); + } + } - if (allEvents.length === 0) { + type Row = { key: string; wf: string; latestKind: string; at: number }; + const rows: Row[] = []; + for (const [topicKey, last] of latestByKey) { + const p = parseWorkflowKind(last.kind); + if (!opts.all && p?.role === '__start__') continue; + const wf = startByKey.get(topicKey) ?? p?.wf ?? 'unknown'; + rows.push({ + key: topicKey, + wf, + latestKind: last.kind, + at: last.occurredAt, + }); + } + + rows.sort((a, b) => { + if (a.wf !== b.wf) return a.wf.localeCompare(b.wf); + return b.at - a.at; + }); + + if (rows.length === 0) { console.log('No topics found.'); - scopedStore.close(); + store.close(); return; } - // Group by key (topicId), track latest role - const topics = new Map< - string, - { title: string; role: CodingRole; lastUpdated: number } - >(); - - for (const event of allEvents) { - const topicId = event.key; - if (!topicId) continue; - - const role = kindToRole(event.kind); - - // Try to get title from CAS for user events - let title = topicId; - if (role === 'user' && event.hash) { - try { - const obj = store.getObject(event.hash) as any; - if (obj?.artifacts?.title) title = obj.artifacts.title; - } catch {} + let currentWf = ''; + for (const r of rows) { + if (r.wf !== currentWf) { + currentWf = r.wf; + console.log(`\n[${currentWf}]`); + console.log( + 'topicKey | latest kind | updated', + ); + console.log('-'.repeat(100)); } - - const existing = topics.get(topicId); - if (!existing) { - topics.set(topicId, { title, role, lastUpdated: event.occurredAt }); - } else { - existing.role = role; - if (event.occurredAt > existing.lastUpdated) { - existing.lastUpdated = event.occurredAt; - } - if (role === 'user' && title !== topicId) { - existing.title = title; - } - } - } - - // Filter closed unless --all - const entries = [...topics.entries()].filter( - ([, t]) => opts.all || t.role !== 'closer', - ); - - if (entries.length === 0) { - console.log('No active topics. Use --all to include closed topics.'); - scopedStore.close(); - return; - } - - console.log( - 'topicId | title | role | last updated', - ); - console.log('-'.repeat(120)); - - for (const [topicId, t] of entries) { - const ts = new Date(t.lastUpdated).toISOString(); + const ts = new Date(r.at).toISOString(); console.log( - `${topicId.padEnd(49)}| ${t.title.slice(0, 30).padEnd(31)}| ${t.role.padEnd(9)}| ${ts}`, + `${r.key.padEnd(49)}| ${r.latestKind.slice(0, 28).padEnd(29)}| ${ts}`, ); } - console.log(`\nTotal: ${entries.length} topic(s)`); - scopedStore.close(); + console.log(`\nTotal: ${rows.length} topic(s)`); + store.close(); }); } diff --git a/packages/upulse/src/daemon.ts b/packages/upulse/src/daemon.ts index f533c6f..ec343ae 100644 --- a/packages/upulse/src/daemon.ts +++ b/packages/upulse/src/daemon.ts @@ -30,7 +30,7 @@ export function daemonStart( const existingPid = readPid(config); if (existingPid !== null && isProcessAlive(existingPid)) { console.error( - `Error: daemon is already running (PID: ${existingPid}). Stop it first with "upulse daemon stop".`, + `Error: daemon is already running (PID: ${existingPid}). Stop that process or remove ${config.daemon.pidFile}.`, ); process.exit(1); } diff --git a/packages/upulse/src/e2e/helper.ts b/packages/upulse/src/e2e/helper.ts deleted file mode 100644 index 6545005..0000000 --- a/packages/upulse/src/e2e/helper.ts +++ /dev/null @@ -1,157 +0,0 @@ -/** - * E2E Test Helper — shared utilities for upulse E2E tests. - * - * Provides isolated temp directories, CLI runner, and DB query helpers. - */ - -import { Database } from 'bun:sqlite'; -import { execSync } from 'node:child_process'; -import { existsSync, mkdtempSync, readdirSync, rmSync } from 'node:fs'; -import { tmpdir } from 'node:os'; -import { dirname, join } from 'node:path'; -import { fileURLToPath } from 'node:url'; - -const __filename = fileURLToPath(import.meta.url); -const __dirname = dirname(__filename); - -/** Path to the upulse CLI entrypoint (src/cli.ts). */ -const CLI_PATH = join(__dirname, '..', 'cli.ts'); - -export interface E2EContext { - /** Temp directory used as HOME for the test. */ - baseDir: string; - /** Path to .upulse inside baseDir. */ - upulseDir: string; - /** Path to scopes/ directory (new layout). */ - scopesDir: string; - /** Path to scopes/_system.db (events). */ - eventsDbPath: string; - /** Path to scopes/_vitals.db (vitals). */ - vitalsDbPath: string; - /** Path to objects/. */ - objectsDir: string; - /** Path to engine/. */ - engineDir: string; - /** Path to staging/. */ - stagingDir: string; -} - -/** - * Create an isolated E2E test environment with a fresh temp directory. - */ -export function createE2EContext(): E2EContext { - const baseDir = mkdtempSync(join(tmpdir(), 'pulse-e2e-')); - const upulseDir = join(baseDir, '.upulse'); - const scopesDir = join(upulseDir, 'scopes'); - return { - baseDir, - upulseDir, - scopesDir, - eventsDbPath: join(scopesDir, '_system.db'), - vitalsDbPath: join(scopesDir, '_vitals.db'), - objectsDir: join(upulseDir, 'objects'), - engineDir: join(upulseDir, 'engine'), - stagingDir: join(upulseDir, 'staging'), - }; -} - -/** - * Clean up an E2E test environment (remove temp directory). - */ -export function cleanupE2EContext(ctx: E2EContext): void { - try { - rmSync(ctx.baseDir, { recursive: true, force: true }); - } catch { - // best-effort cleanup; CI temp dirs are ephemeral anyway - } -} - -export interface RunOptions { - /** If true, expect the command to exit non-zero. Returns stderr. */ - expectFail?: boolean; - /** Timeout in ms (default: 120_000 for bun install during init). */ - timeout?: number; -} - -/** - * Run an upulse CLI command in the E2E context. - * - * - Sets HOME so `homedir()` → ctx.baseDir → config lands in ctx.baseDir/.upulse/ - * - Sets PULSE_BASE_DIR so engine's pulse.config.ts uses the right paths. - * - Uses `bun` to run the TypeScript CLI directly. - */ -export function runUpulse( - ctx: E2EContext, - args: string, - options?: RunOptions, -): string { - const timeout = options?.timeout ?? 120_000; - const env: Record<string, string> = { - ...(process.env as Record<string, string>), - HOME: ctx.baseDir, - PULSE_BASE_DIR: ctx.upulseDir, - }; - - try { - const result = execSync(`bun ${CLI_PATH} ${args}`, { - env, - cwd: ctx.baseDir, - timeout, - encoding: 'utf-8', - stdio: ['pipe', 'pipe', 'pipe'], - }); - - if (options?.expectFail) { - throw new Error( - `Expected command to fail but it succeeded: upulse ${args}\nstdout: ${result}`, - ); - } - return result; - } catch (err: unknown) { - if (options?.expectFail) { - const e = err as { stderr?: string; stdout?: string }; - return e.stderr || e.stdout || ''; - } - const e = err as { stdout?: string; stderr?: string; message?: string }; - throw new Error( - `upulse ${args} failed:\nstdout: ${e.stdout ?? ''}\nstderr: ${e.stderr ?? ''}\nmessage: ${e.message ?? ''}`, - ); - } -} - -/** - * Open events DB (scopes/_system.db) and run a query. - * Returns empty array if DB does not exist. - */ -export function queryEventsDb(ctx: E2EContext, sql: string): unknown[] { - if (!existsSync(ctx.eventsDbPath)) return []; - const db = new Database(ctx.eventsDbPath); - try { - return db.prepare(sql).all(); - } finally { - db.close(); - } -} - -/** - * Open vitals DB (scopes/_vitals.db) and run a query. - * Vitals are stored as events (kind='vital') in the events table. - * Returns empty array if DB does not exist. - */ -export function queryVitalsDb(ctx: E2EContext, sql: string): unknown[] { - if (!existsSync(ctx.vitalsDbPath)) return []; - const db = new Database(ctx.vitalsDbPath); - try { - return db.prepare(sql).all(); - } finally { - db.close(); - } -} - -/** - * List .json files in the objects/ directory. - */ -export function listObjects(ctx: E2EContext): string[] { - if (!existsSync(ctx.objectsDir)) return []; - return readdirSync(ctx.objectsDir).filter((f) => f.endsWith('.json')); -} diff --git a/packages/upulse/src/e2e/t1-init-daemon-tick.test.ts b/packages/upulse/src/e2e/t1-init-daemon-tick.test.ts deleted file mode 100644 index 35a133f..0000000 --- a/packages/upulse/src/e2e/t1-init-daemon-tick.test.ts +++ /dev/null @@ -1,144 +0,0 @@ -/** - * E2E T1 — Init → Daemon → Tick 完整链路测试 - * - * Validates the full lifecycle: - * upulse init → creates directories, git repo, bun install - * upulse tick → compiles engine, runs one tick, writes to DB - * upulse daemon status → reports daemon state - * - * Run: bun test src/e2e/t1-init-daemon-tick.test.ts - */ - -import { afterAll, beforeAll, describe, expect, it } from 'bun:test'; -import { execSync } from 'node:child_process'; -import { existsSync, readFileSync } from 'node:fs'; -import { join } from 'node:path'; -import { - cleanupE2EContext, - createE2EContext, - type E2EContext, - listObjects, - queryEventsDb, - runUpulse, -} from './helper.js'; - -describe('E2E T1: Init → Daemon → Tick', () => { - let ctx: E2EContext; - - // Use a single context for all tests in this suite because - // `upulse init` runs bun install which is slow (~15-30s). - // Tests are ordered: init → tick → daemon. - beforeAll(() => { - ctx = createE2EContext(); - }); - - afterAll(() => { - cleanupE2EContext(ctx); - }); - - // ── Init ───────────────────────────────────────────────────── - - it('upulse init creates .upulse directory', () => { - const output = runUpulse(ctx, 'init'); - expect(output.includes('initialized')).toBeTruthy(); - expect(existsSync(ctx.upulseDir)).toBeTruthy(); - }); - - it('init creates config.json', () => { - const cfgPath = join(ctx.upulseDir, 'config.json'); - expect(existsSync(cfgPath)).toBeTruthy(); - const cfg = JSON.parse(readFileSync(cfgPath, 'utf-8')); - expect(cfg.dir).toBe(ctx.upulseDir); - }); - - it('init creates complete engine directory structure', () => { - expect(existsSync(ctx.engineDir)).toBeTruthy(); - expect(existsSync(join(ctx.engineDir, 'rules'))).toBeTruthy(); - expect(existsSync(join(ctx.engineDir, 'executors'))).toBeTruthy(); - expect(existsSync(join(ctx.engineDir, 'AGENTS.md'))).toBeTruthy(); - expect(existsSync(join(ctx.engineDir, 'types.ts'))).toBeTruthy(); - expect(existsSync(join(ctx.engineDir, 'pulse.config.ts'))).toBeTruthy(); - expect(existsSync(join(ctx.engineDir, 'tsconfig.json'))).toBeTruthy(); - expect(existsSync(join(ctx.engineDir, '.gitignore'))).toBeTruthy(); - expect(existsSync(join(ctx.engineDir, 'package.json'))).toBeTruthy(); - }); - - it('init creates git repo with initial commit', () => { - expect(existsSync(join(ctx.engineDir, '.git'))).toBeTruthy(); - - // Verify there is at least one commit - const log = execSync('git log --oneline', { - cwd: ctx.engineDir, - encoding: 'utf-8', - }).trim(); - expect(log.length > 0).toBeTruthy(); - expect(log.includes('init')).toBeTruthy(); - }); - - it('init creates staging worktree', () => { - expect(existsSync(ctx.stagingDir)).toBeTruthy(); - }); - - it('init installs node_modules in engine', () => { - expect(existsSync(join(ctx.engineDir, 'node_modules'))).toBeTruthy(); - expect( - existsSync(join(ctx.engineDir, 'node_modules', '@uncaged', 'pulse')), - ).toBeTruthy(); - }); - - it('init is idempotent — running again fails gracefully', () => { - const output = runUpulse(ctx, 'init', { expectFail: true }); - expect( - output.includes('already initialized') || output.includes('config.json'), - ).toBeTruthy(); - }); - - // ── Tick ───────────────────────────────────────────────────── - - it('upulse tick --dry-run succeeds', () => { - const output = runUpulse(ctx, 'tick --dry-run'); - expect(output.includes('dry-run')).toBeTruthy(); - }); - - it('upulse tick runs and writes events to DB', () => { - const output = runUpulse(ctx, 'tick'); - - // tick should produce snapshot output - expect( - output.includes('Snapshot') || output.includes('Result'), - ).toBeTruthy(); - - // Verify scopes/_system.db has entries - expect(existsSync(ctx.eventsDbPath)).toBeTruthy(); - const events = queryEventsDb(ctx, 'SELECT * FROM events'); - expect(events.length > 0).toBeTruthy(); - }); - - it('tick creates objects', () => { - // After tick, objects/ should have at least one snapshot file - const objects = listObjects(ctx); - expect(objects.length > 0).toBeTruthy(); - }); - - it('tick records tick event', () => { - const tickEvents = queryEventsDb( - ctx, - "SELECT * FROM events WHERE kind = 'tick'", - ) as Array<{ - kind: string; - meta: string; - }>; - expect(tickEvents.length > 0).toBeTruthy(); - // tick event should have meta with tick_ms and effect_count - const meta = JSON.parse(tickEvents[tickEvents.length - 1].meta); - expect(typeof meta.tick_ms).toBe('number'); - expect(typeof meta.effect_count).toBe('number'); - }); - - // ── Daemon Status ──────────────────────────────────────────── - - it('upulse daemon status shows STOPPED when daemon is not running', () => { - const output = runUpulse(ctx, 'daemon status'); - expect(output.includes('STOPPED')).toBeTruthy(); - }); -}); diff --git a/packages/upulse/src/e2e/t2-staging-promote.test.ts b/packages/upulse/src/e2e/t2-staging-promote.test.ts deleted file mode 100644 index d420cbe..0000000 --- a/packages/upulse/src/e2e/t2-staging-promote.test.ts +++ /dev/null @@ -1,109 +0,0 @@ -/** - * E2E T2 — Staging → Promote 测试 - * - * 验证 staging 到 promote 的完整流程: - * upulse init → 初始化项目 - * 在 staging 里添加新 rule - * git add + commit - * upulse deploy promote → 将 staging 合并到 engine - * 验证 promote 成功并且新 rule 生效 - * - * Run: bun test src/e2e/t2-staging-promote.test.ts - */ - -import { afterAll, beforeAll, describe, expect, it } from 'bun:test'; -import { execSync } from 'node:child_process'; -import { existsSync, writeFileSync } from 'node:fs'; -import { join } from 'node:path'; -import { - cleanupE2EContext, - createE2EContext, - type E2EContext, - queryEventsDb, - runUpulse, -} from './helper.js'; - -describe('E2E T2: Staging → Promote', () => { - let ctx: E2EContext; - - beforeAll(() => { - ctx = createE2EContext(); - }); - - afterAll(() => { - cleanupE2EContext(ctx); - }); - - it('upulse init initializes project', () => { - const output = runUpulse(ctx, 'init'); - expect(output.includes('initialized')).toBeTruthy(); - expect(existsSync(ctx.upulseDir)).toBeTruthy(); - expect(existsSync(ctx.stagingDir)).toBeTruthy(); - expect(existsSync(ctx.engineDir)).toBeTruthy(); - }); - - it('add new rule to staging directory', () => { - const newRuleContent = `import type { Snapshot, Effect } from '../types.js'; -import type { Rule } from '@uncaged/pulse'; - -const rule: Rule<Snapshot, Effect> = async (_prev, _curr, inner) => { - const [effects, tickMs] = await inner(_prev, _curr); - return [[...effects, { kind: 'log', message: 'hello from v2' }], tickMs]; -}; - -export default rule;`; - - const ruleFilePath = join(ctx.stagingDir, 'rules', '02-log-hello.ts'); - writeFileSync(ruleFilePath, newRuleContent); - - expect(existsSync(ruleFilePath)).toBeTruthy(); - }); - - it('commit changes in staging', () => { - execSync('git add .', { cwd: ctx.stagingDir }); - execSync('git commit -m "Add 02-log-hello rule"', { cwd: ctx.stagingDir }); - - // Verify commit exists - const log = execSync('git log --oneline -n 1', { - cwd: ctx.stagingDir, - encoding: 'utf-8', - }).trim(); - expect(log.includes('02-log-hello')).toBeTruthy(); - }); - - it('upulse deploy promote succeeds', async () => { - const output = runUpulse(ctx, 'deploy promote', { timeout: 30_000 }); - expect(!output.includes('error') && !output.includes('Error')).toBeTruthy(); - }, 30_000); - - it('promote creates promote event in events.db', () => { - const promoteEvents = queryEventsDb( - ctx, - "SELECT * FROM events WHERE kind = 'promote'", - ) as Array<{ - kind: string; - code_rev: string; - }>; - expect(promoteEvents.length > 0).toBeTruthy(); - - const latestPromote = promoteEvents[promoteEvents.length - 1]; - expect(latestPromote.code_rev).toBeTruthy(); - }); - - it('new rule is merged to engine directory', () => { - const mergedRulePath = join(ctx.engineDir, 'rules', '02-log-hello.ts'); - expect(existsSync(mergedRulePath)).toBeTruthy(); - }); - - it('upulse tick produces effect from new rule', () => { - const output = runUpulse(ctx, 'tick'); - expect( - output.includes('Snapshot') || output.includes('Result'), - ).toBeTruthy(); - - // The new rule should produce a log effect visible in tick output - expect( - output.includes('log') || output.includes('hello from v2'), - ).toBeTruthy(); - }); -}); diff --git a/packages/upulse/src/e2e/t3-promote-guard.test.ts b/packages/upulse/src/e2e/t3-promote-guard.test.ts deleted file mode 100644 index 6dc637c..0000000 --- a/packages/upulse/src/e2e/t3-promote-guard.test.ts +++ /dev/null @@ -1,125 +0,0 @@ -/** - * E2E T3 — Promote Guard 测试(编译失败阻止 promote) - * - * 验证 promote 的编译守门机制: - * upulse init → 初始化项目 - * 在 staging 里添加编译错误的 rule - * git add + commit - * upulse deploy promote → 期望失败(编译不通过) - * 验证 promote 被阻止且 engine 目录未被污染 - * - * Run: bun test src/e2e/t3-promote-guard.test.ts - */ - -import { afterAll, beforeAll, describe, expect, it } from 'bun:test'; -import { execSync } from 'node:child_process'; -import { existsSync, writeFileSync } from 'node:fs'; -import { join } from 'node:path'; -import { - cleanupE2EContext, - createE2EContext, - type E2EContext, - queryEventsDb, - runUpulse, -} from './helper.js'; - -describe('E2E T3: Promote Guard (Compilation Failed)', () => { - let ctx: E2EContext; - - beforeAll(() => { - ctx = createE2EContext(); - }); - - afterAll(() => { - cleanupE2EContext(ctx); - }); - - it('upulse init initializes project', () => { - const output = runUpulse(ctx, 'init'); - expect( - output.includes('initialized'), - `init output should mention initialized, got:\n${output}`, - ).toBeTruthy(); - expect(existsSync(ctx.upulseDir), '.upulse/ should exist').toBeTruthy(); - expect(existsSync(ctx.stagingDir), 'staging/ should exist').toBeTruthy(); - expect(existsSync(ctx.engineDir), 'engine/ should exist').toBeTruthy(); - }); - - it('add bad rule with compilation error to staging', () => { - const badRuleContent = `// 语法错误:缺少类型 -const bad: NONEXISTENT_TYPE = 42; -export default bad;`; - - const badRuleFilePath = join(ctx.stagingDir, 'rules', '99-bad-rule.ts'); - writeFileSync(badRuleFilePath, badRuleContent); - - expect( - existsSync(badRuleFilePath), - '99-bad-rule.ts should be written to staging/rules/', - ).toBeTruthy(); - }); - - it('commit bad rule in staging', () => { - execSync('git add .', { cwd: ctx.stagingDir }); - execSync('git commit -m "Add bad rule with compilation error"', { - cwd: ctx.stagingDir, - }); - - // Verify commit exists - const log = execSync('git log --oneline -n 1', { - cwd: ctx.stagingDir, - encoding: 'utf-8', - }).trim(); - expect( - log.includes('bad rule'), - 'commit should mention the bad rule', - ).toBeTruthy(); - }); - - it('upulse deploy promote fails due to compilation error', () => { - const output = runUpulse(ctx, 'deploy promote', { expectFail: true }); - - // Check that output contains type-check-related error messages - expect( - output.includes('Type check') || - output.includes('tsc') || - output.includes('type check failed'), - ).toBeTruthy(); - }); - - it('no promote event created in events.db after failed promote', () => { - const promoteEvents = queryEventsDb( - ctx, - "SELECT * FROM events WHERE kind = 'promote'", - ); - expect(promoteEvents.length).toBe(0); - }); - - it('bad rule file not merged to engine directory', () => { - const badRulePath = join(ctx.engineDir, 'rules', '99-bad-rule.ts'); - expect( - !existsSync(badRulePath), - 'bad rule should not exist in engine/rules/ after failed promote', - ).toBeTruthy(); - }); - - it('engine directory remains clean after failed promote', () => { - // Verify engine still has only the original files (no contamination from failed promote) - const engineRulesPath = join( - ctx.engineDir, - 'rules', - '01-collect-system.ts', - ); - expect( - existsSync(engineRulesPath), - 'original rules should still exist in engine', - ).toBeTruthy(); - - // But the bad rule should not be there - const badRulePath = join(ctx.engineDir, 'rules', '99-bad-rule.ts'); - expect( - !existsSync(badRulePath), - 'bad rule should not contaminate engine directory', - ).toBeTruthy(); - }); -}); diff --git a/packages/upulse/src/e2e/t4-rollback.test.ts b/packages/upulse/src/e2e/t4-rollback.test.ts deleted file mode 100644 index 331f60d..0000000 --- a/packages/upulse/src/e2e/t4-rollback.test.ts +++ /dev/null @@ -1,308 +0,0 @@ -/** - * E2E T4 — Rollback: append-only + epoch 正确 - * - * Validates the full rollback lifecycle: - * init → promote v1 → tick (v1) → promote v2 → tick (v2) → rollback → verify - * - * Key assertions: - * - rollback event is written correctly (code_rev, meta.from, meta.to) - * - append-only: event count is monotonically increasing - * - v2 events still exist after rollback (not deleted) - * - engine code reverts after rollback - * - rollback with no previous version fails gracefully - * - * Run: bun test src/e2e/t4-rollback.test.ts - */ - -import { - afterAll, - beforeAll, - describe, - expect, - it, - setDefaultTimeout, -} from 'bun:test'; - -setDefaultTimeout(30_000); - -import { execSync } from 'node:child_process'; -import { existsSync, mkdirSync, writeFileSync } from 'node:fs'; -import { join } from 'node:path'; -import { - cleanupE2EContext, - createE2EContext, - type E2EContext, - queryEventsDb, - runUpulse, -} from './helper.js'; - -// ── Helpers ──────────────────────────────────────────────────── - -/** Count events in the DB. */ -function eventCount(ctx: E2EContext): number { - const rows = queryEventsDb( - ctx, - 'SELECT COUNT(*) as cnt FROM events', - ) as Array<{ cnt: number }>; - return rows.length > 0 ? rows[0]?.cnt : 0; -} - -/** - * Add a rule file to staging and commit. - * Uses targeted `git add` to avoid committing the node_modules symlink - * (git .gitignore pattern `node_modules/` does not match symlinks). - */ -function addRuleToStaging( - ctx: E2EContext, - filename: string, - commitMsg: string, -): void { - const rulesDir = join(ctx.stagingDir, 'rules'); - if (!existsSync(rulesDir)) mkdirSync(rulesDir, { recursive: true }); - - const rulePath = join(rulesDir, filename); - writeFileSync( - rulePath, - `import type { Rule } from '@uncaged/pulse'; -import type { Snapshot, Effect } from '../types.js'; - -/** Auto-generated test rule: ${filename} */ -const rule: Rule<Snapshot, Effect> = async (_prev, _curr, inner) => inner(_prev, _curr); -export default rule; -`, - 'utf-8', - ); - - execSync(`git add rules/${filename} && git commit -m "${commitMsg}"`, { - cwd: ctx.stagingDir, - encoding: 'utf-8', - stdio: ['pipe', 'pipe', 'pipe'], - }); -} - -// ── Tests ────────────────────────────────────────────────────── - -describe('E2E T4: Rollback', () => { - let ctx: E2EContext; - - // Track code_revs and event counts at each stage for verification. - let v1CodeRev: string; - let v2CodeRev: string; - const eventCounts: number[] = []; - - // Single context — init + bun install is expensive (~15-30s). - // Tests are ordered and build on each other. - beforeAll(() => { - ctx = createE2EContext(); - - // 1. Init - runUpulse(ctx, 'init'); - runUpulse(ctx, 'tick'); - - // 2. Promote v1 (first promote establishes a version baseline) - addRuleToStaging(ctx, '98-v1.ts', 'add v1 rule'); - runUpulse(ctx, 'deploy promote'); - eventCounts.push(eventCount(ctx)); - - // Record v1 code_rev - const v1Promotes = queryEventsDb( - ctx, - "SELECT * FROM events WHERE kind = 'promote' ORDER BY occurred_at ASC", - ) as Array<{ code_rev: string }>; - v1CodeRev = v1Promotes[v1Promotes.length - 1]?.code_rev; - - // 3. Tick under v1 - runUpulse(ctx, 'tick'); - runUpulse(ctx, 'tick'); - eventCounts.push(eventCount(ctx)); - - // 4. Promote v2 - addRuleToStaging(ctx, '99-v2.ts', 'add v2 rule'); - runUpulse(ctx, 'deploy promote'); - eventCounts.push(eventCount(ctx)); - - // Record v2 code_rev - const v2Promotes = queryEventsDb( - ctx, - "SELECT * FROM events WHERE kind = 'promote' ORDER BY occurred_at DESC LIMIT 1", - ) as Array<{ code_rev: string }>; - v2CodeRev = v2Promotes[0]?.code_rev; - - // 5. Tick under v2 - runUpulse(ctx, 'tick'); - runUpulse(ctx, 'tick'); - eventCounts.push(eventCount(ctx)); - }); - - afterAll(() => { - cleanupE2EContext(ctx); - }); - - // ── Pre-rollback Sanity ────────────────────────────────────── - - it('two promotes created distinct code_revs', () => { - expect(v1CodeRev).toBeTruthy(); - expect(v2CodeRev).toBeTruthy(); - expect(v1CodeRev).not.toBe(v2CodeRev); - }); - - it('v2 rule (99-v2.ts) exists in engine before rollback', () => { - const v2RulePath = join(ctx.engineDir, 'rules', '99-v2.ts'); - expect( - existsSync(v2RulePath), - 'engine should have 99-v2.ts after v2 promote', - ).toBeTruthy(); - }); - - it('both promote events are in the database', () => { - const promotes = queryEventsDb( - ctx, - "SELECT * FROM events WHERE kind = 'promote'", - ) as Array<{ - code_rev: string; - meta: string; - }>; - expect( - promotes.length >= 2, - `should have at least 2 promote events, found ${promotes.length}`, - ).toBeTruthy(); - }); - - // ── Rollback ───────────────────────────────────────────────── - - it('rollback succeeds', () => { - const countBefore = eventCount(ctx); - - const output = runUpulse(ctx, 'deploy rollback'); - expect( - output.includes('Rollback complete') || - output.includes('rollback event written'), - ).toBeTruthy(); - - // Event count must increase (append-only) - const countAfter = eventCount(ctx); - expect(countAfter > countBefore).toBeTruthy(); - eventCounts.push(countAfter); - }); - - // ── Rollback Event Content ────────────────────────────────── - - it('rollback event has correct code_rev and meta', () => { - const rollbackEvent = ( - queryEventsDb( - ctx, - "SELECT * FROM events WHERE kind = 'rollback' ORDER BY occurred_at DESC LIMIT 1", - ) as Array<{ code_rev: string; meta: string }> - )[0]; - expect(rollbackEvent).toBeTruthy(); - - // code_rev should point to the target (v1) - expect(rollbackEvent.code_rev).toBe(v1CodeRev); - - // meta should have from + to - const meta = JSON.parse(rollbackEvent.meta) as { from: string; to: string }; - expect(meta.from).toBe(v2CodeRev); - expect(meta.to).toBe(v1CodeRev); - expect(meta.from).not.toBe(meta.to); - }); - - // ── Append-Only Invariant ─────────────────────────────────── - - it('append-only: event count is monotonically increasing', () => { - expect(eventCounts.length >= 5).toBeTruthy(); - for (let i = 1; i < eventCounts.length; i++) { - expect(eventCounts[i]! >= eventCounts[i - 1]!).toBeTruthy(); - } - }); - - it('all event IDs are ordered', () => { - const allEvents = queryEventsDb( - ctx, - 'SELECT id FROM events ORDER BY id ASC', - ) as Array<{ id: number }>; - expect(allEvents.length > 0).toBeTruthy(); - for (let i = 1; i < allEvents.length; i++) { - expect(allEvents[i]?.id > allEvents[i - 1]?.id).toBeTruthy(); - } - }); - - // ── v2 Events Preservation ────────────────────────────────── - - it('v2 promote event still exists after rollback', () => { - const v2Promotes = queryEventsDb( - ctx, - `SELECT * FROM events WHERE kind = 'promote' AND code_rev = '${v2CodeRev}'`, - ) as Array<{ code_rev: string }>; - expect(v2Promotes.length > 0).toBeTruthy(); - }); - - it('v2 events are not deleted (append-only)', () => { - // All events that were in the DB before rollback should still be there. - // We check that at least the promote events for both versions exist. - const allPromotes = queryEventsDb( - ctx, - "SELECT * FROM events WHERE kind = 'promote'", - ) as Array<{ - code_rev: string; - }>; - const codeRevs = new Set(allPromotes.map((p) => p.code_rev)); - expect(codeRevs.has(v1CodeRev)).toBeTruthy(); - expect(codeRevs.has(v2CodeRev)).toBeTruthy(); - }); - - // ── Engine Code Revert ────────────────────────────────────── - - it('engine code reverts after rollback (99-v2.ts removed)', () => { - const v2RulePath = join(ctx.engineDir, 'rules', '99-v2.ts'); - expect(!existsSync(v2RulePath)).toBeTruthy(); - }); - - it('v1 rule (98-v1.ts) still exists in engine after rollback', () => { - const v1RulePath = join(ctx.engineDir, 'rules', '98-v1.ts'); - expect(existsSync(v1RulePath)).toBeTruthy(); - }); - - it('engine git log shows revert commit', () => { - const log = execSync('git log --oneline -5', { - cwd: ctx.engineDir, - encoding: 'utf-8', - }).trim(); - expect(log.toLowerCase().includes('revert')).toBeTruthy(); - }); -}); - -// ── Boundary: Rollback with no previous version ──────────────── - -describe('E2E T4: Rollback boundary — no previous version', () => { - let ctx: E2EContext; - - beforeAll(() => { - ctx = createE2EContext(); - runUpulse(ctx, 'init'); - runUpulse(ctx, 'tick'); - }); - - afterAll(() => { - cleanupE2EContext(ctx); - }); - - it('rollback with no promote events fails gracefully', () => { - const output = runUpulse(ctx, 'deploy rollback', { expectFail: true }); - expect( - output.includes('no promote event') || - output.includes('Nothing to roll back') || - output.includes('Error'), - ).toBeTruthy(); - }); - - it('rollback with only one promote (no previous version) fails gracefully', () => { - addRuleToStaging(ctx, '99-test.ts', 'add test rule'); - runUpulse(ctx, 'deploy promote'); - - // Now try to rollback — should fail because there's no older version - const output = runUpulse(ctx, 'deploy rollback', { expectFail: true }); - expect( - output.includes('no previous version') || output.includes('Error'), - ).toBeTruthy(); - }); -}); diff --git a/packages/upulse/src/e2e/t5-migrate.test.ts b/packages/upulse/src/e2e/t5-migrate.test.ts deleted file mode 100644 index 241f400..0000000 --- a/packages/upulse/src/e2e/t5-migrate.test.ts +++ /dev/null @@ -1,291 +0,0 @@ -/** - * E2E T5 — Migrate 链式迁移测试 - * - * 验证数据格式变更时的迁移机制: - * upulse init → 初始化项目 - * upulse tick → 跑一次,产生 v1 数据 - * 修改 types.ts 改变 SystemSense 格式 - * 添加 migrate.ts 处理 v1 → v2 数据迁移 - * 更新 01-collect-system.ts 适配新格式 - * upulse deploy promote → 部署新版本 - * 验证 migrate 事件和新格式数据 - * - * Run: bun test src/e2e/t5-migrate.test.ts - */ - -import { afterAll, beforeAll, describe, expect, it } from 'bun:test'; -import { execSync } from 'node:child_process'; -import { existsSync, readFileSync, writeFileSync } from 'node:fs'; -import { join } from 'node:path'; -import { - cleanupE2EContext, - createE2EContext, - type E2EContext, - queryEventsDb, - runUpulse, -} from './helper.js'; - -describe('E2E T5: Migrate Chain Migration', () => { - let ctx: E2EContext; - - beforeAll(() => { - ctx = createE2EContext(); - }); - - afterAll(() => { - cleanupE2EContext(ctx); - }); - - it('upulse init initializes project', () => { - const output = runUpulse(ctx, 'init'); - expect(output.includes('initialized')).toBeTruthy(); - expect(existsSync(ctx.upulseDir)).toBeTruthy(); - expect(existsSync(ctx.stagingDir)).toBeTruthy(); - expect(existsSync(ctx.engineDir)).toBeTruthy(); - }); - - it('upulse tick produces v1 data', () => { - const output = runUpulse(ctx, 'tick'); - expect( - output.includes('Snapshot') || output.includes('Result'), - ).toBeTruthy(); - - // Seed v1 system data into _vitals scope + objects - const v1Data = { memoryPct: 42.5, cpuIdlePct: 87.3 }; - const v1Hash = require('node:crypto') - .createHash('sha256') - .update(JSON.stringify(v1Data)) - .digest('hex') - .slice(0, 32); - const { mkdirSync, writeFileSync: wfs } = require('node:fs'); - mkdirSync(ctx.objectsDir, { recursive: true }); - wfs(join(ctx.objectsDir, `${v1Hash}.json`), JSON.stringify(v1Data)); - - // Insert vital event into scopes/_vitals.db (events table, kind='vital') - const Database = require('bun:sqlite').default; - const vitalsDb = new Database(ctx.vitalsDbPath); - vitalsDb.exec(` - CREATE TABLE IF NOT EXISTS events ( - id TEXT PRIMARY KEY, - occurred_at INTEGER NOT NULL, - kind TEXT NOT NULL, - key TEXT, - hash TEXT, - code_rev TEXT, - meta TEXT - ) - `); - vitalsDb.exec(` - INSERT INTO events (id, occurred_at, kind, key, hash, code_rev, meta) - VALUES ('test-vital-01', ${Date.now()}, 'vital', 'system', '${v1Hash}', NULL, NULL) - `); - vitalsDb.close(); - - // Insert collect event into scopes/_system.db - const eventsDb = new Database(ctx.eventsDbPath); - eventsDb.exec(` - INSERT INTO events (id, occurred_at, kind, key, hash, code_rev, meta) - VALUES (99999, ${Date.now()}, 'collect', 'system', '${v1Hash}', '', '{}') - `); - eventsDb.close(); - }); - - it('modify types.ts to v2 format in staging', () => { - const newTypesContent = `import type { Sensed } from '@uncaged/pulse'; - -// ── Sense Types ──────────────────────────────────────────────── -// Each collector returns its own sense type. - -export interface SystemSense { - memory: { usedPct: number; totalMb: number }; // v2 格式 - cpuIdlePct: number; -} - -// ── Snapshot ─────────────────────────────────────────────────── -// Rebuilt from events table. Each sense is Sensed<T> = { data, refreshedAt }. -// refreshedAt = the collect event's occurred_at timestamp. - -export interface Snapshot { - timestamp: number; - system: Sensed<SystemSense>; - // Agent 按需添加: - // ograph: Sensed<OGraphSense>; - // executors: Sensed<ExecutorsSense>; -} - -// ── Effects ──────────────────────────────────────────────────── - -export type Effect = - | { kind: 'collect'; key: string } // 触发采集(由 Rule 产生) - | { kind: 'log'; message: string } - // 按需添加: - // | { kind: 'dispatch'; target: string; payload: unknown } - // | { kind: 'notify'; message: string } -`; - - const typesFilePath = join(ctx.stagingDir, 'types.ts'); - writeFileSync(typesFilePath, newTypesContent); - - expect(existsSync(typesFilePath)).toBeTruthy(); - }); - - it('create migrate.ts in staging/rules/', () => { - const migrateContent = `export function migrate(snapshot: Record<string, any>): Record<string, any> { - const system = snapshot.system?.data; - if (system && 'memoryPct' in system) { - return { - ...snapshot, - system: { - data: { - memory: { usedPct: system.memoryPct, totalMb: 2048 }, - cpuIdlePct: system.cpuIdlePct, - }, - refreshedAt: snapshot.system.refreshedAt, - }, - }; - } - return snapshot; -}`; - - const migrateFilePath = join(ctx.stagingDir, 'rules', 'migrate.ts'); - writeFileSync(migrateFilePath, migrateContent); - - expect(existsSync(migrateFilePath)).toBeTruthy(); - }); - - it('update 01-collect-system.ts to adapt new format in staging', () => { - const newSystemRuleContent = `import type { Snapshot, Effect } from '../types.js'; -import type { Rule } from '@uncaged/pulse'; - -/** - * Trigger system collection every 15 seconds. - * If system sense is stale (refreshedAt > 15s ago), emit collect effect. - * Updated for v2 format. - */ -const collectSystemRule: Rule<Snapshot, Effect> = async (_prev, curr, inner) => { - const [effects, tickMs] = await inner(_prev, curr); - const refreshedAt = curr.system?.refreshedAt ?? 0; - if (Date.now() - refreshedAt > 15000) { - return [[...effects, { kind: 'collect', key: 'system' }], tickMs]; - } - return [effects, tickMs]; -}; - -export default collectSystemRule; -`; - - const systemRuleFilePath = join( - ctx.stagingDir, - 'rules', - '01-collect-system.ts', - ); - writeFileSync(systemRuleFilePath, newSystemRuleContent); - - expect(existsSync(systemRuleFilePath)).toBeTruthy(); - }); - - it('update system executor for v2 format in staging', () => { - const newExecutorContent = `import type { SystemSense } from '../types.js'; -import { freemem, totalmem, cpus } from 'node:os'; - -/** - * Collect system stats in v2 format. - * Called by execute() when it receives a { kind: 'collect', key: 'system' } effect. - * Returns data; execute() writes to events/vitals. - */ -export async function collectSystem(): Promise<SystemSense> { - const memoryUsedBytes = totalmem() - freemem(); - const memoryTotalMb = Math.round(totalmem() / (1024 * 1024)); - const usedPct = Math.round((memoryUsedBytes / totalmem()) * 100); - - const cores = cpus(); - let totalIdle = 0; - let totalTick = 0; - for (const core of cores) { - for (const type in core.times) { - totalTick += (core.times as Record<string, number>)[type]!; - } - totalIdle += core.times.idle; - } - const cpuIdlePct = Math.round((totalIdle / totalTick) * 100); - - return { - memory: { usedPct, totalMb: memoryTotalMb }, - cpuIdlePct - }; -}`; - - const executorFilePath = join(ctx.stagingDir, 'executors', 'system.ts'); - writeFileSync(executorFilePath, newExecutorContent); - - expect(existsSync(executorFilePath)).toBeTruthy(); - }); - - it('commit changes in staging', () => { - execSync('git add .', { cwd: ctx.stagingDir }); - execSync('git commit -m "Migrate to v2 SystemSense format"', { - cwd: ctx.stagingDir, - }); - - // Verify commit exists - const log = execSync('git log --oneline -n 1', { - cwd: ctx.stagingDir, - encoding: 'utf-8', - }).trim(); - expect(log.includes('v2')).toBeTruthy(); - }); - - it('upulse deploy promote succeeds', () => { - const output = runUpulse(ctx, 'deploy promote'); - expect(!output.includes('error') && !output.includes('Error')).toBeTruthy(); - }, 30_000); - - it('migrate event created in events.db', () => { - const migrateEvents = queryEventsDb( - ctx, - "SELECT * FROM events WHERE kind = 'migrate' AND key = 'system'", - ) as Array<{ - kind: string; - key: string; - hash: string; - }>; - expect(migrateEvents.length > 0).toBeTruthy(); - - const latestMigrate = migrateEvents[migrateEvents.length - 1]; - expect(latestMigrate.hash).toBeTruthy(); - }); - - it('migrated object contains new format data', () => { - const migrateEvents = queryEventsDb( - ctx, - "SELECT * FROM events WHERE kind = 'migrate' AND key = 'system'", - ) as Array<{ - hash: string; - }>; - expect(migrateEvents.length > 0).toBeTruthy(); - - const migrateHash = migrateEvents[migrateEvents.length - 1].hash; - const objectPath = join(ctx.objectsDir, `${migrateHash}.json`); - - expect(existsSync(objectPath)).toBeTruthy(); - - const objectData = JSON.parse(readFileSync(objectPath, 'utf-8')); - // The stored object is the sense value (data.data), not the full snapshot wrapper - expect(objectData.memory?.usedPct !== undefined).toBeTruthy(); - expect(typeof objectData.memory.usedPct === 'number').toBeTruthy(); - }); - - it('promote event created after migration', () => { - const promoteEvents = queryEventsDb( - ctx, - "SELECT * FROM events WHERE kind = 'promote'", - ) as Array<{ - kind: string; - code_rev: string; - }>; - expect(promoteEvents.length > 0).toBeTruthy(); - - const latestPromote = promoteEvents[promoteEvents.length - 1]; - expect(latestPromote.code_rev).toBeTruthy(); - }); -}); diff --git a/packages/upulse/src/e2e/t6-dir-flag.test.ts b/packages/upulse/src/e2e/t6-dir-flag.test.ts deleted file mode 100644 index a306cea..0000000 --- a/packages/upulse/src/e2e/t6-dir-flag.test.ts +++ /dev/null @@ -1,84 +0,0 @@ -/** - * E2E: --dir / -d flag integration test - * - * Validates that the global --dir flag correctly targets a specific pulse instance. - * Run: bun test src/e2e/t6-dir-flag.test.ts - */ - -import { afterAll, beforeAll, describe, expect, it } from 'bun:test'; -import { execSync } from 'node:child_process'; -import { existsSync } from 'node:fs'; -import { dirname, join } from 'node:path'; -import { fileURLToPath } from 'node:url'; -import { - cleanupE2EContext, - createE2EContext, - type E2EContext, -} from './helper.js'; - -const __dirname = dirname(fileURLToPath(import.meta.url)); -const CLI_PATH = join(__dirname, '..', 'cli.ts'); - -function runCli(args: string, env?: Record<string, string>): string { - return execSync(`bun ${CLI_PATH} ${args}`, { - encoding: 'utf-8', - timeout: 120_000, - env: { ...process.env, ...env }, - stdio: ['pipe', 'pipe', 'pipe'], - }); -} - -function runCliFail(args: string, env?: Record<string, string>): string { - try { - runCli(args, env); - throw new Error(`Expected command to fail: ${args}`); - } catch (err: unknown) { - const e = err as { stderr?: string; stdout?: string }; - return e.stderr || e.stdout || ''; - } -} - -describe('E2E: --dir / -d flag', () => { - let ctx: E2EContext; - - beforeAll(() => { - ctx = createE2EContext(); - // Init using --dir flag (not env var) - runCli(`--dir ${ctx.upulseDir} init`); - }); - - afterAll(() => { - cleanupE2EContext(ctx); - }); - - it('--dir flag initializes in the specified directory', () => { - expect(existsSync(ctx.upulseDir)).toBe(true); - expect(existsSync(join(ctx.upulseDir, 'config.json'))).toBe(true); - expect(existsSync(join(ctx.upulseDir, 'engine'))).toBe(true); - }); - - it('-d short flag works the same as --dir', () => { - const output = runCli(`-d ${ctx.upulseDir} daemon status`); - expect(output).toContain('STOPPED'); - }); - - it('--dir flag overrides PULSE_BASE_DIR env', () => { - // Set env to a nonexistent path, but --dir points to valid instance - const output = runCli(`--dir ${ctx.upulseDir} daemon status`, { - PULSE_BASE_DIR: '/tmp/this-does-not-exist', - }); - expect(output).toContain('STOPPED'); - }); - - it('PULSE_BASE_DIR env works when no --dir flag', () => { - const output = runCli('daemon status', { - PULSE_BASE_DIR: ctx.upulseDir, - }); - expect(output).toContain('STOPPED'); - }); - - it('nonexistent --dir fails gracefully', () => { - const output = runCliFail('--dir /tmp/pulse-nonexistent-dir daemon status'); - expect(output).toContain('config not found'); - }); -}); diff --git a/packages/upulse/src/init.ts b/packages/upulse/src/init.ts index a47cde6..b9cf734 100644 --- a/packages/upulse/src/init.ts +++ b/packages/upulse/src/init.ts @@ -135,14 +135,15 @@ export function initUpulse(dir?: string): void { console.log(`\nEngine: ${config.engine.path}`); console.log(`Staging: ${config.staging.path}`); if (process.platform === 'linux') { - console.log('\nSystemd service: pulse.service (enabled, not started)'); - console.log(' systemctl --user start pulse.service # start daemon'); - console.log(' systemctl --user status pulse.service # check status'); - console.log(' journalctl --user -u pulse.service -f # follow logs'); + console.log('\nSystemd user units: pulse.service, pulse-workflow.service'); + console.log(' systemctl --user start pulse.service # engine daemon'); + console.log(' systemctl --user start pulse-workflow.service # workflow daemon (if installed)'); + console.log(' systemctl --user status pulse.service # check engine'); + console.log(' upulse status # workflow DB + units'); } - console.log('\nManual:'); - console.log(' upulse daemon start --foreground # start in foreground'); - console.log(' upulse tick --dry-run # manual dry-run tick'); + console.log('\nCLI:'); + console.log(' upulse workflow submit <wf> <topicKey> <file> # enqueue workflow task'); + console.log(' upulse inspect events --limit 5 # recent workflow events'); } // ── Helpers ──────────────────────────────────────────────────── @@ -267,10 +268,11 @@ type Rule<S, E> = (prev: S, curr: S) ## 构建和测试 \`\`\`bash -bun run typecheck # tsc --noEmit 类型检查 -upulse dev test # 历史 snapshot 回放 dry-run -upulse tick --dry-run # 手动触发一次(不执行 effect) +bun run typecheck # tsc --noEmit 类型检查 +upulse status # workflow daemon / DB 概览(可选) +upulse inspect events --limit 5 # 最近 workflow 事件 \`\`\` +Linux 上引擎由 systemd 用户单元驱动:\`systemctl --user start pulse.service\`。 ## ⚠️ 红线 - **不要修改 pulse.config.ts 的 runPulse 调用签名** diff --git a/packages/upulse/src/migrate.test.ts b/packages/upulse/src/migrate.test.ts deleted file mode 100644 index 5772092..0000000 --- a/packages/upulse/src/migrate.test.ts +++ /dev/null @@ -1,184 +0,0 @@ -/** - * migrate.test.ts — Tests for legacy → scoped data migration. - */ - -import { Database } from 'bun:sqlite'; -import { afterEach, beforeEach, describe, expect, it } from 'bun:test'; -import { - existsSync, - mkdirSync, - mkdtempSync, - readFileSync, - rmSync, - writeFileSync, -} from 'node:fs'; -import { tmpdir } from 'node:os'; -import { join } from 'node:path'; -import type { UpulseConfig } from './config.js'; -import { migrateToScoped } from './migrate.js'; - -function makeConfig(dir: string): UpulseConfig { - return { - dir, - engine: { path: join(dir, 'engine'), entrypoint: 'pulse.config.ts' }, - staging: { path: join(dir, 'staging') }, - daemon: { - pidFile: join(dir, 'daemon.pid'), - logFile: join(dir, 'daemon.log'), - }, - store: { - scopesDir: join(dir, 'scopes'), - objectsDir: join(dir, 'objects'), - }, - }; -} - -function createLegacyEventsDb(path: string): void { - const db = new Database(path, { create: true }); - db.exec('PRAGMA journal_mode = WAL'); - db.exec(` - CREATE TABLE IF NOT EXISTS events ( - id TEXT PRIMARY KEY, - occurred_at INTEGER NOT NULL, - kind TEXT NOT NULL, - key TEXT, - hash TEXT, - code_rev TEXT, - meta TEXT - ) - `); - db.exec(` - INSERT INTO events (id, occurred_at, kind, key, hash, code_rev, meta) - VALUES ('evt-001', 1000, 'tick', NULL, NULL, NULL, '{"tick_ms":15000}') - `); - db.close(); -} - -function createLegacyVitalsDb(path: string): void { - const db = new Database(path, { create: true }); - db.exec('PRAGMA journal_mode = WAL'); - db.exec(` - CREATE TABLE IF NOT EXISTS vitals ( - id TEXT PRIMARY KEY, - occurred_at INTEGER NOT NULL, - key TEXT NOT NULL, - hash TEXT, - meta TEXT - ) - `); - db.exec(` - INSERT INTO vitals (id, occurred_at, key, hash, meta) - VALUES ('vit-001', 1000, 'system', 'abc123', '{}') - `); - db.close(); -} - -describe('migrateToScoped', () => { - let tmpDir: string; - - beforeEach(() => { - tmpDir = mkdtempSync(join(tmpdir(), 'pulse-migrate-test-')); - mkdirSync(tmpDir, { recursive: true }); - }); - - afterEach(() => { - rmSync(tmpDir, { recursive: true, force: true }); - }); - - it('migrates events.db → scopes/_system.db', () => { - createLegacyEventsDb(join(tmpDir, 'events.db')); - const config = makeConfig(tmpDir); - - migrateToScoped(config); - - const systemDbPath = join(tmpDir, 'scopes', '_system.db'); - expect(existsSync(systemDbPath)).toBe(true); - - const db = new Database(systemDbPath); - const rows = db.prepare('SELECT * FROM events').all() as Array<{ - id: string; - }>; - db.close(); - expect(rows.length).toBe(1); - expect(rows[0].id).toBe('evt-001'); - }); - - it('migrates vitals.db → scopes/_vitals.db as events', () => { - createLegacyEventsDb(join(tmpDir, 'events.db')); - createLegacyVitalsDb(join(tmpDir, 'vitals.db')); - const config = makeConfig(tmpDir); - - migrateToScoped(config); - - const vitalsDbPath = join(tmpDir, 'scopes', '_vitals.db'); - expect(existsSync(vitalsDbPath)).toBe(true); - - const db = new Database(vitalsDbPath); - const rows = db - .prepare('SELECT * FROM events WHERE kind = ?') - .all('vital') as Array<{ - id: string; - kind: string; - key: string; - hash: string | null; - }>; - db.close(); - expect(rows.length).toBe(1); - expect(rows[0].kind).toBe('vital'); - expect(rows[0].key).toBe('system'); - expect(rows[0].hash).toBe('abc123'); - }); - - it('preserves old files as backups', () => { - const legacyEventsPath = join(tmpDir, 'events.db'); - const legacyVitalsPath = join(tmpDir, 'vitals.db'); - createLegacyEventsDb(legacyEventsPath); - createLegacyVitalsDb(legacyVitalsPath); - const config = makeConfig(tmpDir); - - migrateToScoped(config); - - expect(existsSync(legacyEventsPath)).toBe(true); - expect(existsSync(legacyVitalsPath)).toBe(true); - }); - - it('repeated migration is a no-op', () => { - createLegacyEventsDb(join(tmpDir, 'events.db')); - const config = makeConfig(tmpDir); - - migrateToScoped(config); - const systemDbPath = join(tmpDir, 'scopes', '_system.db'); - - // Record modification time - const stat1 = Bun.file(systemDbPath).lastModified; - - // Migrate again — should be a no-op - migrateToScoped(config); - - const stat2 = Bun.file(systemDbPath).lastModified; - expect(stat2).toBe(stat1); - }); - - it('skips when no legacy files exist', () => { - const config = makeConfig(tmpDir); - - migrateToScoped(config); - - expect(existsSync(join(tmpDir, 'scopes'))).toBe(false); - }); - - it('copies WAL and SHM files if they exist', () => { - createLegacyEventsDb(join(tmpDir, 'events.db')); - writeFileSync(join(tmpDir, 'events.db-wal'), 'wal-data'); - writeFileSync(join(tmpDir, 'events.db-shm'), 'shm-data'); - const config = makeConfig(tmpDir); - - migrateToScoped(config); - - const systemDbPath = join(tmpDir, 'scopes', '_system.db'); - expect(existsSync(`${systemDbPath}-wal`)).toBe(true); - expect(existsSync(`${systemDbPath}-shm`)).toBe(true); - expect(readFileSync(`${systemDbPath}-wal`, 'utf-8')).toBe('wal-data'); - expect(readFileSync(`${systemDbPath}-shm`, 'utf-8')).toBe('shm-data'); - }); -}); diff --git a/packages/upulse/src/migrate.ts b/packages/upulse/src/migrate.ts deleted file mode 100644 index defddfd..0000000 --- a/packages/upulse/src/migrate.ts +++ /dev/null @@ -1,79 +0,0 @@ -/** - * migrate.ts — Automatic data migration from legacy flat layout to scoped layout. - * - * Legacy: ~/.upulse/events.db + ~/.upulse/vitals.db - * Scoped: ~/.upulse/scopes/_system.db + ~/.upulse/scopes/_vitals.db (events table) - * - * Vitals records are converted to events with kind='vital'. - * Old files are preserved as backups — they are never deleted. - */ - -import { Database } from 'bun:sqlite'; -import { copyFileSync, existsSync, mkdirSync } from 'node:fs'; -import { join } from 'node:path'; -import { createScopedStore } from '@uncaged/pulse'; -import type { UpulseConfig } from './config.js'; - -export function migrateToScoped(config: UpulseConfig): void { - const scopesDir = config.store.scopesDir; - const systemDbPath = join(scopesDir, '_system.db'); - - // Already migrated — skip - if (existsSync(systemDbPath)) return; - - const legacyEvents = join(config.dir, 'events.db'); - const legacyVitals = join(config.dir, 'vitals.db'); - - // Nothing to migrate - if (!existsSync(legacyEvents) && !existsSync(legacyVitals)) return; - - mkdirSync(scopesDir, { recursive: true }); - - if (existsSync(legacyEvents)) { - copyFileSync(legacyEvents, systemDbPath); - for (const ext of ['-wal', '-shm']) { - if (existsSync(legacyEvents + ext)) { - copyFileSync(legacyEvents + ext, systemDbPath + ext); - } - } - console.log('Migrated events.db → scopes/_system.db'); - } - - if (existsSync(legacyVitals)) { - const vitalsDbPath = join(scopesDir, '_vitals.db'); - if (!existsSync(vitalsDbPath)) { - const oldDb = new Database(legacyVitals, { readonly: true }); - const rows = oldDb - .query( - 'SELECT id, occurred_at, key, hash, meta FROM vitals ORDER BY occurred_at', - ) - .all() as Array<{ - id: string; - occurred_at: number; - key: string; - hash: string | null; - meta: string | null; - }>; - oldDb.close(); - - const scopedStore = createScopedStore({ - basePath: scopesDir, - objectsDir: config.store.objectsDir, - }); - const vitalsStore = scopedStore.scope('_vitals'); - for (const row of rows) { - vitalsStore.appendEvent({ - occurredAt: row.occurred_at, - kind: 'vital', - key: row.key, - hash: row.hash ?? undefined, - meta: row.meta ?? undefined, - }); - } - scopedStore.close(); - console.log( - `Migrated vitals.db → scopes/_vitals.db (${rows.length} records as events)`, - ); - } - } -} diff --git a/packages/upulse/src/store.ts b/packages/upulse/src/store.ts index aa5bc27..853ac14 100644 --- a/packages/upulse/src/store.ts +++ b/packages/upulse/src/store.ts @@ -5,13 +5,16 @@ * Vitals use ScopedStore → scope('_vitals') with kind='vital' events. */ -import { existsSync } from 'node:fs'; +import { existsSync, mkdirSync } from 'node:fs'; +import { dirname, join } from 'node:path'; import { createScopedStore, + createStore, type EventRecord, type PulseStore, type ScopedStore, } from '@uncaged/pulse'; +import type { UpulseConfig } from './config.js'; export type { EventRecord, PulseStore, ScopedStore }; @@ -65,3 +68,28 @@ export function openOrCreateStore( const scoped = openOrCreateScopedStore(scopesDir, objectsDir); return scoped.scope('_system'); } + +/** Paths for workflow daemon / CLI (workflows.db + scopes/objects). */ +export function workflowDataPaths(config: UpulseConfig): { + eventsDbPath: string; + objectsDir: string; +} { + return { + eventsDbPath: join(config.store.scopesDir, 'workflows.db'), + objectsDir: join(config.store.scopesDir, 'objects'), + }; +} + +/** Open workflow event store if workflows.db exists (aligns with workflow-daemon). */ +export function openWorkflowsStore(config: UpulseConfig): PulseStore | null { + const { eventsDbPath, objectsDir } = workflowDataPaths(config); + if (!existsSync(eventsDbPath)) return null; + return createStore({ eventsDbPath, objectsDir }); +} + +/** Create dirs and open workflow store (for submit). */ +export function openOrCreateWorkflowsStore(config: UpulseConfig): PulseStore { + const { eventsDbPath, objectsDir } = workflowDataPaths(config); + mkdirSync(dirname(eventsDbPath), { recursive: true }); + return createStore({ eventsDbPath, objectsDir }); +} diff --git a/packages/upulse/src/ui/dashboard.ts b/packages/upulse/src/ui/dashboard.ts index 89f7713..f45740b 100644 --- a/packages/upulse/src/ui/dashboard.ts +++ b/packages/upulse/src/ui/dashboard.ts @@ -610,7 +610,7 @@ function renderEvents(events) { const list = document.getElementById('eventList'); if (!events || events.length === 0) { list.innerHTML = '<div class="empty"><div class="empty-icon">◇</div>' + - '<div class="empty-text">No events yet. Start the daemon to begin collecting data.</div></div>'; + '<div class="empty-text">No events yet. Start the pulse engine (e.g. systemctl --user start pulse.service) to begin collecting data.</div></div>'; return; } @@ -658,7 +658,7 @@ async function loadSenseKeys() { const tabs = document.getElementById('vitalsTabs'); if (!data.keys || data.keys.length === 0) { - tabs.innerHTML = '<div class="empty-text" style="color:var(--text-dim)">No sense keys found. Start the daemon to collect data.</div>'; + tabs.innerHTML = '<div class="empty-text" style="color:var(--text-dim)">No sense keys found. Start the pulse engine (e.g. systemctl --user start pulse.service) to collect data.</div>'; return; } @@ -839,7 +839,7 @@ async function loadDeploys() { if (!data.deploys || data.deploys.length === 0) { list.innerHTML = '<div class="empty"><div class="empty-icon">↑</div>' + - '<div class="empty-text">No deploy history. Use "upulse deploy promote" to create one.</div></div>'; + '<div class="empty-text">No deploy history yet. Promote/rollback events appear here when recorded by the engine.</div></div>'; return; }