feat: upulse CLI refactor — 4 commands (init/inspect/workflow/status)
CI / test (push) Has been cancelled

Auto-generated by meta workflow (cli-cleanup topic).
Removed: daemon, deploy, dev, gc, list, tick, ui commands.
Added: status (systemd + db stats), updated inspect + workflow.
This commit is contained in:
2026-04-17 13:30:17 +00:00
parent 620c5a8323
commit 3e30503c08
29 changed files with 511 additions and 2978 deletions
@@ -95,6 +95,22 @@ describe('coding-tdd WorkflowType', () => {
expect(wf.moderator({ role: START, meta: null }, 'x')).toBe('test-planner');
});
it('moderator: test-planner → test-reviewer', () => {
const wf = createTddCodingWorkflow();
expect(
wf.moderator(
{
role: 'test-planner',
meta: {
testPlan: '# p',
scenarios: ['a'],
},
},
'x',
),
).toBe('test-reviewer');
});
it('moderator: test-reviewer approved → test-coder', () => {
const wf = createTddCodingWorkflow();
expect(
@@ -146,6 +162,22 @@ describe('coding-tdd WorkflowType', () => {
).toBe('coder');
});
it('moderator: coder → auto-tester', () => {
const wf = createTddCodingWorkflow();
expect(
wf.moderator(
{
role: 'coder',
meta: {
filesChanged: ['src/foo.ts'],
deploymentGuide: 'bun test',
},
},
'x',
),
).toBe('auto-tester');
});
it('moderator: auto-tester pass → manual-tester', () => {
const wf = createTddCodingWorkflow();
expect(
@@ -243,6 +275,25 @@ describe('coding-tdd WorkflowType', () => {
).toBe('closer');
});
it('moderator: reviewer approved + emergency → closer', () => {
const wf = createTddCodingWorkflow();
expect(
wf.moderator(
{
role: 'reviewer',
meta: {
verdict: 'approved',
comments: '',
codeQuality: 'a',
testQuality: 'a',
},
},
'x',
1,
),
).toBe('closer');
});
it('moderator: reviewer rejected → coder', () => {
const wf = createTddCodingWorkflow();
expect(
@@ -3,6 +3,9 @@
*
* Pure roles + START/END automaton. Trigger: coding-tdd.__start__
*
* 设计里的 role `type`(llm / code / agent)在实现层统一为 `Role<…>`;默认全部为可注入的
* mock,生产环境通过 `CreateTddCodingWorkflowOpts` 传入 `createLlmRole` / agent 工厂等。
*
* 小橘 🍊 (NEKO Team)
*/
+1 -1
View File
@@ -1,2 +1,2 @@
[test]
root = "src"
pathIgnorePatterns = ["**/src/e2e/**"]
+1 -2
View File
@@ -13,8 +13,7 @@
"scripts": {
"build": "tsc",
"dev": "bun run src/cli.ts",
"test": "bun test --timeout 60000",
"test:e2e": "bun test --timeout 120000 src/e2e/"
"test": "bun test --timeout 60000 ./src"
},
"dependencies": {
"@uncaged/pulse": "workspace:*",
+2 -14
View File
@@ -7,15 +7,9 @@
*/
import { Command } from 'commander';
import { registerDaemonCommand } from './commands/daemon.js';
import { registerDeployCommand } from './commands/deploy.js';
import { registerDevCommand } from './commands/dev.js';
import { registerGcCommand } from './commands/gc.js';
import { registerInitCommand } from './commands/init.js';
import { registerInspectCommand } from './commands/inspect.js';
import { registerListCommand } from './commands/list.js';
import { registerTickCommand } from './commands/tick.js';
import { registerUICommand } from './commands/ui.js';
import { registerStatusCommand } from './commands/status.js';
import { registerWorkflowCommand } from './commands/workflow.js';
const program = new Command();
@@ -30,14 +24,8 @@ program
);
registerInitCommand(program);
registerDaemonCommand(program);
registerTickCommand(program);
registerListCommand(program);
registerInspectCommand(program);
registerDevCommand(program);
registerDeployCommand(program);
registerGcCommand(program);
registerWorkflowCommand(program);
registerUICommand(program);
registerStatusCommand(program);
program.parse();
-47
View File
@@ -1,47 +0,0 @@
/**
* commands/daemon.ts — upulse daemon start/stop/status
*/
import type { Command } from 'commander';
import { loadConfig, resolveDir } from '../config.js';
import { daemonStart, daemonStatus, daemonStop } from '../daemon.js';
export function registerDaemonCommand(program: Command): void {
const daemon = program
.command('daemon')
.description('Daemon lifecycle management');
daemon
.command('start')
.description('Start the Pulse daemon')
.option('--foreground', 'Run in foreground (no detach)')
.action((opts: { foreground?: boolean }) => {
const config = loadConfig(resolveDir(program.opts().dir));
daemonStart(config, opts.foreground ?? false);
});
daemon
.command('stop')
.description('Stop the Pulse daemon')
.action(() => {
const config = loadConfig(resolveDir(program.opts().dir));
daemonStop(config);
});
daemon
.command('restart')
.description('Restart the Pulse daemon')
.action(() => {
const config = loadConfig(resolveDir(program.opts().dir));
daemonStop(config);
daemonStart(config, false);
});
daemon
.command('status')
.description('Show daemon status')
.action(() => {
const config = loadConfig(resolveDir(program.opts().dir));
daemonStatus(config);
});
}
-538
View File
@@ -1,538 +0,0 @@
/**
* commands/deploy.ts — upulse deploy promote/rollback
*
* Phase 4: Version-aware promote/rollback with event-sourced versioning.
* All state changes are append-only events.
*/
import { execSync } from 'node:child_process';
import { existsSync } from 'node:fs';
import { join } from 'node:path';
import type { Command } from 'commander';
import { loadConfig, resolveDir } from '../config.js';
import { daemonStart, daemonStop, isDaemonRunning } from '../daemon.js';
import { gitExec, gitMerge, gitResetHard, gitRevert } from '../git.js';
import { migrateToScoped } from '../migrate.js';
import {
type EventRecord,
openOrCreateStore,
openStore,
type PulseStore,
} from '../store.js';
// ── Helpers ────────────────────────────────────────────────────
function typeCheck(cwd: string): boolean {
try {
// Pure type checking — no emit, no dist/ needed
execSync('npx tsc --noEmit', {
cwd,
encoding: 'utf-8',
stdio: ['pipe', 'pipe', 'pipe'],
});
return true;
} catch (err: unknown) {
const msg =
err instanceof Error
? ((err as { stderr?: string }).stderr ?? err.message)
: String(err);
console.error(`Type check failed:\n${msg}`);
return false;
}
}
function getGitHash(cwd: string): string {
return gitExec(cwd, 'rev-parse --short HEAD');
}
function getCurrentCodeRev(store: PulseStore): string | undefined {
const promote = store.getLatest('promote');
return promote?.codeRev ?? undefined;
}
/**
* Find the previous promote event with a different code_rev than the current one.
* queryByKind returns newest-first, so find the first with a different codeRev.
*/
function findPreviousPromote(
store: PulseStore,
currentCodeRev: string | undefined,
): EventRecord | null {
const allPromotes = store.queryByKind('promote');
for (const promote of allPromotes) {
if (promote.codeRev !== currentCodeRev) {
return promote;
}
}
return null;
}
/**
* Try to run migrate/initSenses from staging source, writing events.
* Best-effort: failures are logged as warnings, not fatal.
*/
async function tryWriteMigrateEvents(
store: PulseStore,
config: { scopesDir: string; objectsDir: string },
stagingSrcPath: string,
newCodeRev: string,
): Promise<void> {
const migrateFile = join(stagingSrcPath, 'rules', 'migrate.ts');
if (!existsSync(migrateFile)) return;
try {
const mod = (await import(migrateFile)) as Record<string, unknown>;
const migrateFn = (mod.default ?? mod.migrate) as
| ((snapshot: Record<string, unknown>) => Record<string, unknown>)
| undefined;
if (!migrateFn) return;
const { createScopedStore, findEffectiveEpoch, rebuildSnapshot } =
await import('@uncaged/pulse');
const scopedStore = createScopedStore({
basePath: config.scopesDir,
objectsDir: config.objectsDir,
});
const systemStore = scopedStore.scope('_system');
const vitalsStore = scopedStore.scope('_vitals');
const epoch = findEffectiveEpoch(systemStore);
// Discover sense keys from vitals scope, then fall back to collect events
let senseKeys: string[] = [];
const vitalEvents = vitalsStore.queryByKind('vital', { limit: 200 });
senseKeys = [
...new Set(vitalEvents.map((e) => e.key).filter(Boolean) as string[]),
];
if (senseKeys.length === 0) {
const collects = systemStore.queryByKind('collect', { limit: 100 });
senseKeys = [
...new Set(collects.map((e) => e.key).filter(Boolean) as string[]),
];
}
const currentSnapshot = rebuildSnapshot(
{ system: systemStore, vitals: vitalsStore },
senseKeys,
epoch,
);
scopedStore.close();
const migrated = migrateFn(currentSnapshot);
for (const [key, data] of Object.entries(migrated)) {
if (key === 'timestamp') continue;
const value = (data as { data?: unknown })?.data ?? data;
store.appendEvent({
occurredAt: Date.now(),
kind: 'migrate',
key,
hash: store.putObject(value),
codeRev: newCodeRev,
});
}
console.log(' ✓ migrate events written');
} catch (err: unknown) {
console.error(
` ✗ migrate failed: ${err instanceof Error ? err.message : String(err)}`,
);
console.error(' Promote aborted. Fix migrate.ts and try again.');
process.exit(1);
}
}
async function tryWriteInitEvents(
store: PulseStore,
stagingSrcPath: string,
newCodeRev: string,
): Promise<void> {
const initFile = join(stagingSrcPath, 'rules', 'initSenses.ts');
if (!existsSync(initFile)) return;
try {
const mod = (await import(initFile)) as Record<string, unknown>;
const initSenses = (mod.default ?? mod.initSenses) as
| Record<string, unknown>
| undefined;
if (!initSenses || typeof initSenses !== 'object') return;
for (const [key, data] of Object.entries(initSenses)) {
store.appendEvent({
occurredAt: Date.now(),
kind: 'init',
key,
hash: store.putObject(data),
codeRev: newCodeRev,
});
}
console.log(' ✓ init events written');
} catch (err: unknown) {
console.warn(
` ⚠ initSenses skipped: ${err instanceof Error ? err.message : String(err)}`,
);
}
}
// ── Command Registration ───────────────────────────────────────
export function registerDeployCommand(program: Command): void {
const deploy = program
.command('deploy')
.description('Deploy staging changes to engine');
deploy
.command('promote')
.description(
'Merge staging → engine (tsc verify + bun test + version events + daemon reload)',
)
.action(async () => {
const config = loadConfig(resolveDir(program.opts().dir));
migrateToScoped(config);
const running = isDaemonRunning(config);
// Step 1: Type-check staging
console.log('Step 1: Type-checking staging...');
if (!typeCheck(config.staging.path)) {
console.error('Error: staging type check failed. Aborting promote.');
process.exit(1);
}
console.log(' ✓ staging type-checks');
// Step 2: Run bun test in staging before merge
console.log('Step 2: Running bun test in staging...');
try {
execSync('bun test', {
cwd: config.staging.path,
stdio: ['pipe', 'pipe', 'pipe'],
encoding: 'utf-8',
});
console.log(' ✓ staging tests pass');
} catch (testErr: unknown) {
const errObj = testErr as { stderr?: string; stdout?: string };
const stderr = errObj.stderr ?? '';
// "No tests found" is OK — staging may not have test files
if (stderr.includes('No tests found')) {
console.log(' ✓ no tests in staging (skipped)');
} else {
console.error('Error: staging tests failed. Aborting promote.');
if (stderr) console.error(stderr);
process.exit(1);
}
}
// Note staging source path before merge (Bun runs .ts directly)
const stagingSrcPath = config.staging.path;
// Step 3: Stop daemon before merge (avoid running stale code during merge)
if (running) {
console.log('Step 3: Stopping daemon before merge...');
try {
daemonStop(config);
console.log(' ✓ daemon stopped');
} catch (err: unknown) {
console.error(
`Warning: daemon stop failed: ${err instanceof Error ? err.message : String(err)}`,
);
}
}
// Step 4: Merge staging → engine
console.log('Step 4: Merging staging → engine...');
try {
gitMerge(config.engine.path, 'staging');
console.log(' ✓ merge successful');
} catch (err: unknown) {
console.error(
`Error: merge failed: ${err instanceof Error ? err.message : String(err)}`,
);
console.error('Resolve conflicts manually in engine/ and retry.');
// Restart daemon with old code if it was running
if (running) {
try {
daemonStart(config, false);
console.log(' ✓ daemon restarted with old code');
} catch {}
}
process.exit(1);
}
// Step 5: Run bun install after merge (picks up new deps)
console.log('Step 5: Running bun install after merge...');
try {
// Try --frozen-lockfile first (atomic, reproducible)
try {
execSync('bun install --frozen-lockfile', {
cwd: config.engine.path,
stdio: ['pipe', 'pipe', 'pipe'],
});
console.log(' ✓ bun install --frozen-lockfile complete');
} catch {
// Fallback: no lockfile or deps changed — regular install
execSync('bun install', {
cwd: config.engine.path,
stdio: 'inherit',
});
console.log(' ✓ bun install complete (lockfile updated)');
}
} catch {
console.error('Error: bun install failed after merge. Rolling back...');
// Rollback: reset engine to pre-merge state
try {
gitResetHard(config.engine.path, 'HEAD~1');
console.error(
'Rolled back engine to previous commit. Restarting with old code...',
);
// Write error event for monitoring
const errorStore = openOrCreateStore(
config.store.scopesDir,
config.store.objectsDir,
);
errorStore.appendEvent({
occurredAt: Date.now(),
kind: 'error',
meta: JSON.stringify({
type: 'promote_rollback',
reason: 'bun_install_failed',
}),
});
errorStore.close();
execSync('bun install --no-cache', {
cwd: config.engine.path,
stdio: 'inherit',
});
} catch {
console.error(
'Warning: rollback bun install also failed. Manual intervention required.',
);
}
if (running) {
try {
daemonStart(config, false);
console.log(' ✓ daemon restarted with old code');
} catch {}
}
process.exit(1);
}
// Step 6: Type-check engine (second verification)
console.log('Step 6: Type-checking engine...');
if (!typeCheck(config.engine.path)) {
console.error(
'Error: engine type check failed after merge. Rolling back...',
);
gitResetHard(config.engine.path, 'HEAD~1');
console.error('Rolled back to previous state.');
if (running) {
try {
daemonStart(config, false);
} catch {}
}
process.exit(1);
}
console.log(' ✓ engine type-checks');
// Step 7: Write version events
console.log('Step 7: Writing version events...');
const store = openOrCreateStore(
config.store.scopesDir,
config.store.objectsDir,
);
{
const currentCodeRev = getCurrentCodeRev(store);
const newCodeRev = getGitHash(config.engine.path);
// Write migrate events (best-effort)
await tryWriteMigrateEvents(
store,
config.store,
stagingSrcPath,
newCodeRev,
);
// Write init events (best-effort)
await tryWriteInitEvents(store, stagingSrcPath, newCodeRev);
// Write promote event (always)
store.appendEvent({
occurredAt: Date.now(),
kind: 'promote',
codeRev: newCodeRev,
meta: JSON.stringify({ from: currentCodeRev ?? null }),
});
console.log(
` ✓ promote event: ${currentCodeRev ?? '(cold start)'}${newCodeRev}`,
);
store.close();
}
// Step 8: Restart daemon after successful promote
if (running) {
console.log('Step 8: Restarting daemon...');
try {
daemonStart(config, false);
console.log(' ✓ daemon restarted');
} catch (err: unknown) {
console.error(
`Warning: daemon restart failed: ${err instanceof Error ? err.message : String(err)}`,
);
}
}
console.log('\n✅ Promote complete!');
});
deploy
.command('rollback')
.description(
'Roll back to previous version (append-only: writes rollback event, reverts engine)',
)
.action(() => {
const config = loadConfig(resolveDir(program.opts().dir));
migrateToScoped(config);
const running = isDaemonRunning(config);
// Step 1: Open store and find versions
console.log('Step 1: Finding versions...');
const store = openStore(config.store.scopesDir, config.store.objectsDir);
if (!store) {
console.error(
'Error: no events.db found. Cannot determine versions for rollback.',
);
process.exit(1);
}
const currentPromote = store.getLatest('promote');
if (!currentPromote) {
console.error('Error: no promote event found. Nothing to roll back.');
store.close();
process.exit(1);
}
const previousPromote = findPreviousPromote(
store,
currentPromote.codeRev,
);
if (!previousPromote) {
console.error('Error: no previous version to roll back to.');
store.close();
process.exit(1);
}
console.log(` Current: ${currentPromote.codeRev}`);
console.log(` Rolling back to: ${previousPromote.codeRev}`);
// Step 2: Write rollback event (append-only, before git changes)
console.log('Step 2: Writing rollback event...');
store.appendEvent({
occurredAt: Date.now(),
kind: 'rollback',
codeRev: previousPromote.codeRev,
meta: JSON.stringify({
from: currentPromote.codeRev,
to: previousPromote.codeRev,
}),
});
console.log(' ✓ rollback event written');
store.close();
// Step 3: Stop daemon before revert
if (running) {
console.log('Step 3: Stopping daemon before revert...');
try {
daemonStop(config);
console.log(' ✓ daemon stopped');
} catch (err: unknown) {
console.error(
`Warning: daemon stop failed: ${err instanceof Error ? err.message : String(err)}`,
);
}
}
// Step 4: Revert engine git commit
console.log('Step 4: Reverting engine commit...');
try {
gitRevert(config.engine.path, 'HEAD');
console.log(' ✓ reverted');
} catch (err: unknown) {
console.error(
`Error: revert failed: ${err instanceof Error ? err.message : String(err)}`,
);
// Restart daemon if it was running
if (running) {
try {
daemonStart(config, false);
console.log(' ✓ daemon restarted');
} catch {}
}
process.exit(1);
}
// Step 5: Run bun install after revert
console.log('Step 5: Running bun install after revert...');
try {
// Try --frozen-lockfile first (atomic, reproducible)
try {
execSync('bun install --frozen-lockfile', {
cwd: config.engine.path,
stdio: ['pipe', 'pipe', 'pipe'],
});
console.log(' ✓ bun install --frozen-lockfile complete');
} catch {
execSync('bun install', {
cwd: config.engine.path,
stdio: 'inherit',
});
console.log(' ✓ bun install complete (lockfile updated)');
}
} catch (installErr: unknown) {
console.error(
`Error: bun install failed after revert: ${installErr instanceof Error ? installErr.message : String(installErr)}`,
);
// Write warning event for monitoring
try {
const warningStore = openOrCreateStore(
config.store.scopesDir,
config.store.objectsDir,
);
warningStore.appendEvent({
occurredAt: Date.now(),
kind: 'warn',
meta: JSON.stringify({
type: 'rollback_install_failed',
reason:
installErr instanceof Error
? installErr.message
: String(installErr),
}),
});
warningStore.close();
} catch {
/* best-effort */
}
process.exit(1);
}
// Step 6: Type-check engine
console.log('Step 6: Type-checking engine...');
if (!typeCheck(config.engine.path)) {
console.error('Error: engine type check failed after revert.');
process.exit(1);
}
console.log(' ✓ engine type-checks');
// Step 7: Restart daemon if it was running
if (running) {
console.log('Step 7: Restarting daemon...');
try {
daemonStart(config, false);
console.log(' ✓ daemon restarted');
} catch (err: unknown) {
console.error(
`Warning: daemon restart failed: ${err instanceof Error ? err.message : String(err)}`,
);
}
}
console.log('\n✅ Rollback complete!');
});
}
-52
View File
@@ -1,52 +0,0 @@
/**
* commands/dev.ts — upulse dev path/build
*/
import { execSync } from 'node:child_process';
import { join } from 'node:path';
import type { Command } from 'commander';
import { loadConfig, resolveDir } from '../config.js';
export function registerDevCommand(program: Command): void {
const dev = program
.command('dev')
.description('Development commands for staging');
dev
.command('path')
.description('Output staging/ or engine/ absolute path')
.option('--engine', 'Output engine path instead of staging')
.action((opts: { engine?: boolean }) => {
const config = loadConfig(resolveDir(program.opts().dir));
if (opts.engine) {
process.stdout.write(`${config.engine.path}\n`);
} else {
process.stdout.write(`${config.staging.path}\n`);
}
});
dev
.command('build')
.description('tsc compile staging/')
.action(() => {
const config = loadConfig(resolveDir(program.opts().dir));
const tscBin = join(
config.engine.path,
'node_modules',
'typescript',
'bin',
'tsc',
);
console.log(`Compiling staging (${config.staging.path})...`);
try {
execSync(`node ${tscBin}`, {
cwd: config.staging.path,
stdio: 'inherit',
});
console.log('✓ Build successful');
} catch {
console.error('Error: tsc compilation failed.');
process.exit(1);
}
});
}
-126
View File
@@ -1,126 +0,0 @@
/**
* commands/gc.ts — upulse gc vitals
*
* Housekeeping for vitals: archive old records, downsample.
* Vitals are stored as events (kind='vital') in the _vitals scope.
*/
import type { Command } from 'commander';
import { loadConfig, resolveDir } from '../config.js';
import { migrateToScoped } from '../migrate.js';
import { openScopedStore } from '../store.js';
function parseDuration(s: string): number {
const match = s.match(/^(\d+)(ms|s|m|h|d)?$/);
if (!match) {
console.error(
`Error: invalid duration "${s}". Use format: 7d, 24h, 30m, 60s, 1000ms`,
);
process.exit(1);
}
const val = parseInt(match[1], 10);
const unit = match[2] ?? 'd';
const multiplier: Record<string, number> = {
ms: 1,
s: 1000,
m: 60_000,
h: 3_600_000,
d: 86_400_000,
};
return val * (multiplier[unit] ?? 86_400_000);
}
export function registerGcCommand(program: Command): void {
const gc = program
.command('gc')
.description('Housekeeping: archive and downsample vitals');
gc.command('vitals')
.description('Archive old vitals and optionally downsample')
.option(
'--keep <duration>',
'Delete vitals older than this (default: 7d)',
'7d',
)
.option(
'--downsample <duration>',
'Downsample interval (e.g., 1m, 5m). If omitted, no downsampling.',
)
.option('--dry-run', 'Show what would happen without making changes', false)
.action((opts: { keep: string; downsample?: string; dryRun: boolean }) => {
const config = loadConfig(resolveDir(program.opts().dir));
migrateToScoped(config);
const scopedStore = openScopedStore(
config.store.scopesDir,
config.store.objectsDir,
);
if (!scopedStore) {
console.log('No scopes directory found. Daemon may not have run yet.');
return;
}
const vitalsStore = scopedStore.scope('_vitals');
const keepMs = parseDuration(opts.keep);
const olderThan = Date.now() - keepMs;
const olderThanStr = new Date(olderThan).toISOString();
if (opts.dryRun) {
console.log('Dry run — no changes will be made.\n');
}
console.log(`Archive threshold: ${opts.keep} (before ${olderThanStr})`);
if (opts.downsample) {
const intervalMs = parseDuration(opts.downsample);
console.log(
`Downsample interval: ${opts.downsample} (${intervalMs}ms)`,
);
if (!opts.dryRun) {
console.log('\n[1/2] Downsampling vitals...');
// Discover distinct keys from recent vital events
const recentVitals = vitalsStore.queryByKind('vital', { limit: 200 });
const keys = [
...new Set(
recentVitals.map((e) => e.key).filter(Boolean) as string[],
),
];
let totalDownsampled = 0;
for (const key of keys) {
const deleted = vitalsStore.downsampleEvents(
'vital',
key,
intervalMs,
olderThan,
);
totalDownsampled += deleted;
}
console.log(
` Downsampled ${totalDownsampled} vital(s) across ${keys.length} key(s).`,
);
console.log('\n[2/2] Archiving vitals...');
const archived = vitalsStore.archiveEvents(olderThan);
console.log(
` Archived ${archived} vital(s) older than ${olderThanStr}.`,
);
} else {
console.log('\n(dry-run) Would downsample and archive vitals.');
}
} else {
if (!opts.dryRun) {
const archived = vitalsStore.archiveEvents(olderThan);
console.log(
`\nArchived ${archived} vital(s) older than ${olderThanStr}.`,
);
} else {
console.log('\n(dry-run) Would archive vitals.');
}
}
scopedStore.close();
});
}
+62 -92
View File
@@ -1,124 +1,94 @@
/**
* commands/inspect.ts — upulse inspect errors/ticks
*
* Reads from the unified `events` table via PulseStore.queryByKind().
* commands/inspect.ts — Inspect workflow daemon store (events + CAS objects)
*/
import type { Command } from 'commander';
import { loadConfig, resolveDir } from '../config.js';
import { migrateToScoped } from '../migrate.js';
import { openStore } from '../store.js';
function parseDuration(s: string): number {
const match = s.match(/^(\d+)(m|h|d|s)?$/);
if (!match) {
console.error(
`Error: invalid duration "${s}". Use format: 1h, 30m, 7d, 3600s`,
);
process.exit(1);
}
const val = parseInt(match[1], 10);
const unit = match[2] ?? 'm';
const multiplier: Record<string, number> = {
s: 1000,
m: 60_000,
h: 3_600_000,
d: 86_400_000,
};
return val * (multiplier[unit] ?? 60_000);
}
import {
openOrCreateWorkflowsStore,
openWorkflowsStore,
type EventRecord,
} from '../store.js';
export function registerInspectCommand(program: Command): void {
const inspect = program
.command('inspect')
.description('Inspect Pulse runtime state and history');
.description('Inspect workflow events and CAS objects');
inspect
.command('errors')
.description('Show recent errors')
.option('--since <duration>', 'Time window (e.g., 1h, 30m)', '1h')
.action((opts: { since: string }) => {
.command('events')
.description('List events from workflows.db')
.option('--topic <topicId>', 'Filter by topic key')
.option('--kind <kind>', 'Filter by event kind (e.g. meta.architect)')
.option('--limit <n>', 'Max rows', '20')
.action((opts: { topic?: string; kind?: string; limit: string }) => {
const config = loadConfig(resolveDir(program.opts().dir));
migrateToScoped(config);
const store = openStore(config.store.scopesDir, config.store.objectsDir);
const store = openWorkflowsStore(config);
const limit = Math.max(1, parseInt(opts.limit, 10) || 20);
if (!store) {
console.log('No events.db found. Daemon may not have run yet.');
console.log('No workflows.db yet. Run the workflow daemon or `upulse workflow submit`.');
return;
}
const sinceMs = Date.now() - parseDuration(opts.since);
const errors = store.queryByKind('error', { since: sinceMs });
let rows: EventRecord[];
if (errors.length === 0) {
console.log(`No errors in the last ${opts.since}.`);
if (opts.kind) {
rows = store.queryByKind(opts.kind, {
key: opts.topic,
limit,
});
} else if (opts.topic) {
const pool = store
.getAfter(0, { key: opts.topic })
.sort((a, b) => b.occurredAt - a.occurredAt || b.id - a.id)
.slice(0, limit);
rows = pool;
} else {
console.log(`Errors (last ${opts.since}):`);
console.log('');
for (const err of errors) {
const ts = new Date(err.occurredAt).toISOString();
const key = err.key ? ` [${err.key}]` : '';
const detail = err.meta ? ` ${err.meta}` : '';
console.log(` ${ts}${key}${detail}`);
}
console.log(`\n Total: ${errors.length} error(s)`);
rows = store.getRecent(limit);
}
store.close();
});
inspect
.command('ticks')
.description('Show recent ticks')
.option('--limit <n>', 'Number of ticks to show', '10')
.action((opts: { limit: string }) => {
const config = loadConfig(resolveDir(program.opts().dir));
migrateToScoped(config);
const store = openStore(config.store.scopesDir, config.store.objectsDir);
if (!store) {
console.log('No events.db found. Daemon may not have run yet.');
return;
}
const limit = parseInt(opts.limit, 10) || 10;
const ticks = store.queryByKind('tick', { limit });
if (ticks.length === 0) {
console.log('No ticks recorded yet.');
if (rows.length === 0) {
console.log('No matching events.');
} else {
console.log(`Recent ticks (last ${limit}):`);
console.log('');
console.log(
' Timestamp | tickMs | Effects | Duration',
'id'.padEnd(6) +
'occurredAt (ISO)'.padEnd(28) +
'kind'.padEnd(28) +
'key',
);
console.log(
' ------------------------------------------------------------------',
);
for (const tick of ticks) {
const ts = new Date(tick.occurredAt).toISOString();
let tickMs = '-';
let effectCount = '-';
let durationMs = '-';
if (tick.meta) {
try {
const meta = JSON.parse(tick.meta);
if (meta.tick_ms !== undefined) tickMs = String(meta.tick_ms);
if (meta.effect_count !== undefined)
effectCount = String(meta.effect_count);
if (meta.duration_ms !== undefined)
durationMs = `${meta.duration_ms}ms`;
} catch {
// ignore parse errors
}
}
console.log('-'.repeat(100));
for (const e of rows) {
const ts = new Date(e.occurredAt).toISOString();
const k = e.key ?? '';
console.log(
` ${ts} | ${tickMs.padStart(6)} | ${effectCount.padStart(7)} | ${durationMs}`,
`${String(e.id).padEnd(6)}${ts.padEnd(28)}${e.kind.padEnd(28)}${k}`,
);
}
console.log(`\n Total: ${ticks.length} tick(s)`);
console.log(`\nTotal: ${rows.length}`);
}
store.close();
});
inspect
.command('object')
.description('Read a CAS object by hash')
.argument('<hash>', 'Object hash (32 hex chars)')
.action((hash: string) => {
const config = loadConfig(resolveDir(program.opts().dir));
const store = openOrCreateWorkflowsStore(config);
const obj = store.getObject(hash);
store.close();
if (obj === null) {
console.error(`No object for hash: ${hash}`);
process.exit(1);
}
console.log(
typeof obj === 'string' ? obj : JSON.stringify(obj, null, 2),
);
});
}
-70
View File
@@ -1,70 +0,0 @@
/**
* commands/list.ts — upulse list (show current rule chain)
*/
import { existsSync, readdirSync, readFileSync } from 'node:fs';
import { join } from 'node:path';
import type { Command } from 'commander';
import { loadConfig, resolveDir } from '../config.js';
export function registerListCommand(program: Command): void {
program
.command('list')
.description('Show current rule chain (from engine)')
.action(() => {
const config = loadConfig(resolveDir(program.opts().dir));
const rulesDir = join(config.engine.path, 'rules');
if (!existsSync(rulesDir)) {
console.error('Error: rules/ directory not found in engine.');
process.exit(1);
}
// List rule files sorted by name (number prefix = order)
const files = readdirSync(rulesDir)
.filter((f) => f.endsWith('.ts'))
.sort();
if (files.length === 0) {
console.log('No rules found.');
return;
}
console.log('Rule chain:');
console.log('');
for (let i = 0; i < files.length; i++) {
const file = files[i];
const filePath = join(rulesDir, file);
// Try to extract a description from comments
const content = readFileSync(filePath, 'utf-8');
const commentMatch = content.match(/\/\/\s*(.+)/);
const desc = commentMatch ? commentMatch[1].trim() : '(no description)';
console.log(` ${i + 1}. ${file}`);
console.log(` ${desc}`);
}
// Also check pulse.config.ts for inline rules
const configFile = join(config.engine.path, 'pulse.config.ts');
if (existsSync(configFile)) {
const configContent = readFileSync(configFile, 'utf-8');
const rulesMatch = configContent.match(
/const rules\s*=\s*\[([\s\S]*?)\]/,
);
if (rulesMatch) {
console.log(`\n Inline rules in pulse.config.ts:`);
const inlineRules = rulesMatch[1]
.split('\n')
.map((l) => l.trim())
.filter((l) => l && !l.startsWith('//') && !l.startsWith(']'));
for (const rule of inlineRules) {
console.log(` ${rule.replace(/,\s*$/, '')}`);
}
}
}
console.log(`\n Total: ${files.length} rule file(s)`);
});
}
+99
View File
@@ -0,0 +1,99 @@
/**
* commands/status.ts — systemd workflow service + workflows.db summary
*/
import { execSync } from 'node:child_process';
import { Database } from 'bun:sqlite';
import type { Command } from 'commander';
import { loadConfig, resolveDir } from '../config.js';
import { workflowDataPaths } from '../store.js';
function systemctlStatus(unit: string): string {
try {
return execSync(`systemctl --user status ${unit} --no-pager 2>&1`, {
encoding: 'utf-8',
maxBuffer: 1024 * 1024,
});
} catch (err) {
const e = err as { stdout?: Buffer; stderr?: Buffer; message?: string };
return (
(e.stdout && e.stdout.toString()) ||
(e.stderr && e.stderr.toString()) ||
e.message ||
String(err)
);
}
}
export function registerStatusCommand(program: Command): void {
program
.command('status')
.description('Workflow daemon systemd status and store summary')
.action(() => {
const config = loadConfig(resolveDir(program.opts().dir));
const { eventsDbPath } = workflowDataPaths(config);
console.log('=== systemd (user) ===\n');
const units = ['pulse-workflow.service', 'pulse.service'];
for (const u of units) {
console.log(`--- ${u} ---`);
console.log(systemctlStatus(u));
console.log('');
}
console.log('=== workflows.db ===\n');
console.log(`Path: ${eventsDbPath}\n`);
try {
const db = new Database(eventsDbPath, { readonly: true });
try {
const row = db
.query(
'SELECT COUNT(*) AS c, MAX(occurred_at) AS m FROM events',
)
.get() as { c: number; m: number | null };
console.log(`Event count: ${row.c}`);
if (row.m != null) {
console.log(`Latest event: ${new Date(row.m).toISOString()}`);
}
} finally {
db.close();
}
} catch {
console.log('(database not readable or missing)');
}
console.log('\n=== active topics (recent keys) ===\n');
try {
const db = new Database(eventsDbPath, { readonly: true });
try {
const recent = db
.query(
`SELECT key, kind, occurred_at FROM events
WHERE key IS NOT NULL AND key != ''
ORDER BY occurred_at DESC, id DESC
LIMIT 30`,
)
.all() as { key: string; kind: string; occurred_at: number }[];
const seen = new Set<string>();
const lines: string[] = [];
for (const r of recent) {
if (seen.has(r.key)) continue;
seen.add(r.key);
lines.push(
` ${r.key} ${r.kind} ${new Date(r.occurred_at).toISOString()}`,
);
if (lines.length >= 10) break;
}
console.log(lines.length ? lines.join('\n') : ' (none)');
} finally {
db.close();
}
} catch {
console.log(' (could not query topics)');
}
console.log('');
});
}
-212
View File
@@ -1,212 +0,0 @@
/**
* commands/tick.ts — upulse tick [--dry-run] [--verbose]
*
* Manually trigger one tick using snapshots rebuilt from vitals/events.
*
* tick is a pure "read → decide → execute" operation:
* 1. Rebuild snapshot from existing vitals/events (no collection)
* 2. Run rules to produce effects
* 3. Execute effects (unless --dry-run)
*
* Data collection is the watcher's responsibility (via daemon).
* If the store is empty, tick sees an empty snapshot — rules will
* produce collect effects, which are logged but not executed.
*/
import { existsSync } from 'node:fs';
import { join } from 'node:path';
import { pathToFileURL } from 'node:url';
import type { Command } from 'commander';
import { loadConfig, resolveDir } from '../config.js';
import { migrateToScoped } from '../migrate.js';
export function registerTickCommand(program: Command): void {
program
.command('tick')
.description('Manually trigger one tick')
.option('--dry-run', 'Calculate effects without executing')
.option('--verbose', 'Show each rule input/output')
.action(async (opts: { dryRun?: boolean; verbose?: boolean }) => {
const config = loadConfig(resolveDir(program.opts().dir));
migrateToScoped(config);
const enginePath = config.engine.path;
try {
// Import @uncaged/pulse from engine's node_modules
const pulseModulePath = join(
enginePath,
'node_modules',
'@uncaged',
'pulse',
'dist',
'index.js',
);
let rebuildSnapshot: (
store: unknown,
senseKeys: string[],
epoch?: unknown,
) => unknown;
let findEffectiveEpoch: (store: unknown) => unknown;
let createScopedStore: (options: {
basePath: string;
objectsDir: string;
}) => { scope(name: string): unknown; close(): void };
let composeRules: (
rules: unknown[],
defaultTickMs?: number,
) => (prev: unknown, curr: unknown) => Promise<[unknown[], number]>;
if (existsSync(pulseModulePath)) {
const pulseModule = await import(pathToFileURL(pulseModulePath).href);
rebuildSnapshot = pulseModule.rebuildSnapshot;
findEffectiveEpoch = pulseModule.findEffectiveEpoch;
createScopedStore = pulseModule.createScopedStore;
composeRules = pulseModule.composeRules;
} else {
console.error(
'Error: @uncaged/pulse not found in engine node_modules.',
);
console.error('Run: cd ~/.upulse/engine && bun install');
process.exit(1);
}
// Import rules (dynamically discover .ts rule files)
const rulesDir = join(enginePath, 'rules');
const ruleFiles: string[] = [];
if (existsSync(rulesDir)) {
const { readdirSync } = await import('node:fs');
ruleFiles.push(
...readdirSync(rulesDir)
.filter((f: string) => f.endsWith('.ts'))
.sort(),
);
}
const rules: unknown[] = [];
for (const file of ruleFiles) {
const rulePath = pathToFileURL(join(rulesDir, file)).href;
const ruleModule = await import(rulePath);
rules.push(ruleModule.default);
}
// Open or create store via ScopedStore
const scopedStore = createScopedStore({
basePath: config.store.scopesDir,
objectsDir: config.store.objectsDir,
});
const store = scopedStore.scope('_system') as {
hasEvents(): boolean;
appendEvent(e: Record<string, unknown>): unknown;
putObject(d: unknown): string;
queryByKind(
kind: string,
opts?: { limit?: number },
): Array<{ key?: string }>;
};
// Discover sense keys from existing collect events in the store
const collects = store.queryByKind('collect', { limit: 100 });
const senseKeys = [
...new Set(collects.map((e) => e.key).filter(Boolean) as string[]),
];
if (senseKeys.length === 0 && opts.verbose) {
console.log(
'Note: no collect events found in store. Snapshot will be empty.',
);
console.log(
'Start the daemon first to collect data, or rules will emit collect effects.',
);
}
// Determine version epoch
const epoch = findEffectiveEpoch(store);
// Rebuild snapshot from vitals/events — no collection
const snapshot = rebuildSnapshot(store, senseKeys, epoch) as {
timestamp: number;
};
console.log('--- Snapshot ---');
console.log(JSON.stringify(snapshot, null, 2));
// Run rule chain (prev = curr for single-tick: no time delta)
const pulse = composeRules(rules, 15000);
const [effects, tickMs] = await pulse(snapshot, snapshot);
if (opts.verbose) {
console.log('\n--- Rules applied ---');
console.log(` Rules count: ${rules.length}`);
for (let i = 0; i < ruleFiles.length; i++) {
console.log(` [${i}] ${ruleFiles[i]}`);
}
}
console.log('\n--- Result ---');
console.log(`Effects (${effects.length}):`);
if (effects.length === 0) {
console.log(' (none)');
} else {
for (const e of effects) {
console.log(` ${JSON.stringify(e)}`);
}
}
console.log(`Next tick: ${tickMs}ms`);
if (opts.dryRun) {
console.log('\n(dry-run: effects not executed)');
} else {
// Execute effects with built-in handlers
for (const effect of effects) {
const e = effect as {
kind: string;
message?: string;
key?: string;
};
switch (e.kind) {
case 'collect':
console.log(
` [COLLECT] ${e.key} (skipped — start daemon for live collection)`,
);
break;
case 'log':
console.log(` [LOG] ${e.message}`);
break;
default:
console.log(` [EFFECT] ${JSON.stringify(e)}`);
break;
}
}
if (effects.length > 0) {
console.log('\n✓ Effects handled (tick mode)');
}
// Record events like runPulse does
if (effects.length > 0) {
const effectsHash = store.putObject(effects);
store.appendEvent({
occurredAt: Date.now(),
kind: 'effect',
hash: effectsHash,
meta: JSON.stringify({ count: effects.length }),
});
}
// Record tick event
store.appendEvent({
occurredAt: Date.now(),
kind: 'tick',
meta: JSON.stringify({
tick_ms: tickMs,
effect_count: effects.length,
}),
});
}
} catch (err: unknown) {
console.error(
`Error during tick: ${err instanceof Error ? err.message : String(err)}`,
);
process.exit(1);
}
});
}
-46
View File
@@ -1,46 +0,0 @@
/**
* commands/ui.ts — upulse ui [--port] [--no-open]
*
* Start a local WebUI dashboard for monitoring Pulse.
*/
import type { Command } from 'commander';
import { loadConfig, resolveDir } from '../config.js';
import { createUIServer } from '../ui/server.js';
export function registerUICommand(program: Command): void {
program
.command('ui')
.description('Start local WebUI dashboard')
.option('--port <number>', 'HTTP port', '3140')
.option('--no-open', 'Do not open browser automatically')
.action(async (opts: { port: string; open: boolean }) => {
const config = loadConfig(resolveDir(program.opts().dir));
const port = parseInt(opts.port, 10);
const server = createUIServer(config, port);
console.log(`\n ⚡ Pulse UI running at http://localhost:${port}\n`);
if (opts.open) {
try {
const { execSync } = await import('node:child_process');
const cmd =
process.platform === 'darwin'
? 'open'
: process.platform === 'win32'
? 'start'
: 'xdg-open';
execSync(`${cmd} http://localhost:${port}`, { stdio: 'ignore' });
} catch {
// Ignore — headless environments
}
}
// Keep alive
process.on('SIGINT', () => {
server.stop();
process.exit(0);
});
});
}
+107 -138
View File
@@ -1,171 +1,140 @@
/**
* workflow.test.ts — Tests for upulse workflow create/list logic (message chain model).
* workflow.test.ts — workflows.db 模型:__start__、按 workflow 分组 list
*/
import { afterEach, beforeEach, describe, expect, it } from 'bun:test';
import { mkdtempSync, rmSync } from 'node:fs';
import { mkdirSync, rmSync } from 'node:fs';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import {
createScopedStore,
type PulseStore,
type ScopedStore,
} from '@uncaged/pulse';
import { createStore, type PulseStore } from '@uncaged/pulse';
describe('workflow', () => {
function parseWorkflowKind(
kind: string,
): { wf: string; role: string } | null {
const i = kind.lastIndexOf('.');
if (i <= 0) return null;
return { wf: kind.slice(0, i), role: kind.slice(i + 1) };
}
function workflowForTopic(
events: { kind: string; key?: string }[],
topicKey: string,
): string | null {
for (const e of events) {
if (e.key !== topicKey) continue;
const p = parseWorkflowKind(e.kind);
if (p?.role === '__start__') return p.wf;
}
return null;
}
describe('workflow store (daemon model)', () => {
let tmpDir: string;
let scopedStore: ScopedStore;
let store: PulseStore;
beforeEach(() => {
tmpDir = mkdtempSync(join(tmpdir(), 'pulse-topic-test-'));
scopedStore = createScopedStore({
basePath: join(tmpDir, 'scopes'),
objectsDir: join(tmpDir, 'objects'),
});
store = scopedStore.scope('_system');
tmpDir = join(tmpdir(), `upulse-wf-test-${Date.now()}`);
mkdirSync(tmpDir, { recursive: true });
const eventsDbPath = join(tmpDir, 'workflows.db');
const objectsDir = join(tmpDir, 'objects');
store = createStore({ eventsDbPath, objectsDir });
});
afterEach(() => {
scopedStore.close();
store.close();
rmSync(tmpDir, { recursive: true, force: true });
});
it('create topic writes coding.user event with CAS', () => {
const topicId = 'fix-login-bug-a3k';
const hash = store.putObject({
content: 'The login page crashes on submit',
artifacts: { title: 'Fix login bug', repoDir: '/tmp/repo' },
});
it('__start__ stores task content in CAS', () => {
const topicKey = 'task-a';
const hash = store.putObject('hello task');
store.appendEvent({
occurredAt: Date.now(),
kind: 'coding.user',
key: topicId,
kind: 'coding.__start__',
key: topicKey,
hash,
});
const events = store.queryByKind('coding.user');
expect(events.length).toBe(1);
expect(events[0].key).toBe(topicId);
expect(events[0].hash).toBeTruthy();
const content = store.getObject(events[0].hash!) as any;
expect(content.artifacts.title).toBe('Fix login bug');
expect(content.content).toBe('The login page crashes on submit');
expect(content.artifacts.repoDir).toBe('/tmp/repo');
const ev = store.queryByKind('coding.__start__', { key: topicKey });
expect(ev.length).toBe(1);
expect(store.getObject(ev[0].hash!)).toBe('hello task');
});
it('list topics shows created topic', () => {
const topicId = 'add-feature-x-b2c';
const hash = store.putObject({
content: 'Implement feature X',
artifacts: { title: 'Add feature X', repoDir: '/tmp/repo' },
it('list grouping uses __start__ workflow prefix and latest kind', () => {
const k1 = 'topic-1';
const k2 = 'topic-2';
const ts = Date.now();
store.appendEvent({
occurredAt: ts,
kind: 'coding.__start__',
key: k1,
hash: store.putObject('t1'),
});
store.appendEvent({
occurredAt: ts + 1,
kind: 'coding.architect',
key: k1,
hash: store.putObject('arch'),
});
store.appendEvent({
occurredAt: ts + 2,
kind: 'meta.__start__',
key: k2,
hash: store.putObject('m'),
});
const events = store.getAfter(0);
const startByKey = new Map<string, string>();
for (const e of events) {
if (!e.key) continue;
const p = parseWorkflowKind(e.kind);
if (p?.role === '__start__') startByKey.set(e.key, p.wf);
}
expect(startByKey.get(k1)).toBe('coding');
expect(startByKey.get(k2)).toBe('meta');
expect(workflowForTopic(events, k1)).toBe('coding');
expect(workflowForTopic(events, k2)).toBe('meta');
const latestByKey = new Map<string, (typeof events)[0]>();
for (const e of events) {
if (!e.key) continue;
const cur = latestByKey.get(e.key);
if (
!cur ||
e.occurredAt > cur.occurredAt ||
(e.occurredAt === cur.occurredAt && e.id > cur.id)
) {
latestByKey.set(e.key, e);
}
}
expect(latestByKey.get(k1)?.kind).toBe('coding.architect');
expect(latestByKey.get(k2)?.kind).toBe('meta.__start__');
});
it('list without --all omits topics whose latest event is only __start__', () => {
const hash = store.putObject('pending');
store.appendEvent({
occurredAt: Date.now(),
kind: 'coding.user',
key: topicId,
kind: 'meta.__start__',
key: 'stuck-topic',
hash,
});
const allEvents = store
.getAfter(0)
.filter((e) => e.kind.startsWith('coding.'));
expect(allEvents.length).toBe(1);
const topics = new Map<
string,
{ title: string; role: string; lastUpdated: number }
>();
for (const event of allEvents) {
const tid = event.key;
if (!tid) continue;
const role = event.kind.replace('coding.', '');
let title = tid;
if (role === 'user' && event.hash) {
try {
const obj = store.getObject(event.hash) as any;
if (obj?.artifacts?.title) title = obj.artifacts.title;
} catch {}
}
topics.set(tid, { title, role, lastUpdated: event.occurredAt });
}
expect(topics.size).toBe(1);
const t = topics.get(topicId)!;
expect(t.title).toBe('Add feature X');
expect(t.role).toBe('user');
});
it('list without --all hides closed topics', () => {
const now = Date.now();
// Topic 1: active (user)
const h1 = store.putObject({
content: 'Active',
artifacts: { title: 'Active Topic' },
});
store.appendEvent({
occurredAt: now,
kind: 'coding.user',
key: 'active-topic-1a2',
hash: h1,
});
// Topic 2: closed
const h2 = store.putObject({
content: 'Closed',
artifacts: { title: 'Closed Topic' },
});
store.appendEvent({
occurredAt: now,
kind: 'coding.user',
key: 'closed-topic-3b4',
hash: h2,
});
const h3 = store.putObject({ content: 'Done' });
store.appendEvent({
occurredAt: now + 1,
kind: 'coding.closer',
key: 'closed-topic-3b4',
hash: h3,
});
const allEvents = store
.getAfter(0)
.filter((e) => e.kind.startsWith('coding.'));
const topics = new Map<
string,
{ title: string; role: string; lastUpdated: number }
>();
for (const event of allEvents) {
const tid = event.key;
if (!tid) continue;
const role = event.kind.replace('coding.', '');
let title = tid;
if (role === 'user' && event.hash) {
try {
const obj = store.getObject(event.hash) as any;
if (obj?.artifacts?.title) title = obj.artifacts.title;
} catch {}
}
const existing = topics.get(tid);
if (!existing) {
topics.set(tid, { title, role, lastUpdated: event.occurredAt });
} else {
existing.role = role;
if (event.occurredAt > existing.lastUpdated)
existing.lastUpdated = event.occurredAt;
const events = store.getAfter(0);
const latestByKey = new Map<string, (typeof events)[0]>();
for (const e of events) {
if (!e.key) continue;
const cur = latestByKey.get(e.key);
if (
!cur ||
e.occurredAt > cur.occurredAt ||
(e.occurredAt === cur.occurredAt && e.id > cur.id)
) {
latestByKey.set(e.key, e);
}
}
// Without --all: only non-closer
const active = [...topics.entries()].filter(([, t]) => t.role !== 'closer');
expect(active.length).toBe(1);
expect(active[0][0]).toBe('active-topic-1a2');
// With --all: all topics
expect(topics.size).toBe(2);
const last = latestByKey.get('stuck-topic')!;
const p = parseWorkflowKind(last.kind);
expect(p?.role === '__start__').toBe(true);
});
});
+140 -144
View File
@@ -1,54 +1,67 @@
/**
* commands/workflow.ts — upulse workflow create/list
* commands/workflow.ts — Workflow daemon CLI (list / timeline / submit / init)
*
* Manage coding workflows via the routine scope store.
* Events follow the coding.{role} convention (message chain model).
* Event kinds: `{workflow}.{role}` with `{workflow}.__start__` to begin a topic.
*/
import { randomBytes } from 'node:crypto';
import { createRequire } from 'node:module';
import { existsSync, readFileSync } from 'node:fs';
import { dirname, join } from 'node:path';
import { pathToFileURL } from 'node:url';
import type { Command } from 'commander';
import { loadConfig, resolveDir } from '../config.js';
import { migrateToScoped } from '../migrate.js';
import { openOrCreateScopedStore, openScopedStore } from '../store.js';
import type { EventRecord } from '../store.js';
import { openOrCreateWorkflowsStore, openWorkflowsStore } from '../store.js';
function slugify(text: string): string {
return text
.toLowerCase()
.replace(/[^a-z0-9]+/g, '-')
.replace(/^-+|-+$/g, '')
.slice(0, 48);
function parseWorkflowKind(
kind: string,
): { wf: string; role: string } | null {
const i = kind.lastIndexOf('.');
if (i <= 0) return null;
return { wf: kind.slice(0, i), role: kind.slice(i + 1) };
}
function shortId(): string {
return randomBytes(2).toString('hex').slice(0, 3);
}
type CodingRole = 'user' | 'architect' | 'coder' | 'reviewer' | 'closer';
function kindToRole(kind: string): CodingRole {
return kind.replace('coding.', '') as CodingRole;
function workflowForTopic(
events: EventRecord[],
topicKey: string,
): string | null {
for (const e of events) {
if (e.key !== topicKey) continue;
const p = parseWorkflowKind(e.kind);
if (p?.role === '__start__') return p.wf;
}
return null;
}
export function registerWorkflowCommand(program: Command): void {
const topic = program
.command('workflow')
.description('Manage coding workflows');
.description('Inspect and submit workflow tasks');
topic
.command('init <name>')
.description('Scaffold a new workflow (generates skeleton files)')
.option('--roles <roles>', 'Comma-separated role names', 'processor')
.option('--dir <dir>', 'Workflows directory', '')
.action((name: string, opts: { roles: string; dir: string }) => {
const {
scaffoldWorkflow,
} = require('@uncaged/pulse/workflows/scaffold.js');
const workflowsDir =
opts.dir ||
require('node:path').join(
process.cwd(),
'packages/pulse/src/workflows',
.action(async (name: string, opts: { roles: string; dir: string }) => {
const require = createRequire(import.meta.url);
const pulseRoot = dirname(require.resolve('@uncaged/pulse/package.json'));
const scaffoldPath = join(pulseRoot, 'dist/workflows/scaffold.js');
if (!existsSync(scaffoldPath)) {
console.error(
`Missing ${scaffoldPath}. Build @uncaged/pulse first: cd packages/pulse && bun run build`,
);
process.exit(1);
}
const { scaffoldWorkflow } = (await import(
pathToFileURL(scaffoldPath).href
)) as { scaffoldWorkflow: (o: {
name: string;
roles: string[];
workflowsDir: string;
}) => string[] };
const workflowsDir =
opts.dir || join(process.cwd(), 'packages/pulse/src/workflows');
const roles = opts.roles
.split(',')
.map((r: string) => r.trim())
@@ -63,68 +76,60 @@ export function registerWorkflowCommand(program: Command): void {
});
topic
.command('create <title>')
.description('Create a new coding workflow')
.requiredOption('--desc <description>', 'Workflow description')
.requiredOption('--repo <repoDir>', 'Repository directory')
.action((title: string, opts: { desc: string; repo: string }) => {
.command('submit')
.description('Submit a task (append {workflow}.__start__ event)')
.argument('<workflow>', 'Workflow name (e.g. meta, coding)')
.argument('<topicKey>', 'Topic key')
.argument('<taskFile>', 'Path to task file')
.action((workflow: string, topicKey: string, taskFile: string) => {
const config = loadConfig(resolveDir(program.opts().dir));
migrateToScoped(config);
const scopedStore = openOrCreateScopedStore(
config.store.scopesDir,
config.store.objectsDir,
);
const store = scopedStore.scope('workflows');
const topicId = `${slugify(title)}-${shortId()}`;
// Store full content in CAS
const hash = store.putObject({
content: opts.desc,
artifacts: { title, repoDir: opts.repo },
});
const store = openOrCreateWorkflowsStore(config);
const content = readFileSync(taskFile, 'utf-8');
const hash = store.putObject(content);
store.appendEvent({
occurredAt: Date.now(),
kind: 'coding.user',
key: topicId,
kind: `${workflow}.__start__`,
key: topicKey,
hash,
});
console.log(topicId);
scopedStore.close();
console.log(`✅ Submitted: ${workflow}.__start__ [${topicKey}]`);
console.log(` Hash: ${hash}`);
store.close();
});
topic
.command('timeline <key>')
.description('Show workflow event timeline')
.description('Show workflow event timeline for a topic key')
.option('--json', 'Output as JSON')
.option('--content', 'Include content preview')
.option('--content', 'Include CAS content preview')
.action((key: string, opts: { json?: boolean; content?: boolean }) => {
const config = loadConfig(resolveDir(program.opts().dir));
migrateToScoped(config);
const scopedStore = openScopedStore(
config.store.scopesDir,
config.store.objectsDir,
);
if (!scopedStore) {
console.error('No store found.');
const store = openWorkflowsStore(config);
if (!store) {
console.error('workflows.db not found.');
process.exit(1);
}
const store = scopedStore.scope('workflows');
const events = store
.getAfter(0)
.filter((e) => e.kind.startsWith('coding.') && e.key === key);
const all = store.getAfter(0);
const wf = workflowForTopic(all, key);
const events = all
.filter((e) => e.key === key)
.filter((e) => {
if (!wf) return true;
const p = parseWorkflowKind(e.kind);
return !p || p.wf === wf;
})
.sort((a, b) => a.occurredAt - b.occurredAt || a.id - b.id);
if (events.length === 0) {
console.error(`No events found for key: ${key}`);
scopedStore.close();
store.close();
process.exit(1);
}
const t0 = events[0].occurredAt;
const entries = events.map((e, i) => {
const role = e.kind.replace('coding.', '');
const prevTime = i > 0 ? events[i - 1].occurredAt : t0;
const durationMs = e.occurredAt - prevTime;
const meta = e.meta ? JSON.parse(e.meta) : null;
@@ -135,11 +140,13 @@ export function registerWorkflowCommand(program: Command): void {
const text = typeof obj === 'string' ? obj : JSON.stringify(obj);
contentPreview =
text.slice(0, 200) + (text.length > 200 ? '...' : '');
} catch {}
} catch {
/* ignore */
}
}
return {
id: e.id,
role,
kind: e.kind,
offsetMs: e.occurredAt - t0,
durationMs: i === 0 ? 0 : durationMs,
meta,
@@ -161,8 +168,9 @@ export function registerWorkflowCommand(program: Command): void {
? ` (${(entry.durationMs / 1000).toFixed(1)}s)`
: '';
const metaStr = entry.meta ? ` ${JSON.stringify(entry.meta)}` : '';
const kindPad = entry.kind.padEnd(28);
console.log(
` [${offset.padStart(8)}] #${String(entry.id).padStart(2)} ${entry.role.padEnd(12)}${dur}${metaStr}`,
` [${offset.padStart(8)}] #${String(entry.id).padStart(4)} ${kindPad}${dur}${metaStr}`,
);
if (entry.contentPreview) {
console.log(` ${entry.contentPreview.split('\n')[0]}`);
@@ -171,97 +179,85 @@ export function registerWorkflowCommand(program: Command): void {
console.log();
}
scopedStore.close();
store.close();
});
topic
.command('list')
.description('List coding topics')
.option('--all', 'Include closed topics')
.description('List topics grouped by workflow (__start__) with latest role')
.option('--all', 'Include topics whose latest event is __start__ only')
.action((opts: { all?: boolean }) => {
const config = loadConfig(resolveDir(program.opts().dir));
migrateToScoped(config);
const scopedStore = openScopedStore(
config.store.scopesDir,
config.store.objectsDir,
);
const store = openWorkflowsStore(config);
if (!scopedStore) {
console.log('No store found. Run "upulse init" first.');
if (!store) {
console.log('No workflows.db. Submit a task or run the workflow daemon.');
return;
}
const store = scopedStore.scope('workflows');
const events = store.getAfter(0);
const startByKey = new Map<string, string>();
for (const e of events) {
if (!e.key) continue;
const p = parseWorkflowKind(e.kind);
if (p?.role === '__start__') startByKey.set(e.key, p.wf);
}
const allEvents = store
.getAfter(0)
.filter((e) => e.kind.startsWith('coding.'));
const latestByKey = new Map<string, EventRecord>();
for (const e of events) {
if (!e.key) continue;
const cur = latestByKey.get(e.key);
if (
!cur ||
e.occurredAt > cur.occurredAt ||
(e.occurredAt === cur.occurredAt && e.id > cur.id)
) {
latestByKey.set(e.key, e);
}
}
if (allEvents.length === 0) {
type Row = { key: string; wf: string; latestKind: string; at: number };
const rows: Row[] = [];
for (const [topicKey, last] of latestByKey) {
const p = parseWorkflowKind(last.kind);
if (!opts.all && p?.role === '__start__') continue;
const wf = startByKey.get(topicKey) ?? p?.wf ?? 'unknown';
rows.push({
key: topicKey,
wf,
latestKind: last.kind,
at: last.occurredAt,
});
}
rows.sort((a, b) => {
if (a.wf !== b.wf) return a.wf.localeCompare(b.wf);
return b.at - a.at;
});
if (rows.length === 0) {
console.log('No topics found.');
scopedStore.close();
store.close();
return;
}
// Group by key (topicId), track latest role
const topics = new Map<
string,
{ title: string; role: CodingRole; lastUpdated: number }
>();
for (const event of allEvents) {
const topicId = event.key;
if (!topicId) continue;
const role = kindToRole(event.kind);
// Try to get title from CAS for user events
let title = topicId;
if (role === 'user' && event.hash) {
try {
const obj = store.getObject(event.hash) as any;
if (obj?.artifacts?.title) title = obj.artifacts.title;
} catch {}
let currentWf = '';
for (const r of rows) {
if (r.wf !== currentWf) {
currentWf = r.wf;
console.log(`\n[${currentWf}]`);
console.log(
'topicKey | latest kind | updated',
);
console.log('-'.repeat(100));
}
const existing = topics.get(topicId);
if (!existing) {
topics.set(topicId, { title, role, lastUpdated: event.occurredAt });
} else {
existing.role = role;
if (event.occurredAt > existing.lastUpdated) {
existing.lastUpdated = event.occurredAt;
}
if (role === 'user' && title !== topicId) {
existing.title = title;
}
}
}
// Filter closed unless --all
const entries = [...topics.entries()].filter(
([, t]) => opts.all || t.role !== 'closer',
);
if (entries.length === 0) {
console.log('No active topics. Use --all to include closed topics.');
scopedStore.close();
return;
}
console.log(
'topicId | title | role | last updated',
);
console.log('-'.repeat(120));
for (const [topicId, t] of entries) {
const ts = new Date(t.lastUpdated).toISOString();
const ts = new Date(r.at).toISOString();
console.log(
`${topicId.padEnd(49)}| ${t.title.slice(0, 30).padEnd(31)}| ${t.role.padEnd(9)}| ${ts}`,
`${r.key.padEnd(49)}| ${r.latestKind.slice(0, 28).padEnd(29)}| ${ts}`,
);
}
console.log(`\nTotal: ${entries.length} topic(s)`);
scopedStore.close();
console.log(`\nTotal: ${rows.length} topic(s)`);
store.close();
});
}
+1 -1
View File
@@ -30,7 +30,7 @@ export function daemonStart(
const existingPid = readPid(config);
if (existingPid !== null && isProcessAlive(existingPid)) {
console.error(
`Error: daemon is already running (PID: ${existingPid}). Stop it first with "upulse daemon stop".`,
`Error: daemon is already running (PID: ${existingPid}). Stop that process or remove ${config.daemon.pidFile}.`,
);
process.exit(1);
}
-157
View File
@@ -1,157 +0,0 @@
/**
* E2E Test Helper — shared utilities for upulse E2E tests.
*
* Provides isolated temp directories, CLI runner, and DB query helpers.
*/
import { Database } from 'bun:sqlite';
import { execSync } from 'node:child_process';
import { existsSync, mkdtempSync, readdirSync, rmSync } from 'node:fs';
import { tmpdir } from 'node:os';
import { dirname, join } from 'node:path';
import { fileURLToPath } from 'node:url';
const __filename = fileURLToPath(import.meta.url);
const __dirname = dirname(__filename);
/** Path to the upulse CLI entrypoint (src/cli.ts). */
const CLI_PATH = join(__dirname, '..', 'cli.ts');
export interface E2EContext {
/** Temp directory used as HOME for the test. */
baseDir: string;
/** Path to .upulse inside baseDir. */
upulseDir: string;
/** Path to scopes/ directory (new layout). */
scopesDir: string;
/** Path to scopes/_system.db (events). */
eventsDbPath: string;
/** Path to scopes/_vitals.db (vitals). */
vitalsDbPath: string;
/** Path to objects/. */
objectsDir: string;
/** Path to engine/. */
engineDir: string;
/** Path to staging/. */
stagingDir: string;
}
/**
* Create an isolated E2E test environment with a fresh temp directory.
*/
export function createE2EContext(): E2EContext {
const baseDir = mkdtempSync(join(tmpdir(), 'pulse-e2e-'));
const upulseDir = join(baseDir, '.upulse');
const scopesDir = join(upulseDir, 'scopes');
return {
baseDir,
upulseDir,
scopesDir,
eventsDbPath: join(scopesDir, '_system.db'),
vitalsDbPath: join(scopesDir, '_vitals.db'),
objectsDir: join(upulseDir, 'objects'),
engineDir: join(upulseDir, 'engine'),
stagingDir: join(upulseDir, 'staging'),
};
}
/**
* Clean up an E2E test environment (remove temp directory).
*/
export function cleanupE2EContext(ctx: E2EContext): void {
try {
rmSync(ctx.baseDir, { recursive: true, force: true });
} catch {
// best-effort cleanup; CI temp dirs are ephemeral anyway
}
}
export interface RunOptions {
/** If true, expect the command to exit non-zero. Returns stderr. */
expectFail?: boolean;
/** Timeout in ms (default: 120_000 for bun install during init). */
timeout?: number;
}
/**
* Run an upulse CLI command in the E2E context.
*
* - Sets HOME so `homedir()` → ctx.baseDir → config lands in ctx.baseDir/.upulse/
* - Sets PULSE_BASE_DIR so engine's pulse.config.ts uses the right paths.
* - Uses `bun` to run the TypeScript CLI directly.
*/
export function runUpulse(
ctx: E2EContext,
args: string,
options?: RunOptions,
): string {
const timeout = options?.timeout ?? 120_000;
const env: Record<string, string> = {
...(process.env as Record<string, string>),
HOME: ctx.baseDir,
PULSE_BASE_DIR: ctx.upulseDir,
};
try {
const result = execSync(`bun ${CLI_PATH} ${args}`, {
env,
cwd: ctx.baseDir,
timeout,
encoding: 'utf-8',
stdio: ['pipe', 'pipe', 'pipe'],
});
if (options?.expectFail) {
throw new Error(
`Expected command to fail but it succeeded: upulse ${args}\nstdout: ${result}`,
);
}
return result;
} catch (err: unknown) {
if (options?.expectFail) {
const e = err as { stderr?: string; stdout?: string };
return e.stderr || e.stdout || '';
}
const e = err as { stdout?: string; stderr?: string; message?: string };
throw new Error(
`upulse ${args} failed:\nstdout: ${e.stdout ?? ''}\nstderr: ${e.stderr ?? ''}\nmessage: ${e.message ?? ''}`,
);
}
}
/**
* Open events DB (scopes/_system.db) and run a query.
* Returns empty array if DB does not exist.
*/
export function queryEventsDb(ctx: E2EContext, sql: string): unknown[] {
if (!existsSync(ctx.eventsDbPath)) return [];
const db = new Database(ctx.eventsDbPath);
try {
return db.prepare(sql).all();
} finally {
db.close();
}
}
/**
* Open vitals DB (scopes/_vitals.db) and run a query.
* Vitals are stored as events (kind='vital') in the events table.
* Returns empty array if DB does not exist.
*/
export function queryVitalsDb(ctx: E2EContext, sql: string): unknown[] {
if (!existsSync(ctx.vitalsDbPath)) return [];
const db = new Database(ctx.vitalsDbPath);
try {
return db.prepare(sql).all();
} finally {
db.close();
}
}
/**
* List .json files in the objects/ directory.
*/
export function listObjects(ctx: E2EContext): string[] {
if (!existsSync(ctx.objectsDir)) return [];
return readdirSync(ctx.objectsDir).filter((f) => f.endsWith('.json'));
}
@@ -1,144 +0,0 @@
/**
* E2E T1 — Init → Daemon → Tick 完整链路测试
*
* Validates the full lifecycle:
* upulse init → creates directories, git repo, bun install
* upulse tick → compiles engine, runs one tick, writes to DB
* upulse daemon status → reports daemon state
*
* Run: bun test src/e2e/t1-init-daemon-tick.test.ts
*/
import { afterAll, beforeAll, describe, expect, it } from 'bun:test';
import { execSync } from 'node:child_process';
import { existsSync, readFileSync } from 'node:fs';
import { join } from 'node:path';
import {
cleanupE2EContext,
createE2EContext,
type E2EContext,
listObjects,
queryEventsDb,
runUpulse,
} from './helper.js';
describe('E2E T1: Init → Daemon → Tick', () => {
let ctx: E2EContext;
// Use a single context for all tests in this suite because
// `upulse init` runs bun install which is slow (~15-30s).
// Tests are ordered: init → tick → daemon.
beforeAll(() => {
ctx = createE2EContext();
});
afterAll(() => {
cleanupE2EContext(ctx);
});
// ── Init ─────────────────────────────────────────────────────
it('upulse init creates .upulse directory', () => {
const output = runUpulse(ctx, 'init');
expect(output.includes('initialized')).toBeTruthy();
expect(existsSync(ctx.upulseDir)).toBeTruthy();
});
it('init creates config.json', () => {
const cfgPath = join(ctx.upulseDir, 'config.json');
expect(existsSync(cfgPath)).toBeTruthy();
const cfg = JSON.parse(readFileSync(cfgPath, 'utf-8'));
expect(cfg.dir).toBe(ctx.upulseDir);
});
it('init creates complete engine directory structure', () => {
expect(existsSync(ctx.engineDir)).toBeTruthy();
expect(existsSync(join(ctx.engineDir, 'rules'))).toBeTruthy();
expect(existsSync(join(ctx.engineDir, 'executors'))).toBeTruthy();
expect(existsSync(join(ctx.engineDir, 'AGENTS.md'))).toBeTruthy();
expect(existsSync(join(ctx.engineDir, 'types.ts'))).toBeTruthy();
expect(existsSync(join(ctx.engineDir, 'pulse.config.ts'))).toBeTruthy();
expect(existsSync(join(ctx.engineDir, 'tsconfig.json'))).toBeTruthy();
expect(existsSync(join(ctx.engineDir, '.gitignore'))).toBeTruthy();
expect(existsSync(join(ctx.engineDir, 'package.json'))).toBeTruthy();
});
it('init creates git repo with initial commit', () => {
expect(existsSync(join(ctx.engineDir, '.git'))).toBeTruthy();
// Verify there is at least one commit
const log = execSync('git log --oneline', {
cwd: ctx.engineDir,
encoding: 'utf-8',
}).trim();
expect(log.length > 0).toBeTruthy();
expect(log.includes('init')).toBeTruthy();
});
it('init creates staging worktree', () => {
expect(existsSync(ctx.stagingDir)).toBeTruthy();
});
it('init installs node_modules in engine', () => {
expect(existsSync(join(ctx.engineDir, 'node_modules'))).toBeTruthy();
expect(
existsSync(join(ctx.engineDir, 'node_modules', '@uncaged', 'pulse')),
).toBeTruthy();
});
it('init is idempotent — running again fails gracefully', () => {
const output = runUpulse(ctx, 'init', { expectFail: true });
expect(
output.includes('already initialized') || output.includes('config.json'),
).toBeTruthy();
});
// ── Tick ─────────────────────────────────────────────────────
it('upulse tick --dry-run succeeds', () => {
const output = runUpulse(ctx, 'tick --dry-run');
expect(output.includes('dry-run')).toBeTruthy();
});
it('upulse tick runs and writes events to DB', () => {
const output = runUpulse(ctx, 'tick');
// tick should produce snapshot output
expect(
output.includes('Snapshot') || output.includes('Result'),
).toBeTruthy();
// Verify scopes/_system.db has entries
expect(existsSync(ctx.eventsDbPath)).toBeTruthy();
const events = queryEventsDb(ctx, 'SELECT * FROM events');
expect(events.length > 0).toBeTruthy();
});
it('tick creates objects', () => {
// After tick, objects/ should have at least one snapshot file
const objects = listObjects(ctx);
expect(objects.length > 0).toBeTruthy();
});
it('tick records tick event', () => {
const tickEvents = queryEventsDb(
ctx,
"SELECT * FROM events WHERE kind = 'tick'",
) as Array<{
kind: string;
meta: string;
}>;
expect(tickEvents.length > 0).toBeTruthy();
// tick event should have meta with tick_ms and effect_count
const meta = JSON.parse(tickEvents[tickEvents.length - 1].meta);
expect(typeof meta.tick_ms).toBe('number');
expect(typeof meta.effect_count).toBe('number');
});
// ── Daemon Status ────────────────────────────────────────────
it('upulse daemon status shows STOPPED when daemon is not running', () => {
const output = runUpulse(ctx, 'daemon status');
expect(output.includes('STOPPED')).toBeTruthy();
});
});
@@ -1,109 +0,0 @@
/**
* E2E T2 — Staging → Promote 测试
*
* 验证 staging 到 promote 的完整流程:
* upulse init → 初始化项目
* 在 staging 里添加新 rule
* git add + commit
* upulse deploy promote → 将 staging 合并到 engine
* 验证 promote 成功并且新 rule 生效
*
* Run: bun test src/e2e/t2-staging-promote.test.ts
*/
import { afterAll, beforeAll, describe, expect, it } from 'bun:test';
import { execSync } from 'node:child_process';
import { existsSync, writeFileSync } from 'node:fs';
import { join } from 'node:path';
import {
cleanupE2EContext,
createE2EContext,
type E2EContext,
queryEventsDb,
runUpulse,
} from './helper.js';
describe('E2E T2: Staging → Promote', () => {
let ctx: E2EContext;
beforeAll(() => {
ctx = createE2EContext();
});
afterAll(() => {
cleanupE2EContext(ctx);
});
it('upulse init initializes project', () => {
const output = runUpulse(ctx, 'init');
expect(output.includes('initialized')).toBeTruthy();
expect(existsSync(ctx.upulseDir)).toBeTruthy();
expect(existsSync(ctx.stagingDir)).toBeTruthy();
expect(existsSync(ctx.engineDir)).toBeTruthy();
});
it('add new rule to staging directory', () => {
const newRuleContent = `import type { Snapshot, Effect } from '../types.js';
import type { Rule } from '@uncaged/pulse';
const rule: Rule<Snapshot, Effect> = async (_prev, _curr, inner) => {
const [effects, tickMs] = await inner(_prev, _curr);
return [[...effects, { kind: 'log', message: 'hello from v2' }], tickMs];
};
export default rule;`;
const ruleFilePath = join(ctx.stagingDir, 'rules', '02-log-hello.ts');
writeFileSync(ruleFilePath, newRuleContent);
expect(existsSync(ruleFilePath)).toBeTruthy();
});
it('commit changes in staging', () => {
execSync('git add .', { cwd: ctx.stagingDir });
execSync('git commit -m "Add 02-log-hello rule"', { cwd: ctx.stagingDir });
// Verify commit exists
const log = execSync('git log --oneline -n 1', {
cwd: ctx.stagingDir,
encoding: 'utf-8',
}).trim();
expect(log.includes('02-log-hello')).toBeTruthy();
});
it('upulse deploy promote succeeds', async () => {
const output = runUpulse(ctx, 'deploy promote', { timeout: 30_000 });
expect(!output.includes('error') && !output.includes('Error')).toBeTruthy();
}, 30_000);
it('promote creates promote event in events.db', () => {
const promoteEvents = queryEventsDb(
ctx,
"SELECT * FROM events WHERE kind = 'promote'",
) as Array<{
kind: string;
code_rev: string;
}>;
expect(promoteEvents.length > 0).toBeTruthy();
const latestPromote = promoteEvents[promoteEvents.length - 1];
expect(latestPromote.code_rev).toBeTruthy();
});
it('new rule is merged to engine directory', () => {
const mergedRulePath = join(ctx.engineDir, 'rules', '02-log-hello.ts');
expect(existsSync(mergedRulePath)).toBeTruthy();
});
it('upulse tick produces effect from new rule', () => {
const output = runUpulse(ctx, 'tick');
expect(
output.includes('Snapshot') || output.includes('Result'),
).toBeTruthy();
// The new rule should produce a log effect visible in tick output
expect(
output.includes('log') || output.includes('hello from v2'),
).toBeTruthy();
});
});
@@ -1,125 +0,0 @@
/**
* E2E T3 — Promote Guard 测试(编译失败阻止 promote)
*
* 验证 promote 的编译守门机制:
* upulse init → 初始化项目
* 在 staging 里添加编译错误的 rule
* git add + commit
* upulse deploy promote → 期望失败(编译不通过)
* 验证 promote 被阻止且 engine 目录未被污染
*
* Run: bun test src/e2e/t3-promote-guard.test.ts
*/
import { afterAll, beforeAll, describe, expect, it } from 'bun:test';
import { execSync } from 'node:child_process';
import { existsSync, writeFileSync } from 'node:fs';
import { join } from 'node:path';
import {
cleanupE2EContext,
createE2EContext,
type E2EContext,
queryEventsDb,
runUpulse,
} from './helper.js';
describe('E2E T3: Promote Guard (Compilation Failed)', () => {
let ctx: E2EContext;
beforeAll(() => {
ctx = createE2EContext();
});
afterAll(() => {
cleanupE2EContext(ctx);
});
it('upulse init initializes project', () => {
const output = runUpulse(ctx, 'init');
expect(
output.includes('initialized'),
`init output should mention initialized, got:\n${output}`,
).toBeTruthy();
expect(existsSync(ctx.upulseDir), '.upulse/ should exist').toBeTruthy();
expect(existsSync(ctx.stagingDir), 'staging/ should exist').toBeTruthy();
expect(existsSync(ctx.engineDir), 'engine/ should exist').toBeTruthy();
});
it('add bad rule with compilation error to staging', () => {
const badRuleContent = `// 语法错误:缺少类型
const bad: NONEXISTENT_TYPE = 42;
export default bad;`;
const badRuleFilePath = join(ctx.stagingDir, 'rules', '99-bad-rule.ts');
writeFileSync(badRuleFilePath, badRuleContent);
expect(
existsSync(badRuleFilePath),
'99-bad-rule.ts should be written to staging/rules/',
).toBeTruthy();
});
it('commit bad rule in staging', () => {
execSync('git add .', { cwd: ctx.stagingDir });
execSync('git commit -m "Add bad rule with compilation error"', {
cwd: ctx.stagingDir,
});
// Verify commit exists
const log = execSync('git log --oneline -n 1', {
cwd: ctx.stagingDir,
encoding: 'utf-8',
}).trim();
expect(
log.includes('bad rule'),
'commit should mention the bad rule',
).toBeTruthy();
});
it('upulse deploy promote fails due to compilation error', () => {
const output = runUpulse(ctx, 'deploy promote', { expectFail: true });
// Check that output contains type-check-related error messages
expect(
output.includes('Type check') ||
output.includes('tsc') ||
output.includes('type check failed'),
).toBeTruthy();
});
it('no promote event created in events.db after failed promote', () => {
const promoteEvents = queryEventsDb(
ctx,
"SELECT * FROM events WHERE kind = 'promote'",
);
expect(promoteEvents.length).toBe(0);
});
it('bad rule file not merged to engine directory', () => {
const badRulePath = join(ctx.engineDir, 'rules', '99-bad-rule.ts');
expect(
!existsSync(badRulePath),
'bad rule should not exist in engine/rules/ after failed promote',
).toBeTruthy();
});
it('engine directory remains clean after failed promote', () => {
// Verify engine still has only the original files (no contamination from failed promote)
const engineRulesPath = join(
ctx.engineDir,
'rules',
'01-collect-system.ts',
);
expect(
existsSync(engineRulesPath),
'original rules should still exist in engine',
).toBeTruthy();
// But the bad rule should not be there
const badRulePath = join(ctx.engineDir, 'rules', '99-bad-rule.ts');
expect(
!existsSync(badRulePath),
'bad rule should not contaminate engine directory',
).toBeTruthy();
});
});
-308
View File
@@ -1,308 +0,0 @@
/**
* E2E T4 — Rollback: append-only + epoch 正确
*
* Validates the full rollback lifecycle:
* init → promote v1 → tick (v1) → promote v2 → tick (v2) → rollback → verify
*
* Key assertions:
* - rollback event is written correctly (code_rev, meta.from, meta.to)
* - append-only: event count is monotonically increasing
* - v2 events still exist after rollback (not deleted)
* - engine code reverts after rollback
* - rollback with no previous version fails gracefully
*
* Run: bun test src/e2e/t4-rollback.test.ts
*/
import {
afterAll,
beforeAll,
describe,
expect,
it,
setDefaultTimeout,
} from 'bun:test';
setDefaultTimeout(30_000);
import { execSync } from 'node:child_process';
import { existsSync, mkdirSync, writeFileSync } from 'node:fs';
import { join } from 'node:path';
import {
cleanupE2EContext,
createE2EContext,
type E2EContext,
queryEventsDb,
runUpulse,
} from './helper.js';
// ── Helpers ────────────────────────────────────────────────────
/** Count events in the DB. */
function eventCount(ctx: E2EContext): number {
const rows = queryEventsDb(
ctx,
'SELECT COUNT(*) as cnt FROM events',
) as Array<{ cnt: number }>;
return rows.length > 0 ? rows[0]?.cnt : 0;
}
/**
* Add a rule file to staging and commit.
* Uses targeted `git add` to avoid committing the node_modules symlink
* (git .gitignore pattern `node_modules/` does not match symlinks).
*/
function addRuleToStaging(
ctx: E2EContext,
filename: string,
commitMsg: string,
): void {
const rulesDir = join(ctx.stagingDir, 'rules');
if (!existsSync(rulesDir)) mkdirSync(rulesDir, { recursive: true });
const rulePath = join(rulesDir, filename);
writeFileSync(
rulePath,
`import type { Rule } from '@uncaged/pulse';
import type { Snapshot, Effect } from '../types.js';
/** Auto-generated test rule: ${filename} */
const rule: Rule<Snapshot, Effect> = async (_prev, _curr, inner) => inner(_prev, _curr);
export default rule;
`,
'utf-8',
);
execSync(`git add rules/${filename} && git commit -m "${commitMsg}"`, {
cwd: ctx.stagingDir,
encoding: 'utf-8',
stdio: ['pipe', 'pipe', 'pipe'],
});
}
// ── Tests ──────────────────────────────────────────────────────
describe('E2E T4: Rollback', () => {
let ctx: E2EContext;
// Track code_revs and event counts at each stage for verification.
let v1CodeRev: string;
let v2CodeRev: string;
const eventCounts: number[] = [];
// Single context — init + bun install is expensive (~15-30s).
// Tests are ordered and build on each other.
beforeAll(() => {
ctx = createE2EContext();
// 1. Init
runUpulse(ctx, 'init');
runUpulse(ctx, 'tick');
// 2. Promote v1 (first promote establishes a version baseline)
addRuleToStaging(ctx, '98-v1.ts', 'add v1 rule');
runUpulse(ctx, 'deploy promote');
eventCounts.push(eventCount(ctx));
// Record v1 code_rev
const v1Promotes = queryEventsDb(
ctx,
"SELECT * FROM events WHERE kind = 'promote' ORDER BY occurred_at ASC",
) as Array<{ code_rev: string }>;
v1CodeRev = v1Promotes[v1Promotes.length - 1]?.code_rev;
// 3. Tick under v1
runUpulse(ctx, 'tick');
runUpulse(ctx, 'tick');
eventCounts.push(eventCount(ctx));
// 4. Promote v2
addRuleToStaging(ctx, '99-v2.ts', 'add v2 rule');
runUpulse(ctx, 'deploy promote');
eventCounts.push(eventCount(ctx));
// Record v2 code_rev
const v2Promotes = queryEventsDb(
ctx,
"SELECT * FROM events WHERE kind = 'promote' ORDER BY occurred_at DESC LIMIT 1",
) as Array<{ code_rev: string }>;
v2CodeRev = v2Promotes[0]?.code_rev;
// 5. Tick under v2
runUpulse(ctx, 'tick');
runUpulse(ctx, 'tick');
eventCounts.push(eventCount(ctx));
});
afterAll(() => {
cleanupE2EContext(ctx);
});
// ── Pre-rollback Sanity ──────────────────────────────────────
it('two promotes created distinct code_revs', () => {
expect(v1CodeRev).toBeTruthy();
expect(v2CodeRev).toBeTruthy();
expect(v1CodeRev).not.toBe(v2CodeRev);
});
it('v2 rule (99-v2.ts) exists in engine before rollback', () => {
const v2RulePath = join(ctx.engineDir, 'rules', '99-v2.ts');
expect(
existsSync(v2RulePath),
'engine should have 99-v2.ts after v2 promote',
).toBeTruthy();
});
it('both promote events are in the database', () => {
const promotes = queryEventsDb(
ctx,
"SELECT * FROM events WHERE kind = 'promote'",
) as Array<{
code_rev: string;
meta: string;
}>;
expect(
promotes.length >= 2,
`should have at least 2 promote events, found ${promotes.length}`,
).toBeTruthy();
});
// ── Rollback ─────────────────────────────────────────────────
it('rollback succeeds', () => {
const countBefore = eventCount(ctx);
const output = runUpulse(ctx, 'deploy rollback');
expect(
output.includes('Rollback complete') ||
output.includes('rollback event written'),
).toBeTruthy();
// Event count must increase (append-only)
const countAfter = eventCount(ctx);
expect(countAfter > countBefore).toBeTruthy();
eventCounts.push(countAfter);
});
// ── Rollback Event Content ──────────────────────────────────
it('rollback event has correct code_rev and meta', () => {
const rollbackEvent = (
queryEventsDb(
ctx,
"SELECT * FROM events WHERE kind = 'rollback' ORDER BY occurred_at DESC LIMIT 1",
) as Array<{ code_rev: string; meta: string }>
)[0];
expect(rollbackEvent).toBeTruthy();
// code_rev should point to the target (v1)
expect(rollbackEvent.code_rev).toBe(v1CodeRev);
// meta should have from + to
const meta = JSON.parse(rollbackEvent.meta) as { from: string; to: string };
expect(meta.from).toBe(v2CodeRev);
expect(meta.to).toBe(v1CodeRev);
expect(meta.from).not.toBe(meta.to);
});
// ── Append-Only Invariant ───────────────────────────────────
it('append-only: event count is monotonically increasing', () => {
expect(eventCounts.length >= 5).toBeTruthy();
for (let i = 1; i < eventCounts.length; i++) {
expect(eventCounts[i]! >= eventCounts[i - 1]!).toBeTruthy();
}
});
it('all event IDs are ordered', () => {
const allEvents = queryEventsDb(
ctx,
'SELECT id FROM events ORDER BY id ASC',
) as Array<{ id: number }>;
expect(allEvents.length > 0).toBeTruthy();
for (let i = 1; i < allEvents.length; i++) {
expect(allEvents[i]?.id > allEvents[i - 1]?.id).toBeTruthy();
}
});
// ── v2 Events Preservation ──────────────────────────────────
it('v2 promote event still exists after rollback', () => {
const v2Promotes = queryEventsDb(
ctx,
`SELECT * FROM events WHERE kind = 'promote' AND code_rev = '${v2CodeRev}'`,
) as Array<{ code_rev: string }>;
expect(v2Promotes.length > 0).toBeTruthy();
});
it('v2 events are not deleted (append-only)', () => {
// All events that were in the DB before rollback should still be there.
// We check that at least the promote events for both versions exist.
const allPromotes = queryEventsDb(
ctx,
"SELECT * FROM events WHERE kind = 'promote'",
) as Array<{
code_rev: string;
}>;
const codeRevs = new Set(allPromotes.map((p) => p.code_rev));
expect(codeRevs.has(v1CodeRev)).toBeTruthy();
expect(codeRevs.has(v2CodeRev)).toBeTruthy();
});
// ── Engine Code Revert ──────────────────────────────────────
it('engine code reverts after rollback (99-v2.ts removed)', () => {
const v2RulePath = join(ctx.engineDir, 'rules', '99-v2.ts');
expect(!existsSync(v2RulePath)).toBeTruthy();
});
it('v1 rule (98-v1.ts) still exists in engine after rollback', () => {
const v1RulePath = join(ctx.engineDir, 'rules', '98-v1.ts');
expect(existsSync(v1RulePath)).toBeTruthy();
});
it('engine git log shows revert commit', () => {
const log = execSync('git log --oneline -5', {
cwd: ctx.engineDir,
encoding: 'utf-8',
}).trim();
expect(log.toLowerCase().includes('revert')).toBeTruthy();
});
});
// ── Boundary: Rollback with no previous version ────────────────
describe('E2E T4: Rollback boundary — no previous version', () => {
let ctx: E2EContext;
beforeAll(() => {
ctx = createE2EContext();
runUpulse(ctx, 'init');
runUpulse(ctx, 'tick');
});
afterAll(() => {
cleanupE2EContext(ctx);
});
it('rollback with no promote events fails gracefully', () => {
const output = runUpulse(ctx, 'deploy rollback', { expectFail: true });
expect(
output.includes('no promote event') ||
output.includes('Nothing to roll back') ||
output.includes('Error'),
).toBeTruthy();
});
it('rollback with only one promote (no previous version) fails gracefully', () => {
addRuleToStaging(ctx, '99-test.ts', 'add test rule');
runUpulse(ctx, 'deploy promote');
// Now try to rollback — should fail because there's no older version
const output = runUpulse(ctx, 'deploy rollback', { expectFail: true });
expect(
output.includes('no previous version') || output.includes('Error'),
).toBeTruthy();
});
});
-291
View File
@@ -1,291 +0,0 @@
/**
* E2E T5 — Migrate 链式迁移测试
*
* 验证数据格式变更时的迁移机制:
* upulse init → 初始化项目
* upulse tick → 跑一次,产生 v1 数据
* 修改 types.ts 改变 SystemSense 格式
* 添加 migrate.ts 处理 v1 → v2 数据迁移
* 更新 01-collect-system.ts 适配新格式
* upulse deploy promote → 部署新版本
* 验证 migrate 事件和新格式数据
*
* Run: bun test src/e2e/t5-migrate.test.ts
*/
import { afterAll, beforeAll, describe, expect, it } from 'bun:test';
import { execSync } from 'node:child_process';
import { existsSync, readFileSync, writeFileSync } from 'node:fs';
import { join } from 'node:path';
import {
cleanupE2EContext,
createE2EContext,
type E2EContext,
queryEventsDb,
runUpulse,
} from './helper.js';
describe('E2E T5: Migrate Chain Migration', () => {
let ctx: E2EContext;
beforeAll(() => {
ctx = createE2EContext();
});
afterAll(() => {
cleanupE2EContext(ctx);
});
it('upulse init initializes project', () => {
const output = runUpulse(ctx, 'init');
expect(output.includes('initialized')).toBeTruthy();
expect(existsSync(ctx.upulseDir)).toBeTruthy();
expect(existsSync(ctx.stagingDir)).toBeTruthy();
expect(existsSync(ctx.engineDir)).toBeTruthy();
});
it('upulse tick produces v1 data', () => {
const output = runUpulse(ctx, 'tick');
expect(
output.includes('Snapshot') || output.includes('Result'),
).toBeTruthy();
// Seed v1 system data into _vitals scope + objects
const v1Data = { memoryPct: 42.5, cpuIdlePct: 87.3 };
const v1Hash = require('node:crypto')
.createHash('sha256')
.update(JSON.stringify(v1Data))
.digest('hex')
.slice(0, 32);
const { mkdirSync, writeFileSync: wfs } = require('node:fs');
mkdirSync(ctx.objectsDir, { recursive: true });
wfs(join(ctx.objectsDir, `${v1Hash}.json`), JSON.stringify(v1Data));
// Insert vital event into scopes/_vitals.db (events table, kind='vital')
const Database = require('bun:sqlite').default;
const vitalsDb = new Database(ctx.vitalsDbPath);
vitalsDb.exec(`
CREATE TABLE IF NOT EXISTS events (
id TEXT PRIMARY KEY,
occurred_at INTEGER NOT NULL,
kind TEXT NOT NULL,
key TEXT,
hash TEXT,
code_rev TEXT,
meta TEXT
)
`);
vitalsDb.exec(`
INSERT INTO events (id, occurred_at, kind, key, hash, code_rev, meta)
VALUES ('test-vital-01', ${Date.now()}, 'vital', 'system', '${v1Hash}', NULL, NULL)
`);
vitalsDb.close();
// Insert collect event into scopes/_system.db
const eventsDb = new Database(ctx.eventsDbPath);
eventsDb.exec(`
INSERT INTO events (id, occurred_at, kind, key, hash, code_rev, meta)
VALUES (99999, ${Date.now()}, 'collect', 'system', '${v1Hash}', '', '{}')
`);
eventsDb.close();
});
it('modify types.ts to v2 format in staging', () => {
const newTypesContent = `import type { Sensed } from '@uncaged/pulse';
// ── Sense Types ────────────────────────────────────────────────
// Each collector returns its own sense type.
export interface SystemSense {
memory: { usedPct: number; totalMb: number }; // v2 格式
cpuIdlePct: number;
}
// ── Snapshot ───────────────────────────────────────────────────
// Rebuilt from events table. Each sense is Sensed<T> = { data, refreshedAt }.
// refreshedAt = the collect event's occurred_at timestamp.
export interface Snapshot {
timestamp: number;
system: Sensed<SystemSense>;
// Agent 按需添加:
// ograph: Sensed<OGraphSense>;
// executors: Sensed<ExecutorsSense>;
}
// ── Effects ────────────────────────────────────────────────────
export type Effect =
| { kind: 'collect'; key: string } // 触发采集(由 Rule 产生)
| { kind: 'log'; message: string }
// 按需添加:
// | { kind: 'dispatch'; target: string; payload: unknown }
// | { kind: 'notify'; message: string }
`;
const typesFilePath = join(ctx.stagingDir, 'types.ts');
writeFileSync(typesFilePath, newTypesContent);
expect(existsSync(typesFilePath)).toBeTruthy();
});
it('create migrate.ts in staging/rules/', () => {
const migrateContent = `export function migrate(snapshot: Record<string, any>): Record<string, any> {
const system = snapshot.system?.data;
if (system && 'memoryPct' in system) {
return {
...snapshot,
system: {
data: {
memory: { usedPct: system.memoryPct, totalMb: 2048 },
cpuIdlePct: system.cpuIdlePct,
},
refreshedAt: snapshot.system.refreshedAt,
},
};
}
return snapshot;
}`;
const migrateFilePath = join(ctx.stagingDir, 'rules', 'migrate.ts');
writeFileSync(migrateFilePath, migrateContent);
expect(existsSync(migrateFilePath)).toBeTruthy();
});
it('update 01-collect-system.ts to adapt new format in staging', () => {
const newSystemRuleContent = `import type { Snapshot, Effect } from '../types.js';
import type { Rule } from '@uncaged/pulse';
/**
* Trigger system collection every 15 seconds.
* If system sense is stale (refreshedAt > 15s ago), emit collect effect.
* Updated for v2 format.
*/
const collectSystemRule: Rule<Snapshot, Effect> = async (_prev, curr, inner) => {
const [effects, tickMs] = await inner(_prev, curr);
const refreshedAt = curr.system?.refreshedAt ?? 0;
if (Date.now() - refreshedAt > 15000) {
return [[...effects, { kind: 'collect', key: 'system' }], tickMs];
}
return [effects, tickMs];
};
export default collectSystemRule;
`;
const systemRuleFilePath = join(
ctx.stagingDir,
'rules',
'01-collect-system.ts',
);
writeFileSync(systemRuleFilePath, newSystemRuleContent);
expect(existsSync(systemRuleFilePath)).toBeTruthy();
});
it('update system executor for v2 format in staging', () => {
const newExecutorContent = `import type { SystemSense } from '../types.js';
import { freemem, totalmem, cpus } from 'node:os';
/**
* Collect system stats in v2 format.
* Called by execute() when it receives a { kind: 'collect', key: 'system' } effect.
* Returns data; execute() writes to events/vitals.
*/
export async function collectSystem(): Promise<SystemSense> {
const memoryUsedBytes = totalmem() - freemem();
const memoryTotalMb = Math.round(totalmem() / (1024 * 1024));
const usedPct = Math.round((memoryUsedBytes / totalmem()) * 100);
const cores = cpus();
let totalIdle = 0;
let totalTick = 0;
for (const core of cores) {
for (const type in core.times) {
totalTick += (core.times as Record<string, number>)[type]!;
}
totalIdle += core.times.idle;
}
const cpuIdlePct = Math.round((totalIdle / totalTick) * 100);
return {
memory: { usedPct, totalMb: memoryTotalMb },
cpuIdlePct
};
}`;
const executorFilePath = join(ctx.stagingDir, 'executors', 'system.ts');
writeFileSync(executorFilePath, newExecutorContent);
expect(existsSync(executorFilePath)).toBeTruthy();
});
it('commit changes in staging', () => {
execSync('git add .', { cwd: ctx.stagingDir });
execSync('git commit -m "Migrate to v2 SystemSense format"', {
cwd: ctx.stagingDir,
});
// Verify commit exists
const log = execSync('git log --oneline -n 1', {
cwd: ctx.stagingDir,
encoding: 'utf-8',
}).trim();
expect(log.includes('v2')).toBeTruthy();
});
it('upulse deploy promote succeeds', () => {
const output = runUpulse(ctx, 'deploy promote');
expect(!output.includes('error') && !output.includes('Error')).toBeTruthy();
}, 30_000);
it('migrate event created in events.db', () => {
const migrateEvents = queryEventsDb(
ctx,
"SELECT * FROM events WHERE kind = 'migrate' AND key = 'system'",
) as Array<{
kind: string;
key: string;
hash: string;
}>;
expect(migrateEvents.length > 0).toBeTruthy();
const latestMigrate = migrateEvents[migrateEvents.length - 1];
expect(latestMigrate.hash).toBeTruthy();
});
it('migrated object contains new format data', () => {
const migrateEvents = queryEventsDb(
ctx,
"SELECT * FROM events WHERE kind = 'migrate' AND key = 'system'",
) as Array<{
hash: string;
}>;
expect(migrateEvents.length > 0).toBeTruthy();
const migrateHash = migrateEvents[migrateEvents.length - 1].hash;
const objectPath = join(ctx.objectsDir, `${migrateHash}.json`);
expect(existsSync(objectPath)).toBeTruthy();
const objectData = JSON.parse(readFileSync(objectPath, 'utf-8'));
// The stored object is the sense value (data.data), not the full snapshot wrapper
expect(objectData.memory?.usedPct !== undefined).toBeTruthy();
expect(typeof objectData.memory.usedPct === 'number').toBeTruthy();
});
it('promote event created after migration', () => {
const promoteEvents = queryEventsDb(
ctx,
"SELECT * FROM events WHERE kind = 'promote'",
) as Array<{
kind: string;
code_rev: string;
}>;
expect(promoteEvents.length > 0).toBeTruthy();
const latestPromote = promoteEvents[promoteEvents.length - 1];
expect(latestPromote.code_rev).toBeTruthy();
});
});
@@ -1,84 +0,0 @@
/**
* E2E: --dir / -d flag integration test
*
* Validates that the global --dir flag correctly targets a specific pulse instance.
* Run: bun test src/e2e/t6-dir-flag.test.ts
*/
import { afterAll, beforeAll, describe, expect, it } from 'bun:test';
import { execSync } from 'node:child_process';
import { existsSync } from 'node:fs';
import { dirname, join } from 'node:path';
import { fileURLToPath } from 'node:url';
import {
cleanupE2EContext,
createE2EContext,
type E2EContext,
} from './helper.js';
const __dirname = dirname(fileURLToPath(import.meta.url));
const CLI_PATH = join(__dirname, '..', 'cli.ts');
function runCli(args: string, env?: Record<string, string>): string {
return execSync(`bun ${CLI_PATH} ${args}`, {
encoding: 'utf-8',
timeout: 120_000,
env: { ...process.env, ...env },
stdio: ['pipe', 'pipe', 'pipe'],
});
}
function runCliFail(args: string, env?: Record<string, string>): string {
try {
runCli(args, env);
throw new Error(`Expected command to fail: ${args}`);
} catch (err: unknown) {
const e = err as { stderr?: string; stdout?: string };
return e.stderr || e.stdout || '';
}
}
describe('E2E: --dir / -d flag', () => {
let ctx: E2EContext;
beforeAll(() => {
ctx = createE2EContext();
// Init using --dir flag (not env var)
runCli(`--dir ${ctx.upulseDir} init`);
});
afterAll(() => {
cleanupE2EContext(ctx);
});
it('--dir flag initializes in the specified directory', () => {
expect(existsSync(ctx.upulseDir)).toBe(true);
expect(existsSync(join(ctx.upulseDir, 'config.json'))).toBe(true);
expect(existsSync(join(ctx.upulseDir, 'engine'))).toBe(true);
});
it('-d short flag works the same as --dir', () => {
const output = runCli(`-d ${ctx.upulseDir} daemon status`);
expect(output).toContain('STOPPED');
});
it('--dir flag overrides PULSE_BASE_DIR env', () => {
// Set env to a nonexistent path, but --dir points to valid instance
const output = runCli(`--dir ${ctx.upulseDir} daemon status`, {
PULSE_BASE_DIR: '/tmp/this-does-not-exist',
});
expect(output).toContain('STOPPED');
});
it('PULSE_BASE_DIR env works when no --dir flag', () => {
const output = runCli('daemon status', {
PULSE_BASE_DIR: ctx.upulseDir,
});
expect(output).toContain('STOPPED');
});
it('nonexistent --dir fails gracefully', () => {
const output = runCliFail('--dir /tmp/pulse-nonexistent-dir daemon status');
expect(output).toContain('config not found');
});
});
+12 -10
View File
@@ -135,14 +135,15 @@ export function initUpulse(dir?: string): void {
console.log(`\nEngine: ${config.engine.path}`);
console.log(`Staging: ${config.staging.path}`);
if (process.platform === 'linux') {
console.log('\nSystemd service: pulse.service (enabled, not started)');
console.log(' systemctl --user start pulse.service # start daemon');
console.log(' systemctl --user status pulse.service # check status');
console.log(' journalctl --user -u pulse.service -f # follow logs');
console.log('\nSystemd user units: pulse.service, pulse-workflow.service');
console.log(' systemctl --user start pulse.service # engine daemon');
console.log(' systemctl --user start pulse-workflow.service # workflow daemon (if installed)');
console.log(' systemctl --user status pulse.service # check engine');
console.log(' upulse status # workflow DB + units');
}
console.log('\nManual:');
console.log(' upulse daemon start --foreground # start in foreground');
console.log(' upulse tick --dry-run # manual dry-run tick');
console.log('\nCLI:');
console.log(' upulse workflow submit <wf> <topicKey> <file> # enqueue workflow task');
console.log(' upulse inspect events --limit 5 # recent workflow events');
}
// ── Helpers ────────────────────────────────────────────────────
@@ -267,10 +268,11 @@ type Rule<S, E> = (prev: S, curr: S)
## 构建和测试
\`\`\`bash
bun run typecheck # tsc --noEmit 类型检查
upulse dev test # 历史 snapshot 回放 dry-run
upulse tick --dry-run # 手动触发一次(不执行 effect)
bun run typecheck # tsc --noEmit 类型检查
upulse status # workflow daemon / DB 概览(可选)
upulse inspect events --limit 5 # 最近 workflow 事件
\`\`\`
Linux 上引擎由 systemd 用户单元驱动:\`systemctl --user start pulse.service\`
## ⚠️ 红线
- **不要修改 pulse.config.ts 的 runPulse 调用签名**
-184
View File
@@ -1,184 +0,0 @@
/**
* migrate.test.ts — Tests for legacy → scoped data migration.
*/
import { Database } from 'bun:sqlite';
import { afterEach, beforeEach, describe, expect, it } from 'bun:test';
import {
existsSync,
mkdirSync,
mkdtempSync,
readFileSync,
rmSync,
writeFileSync,
} from 'node:fs';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import type { UpulseConfig } from './config.js';
import { migrateToScoped } from './migrate.js';
function makeConfig(dir: string): UpulseConfig {
return {
dir,
engine: { path: join(dir, 'engine'), entrypoint: 'pulse.config.ts' },
staging: { path: join(dir, 'staging') },
daemon: {
pidFile: join(dir, 'daemon.pid'),
logFile: join(dir, 'daemon.log'),
},
store: {
scopesDir: join(dir, 'scopes'),
objectsDir: join(dir, 'objects'),
},
};
}
function createLegacyEventsDb(path: string): void {
const db = new Database(path, { create: true });
db.exec('PRAGMA journal_mode = WAL');
db.exec(`
CREATE TABLE IF NOT EXISTS events (
id TEXT PRIMARY KEY,
occurred_at INTEGER NOT NULL,
kind TEXT NOT NULL,
key TEXT,
hash TEXT,
code_rev TEXT,
meta TEXT
)
`);
db.exec(`
INSERT INTO events (id, occurred_at, kind, key, hash, code_rev, meta)
VALUES ('evt-001', 1000, 'tick', NULL, NULL, NULL, '{"tick_ms":15000}')
`);
db.close();
}
function createLegacyVitalsDb(path: string): void {
const db = new Database(path, { create: true });
db.exec('PRAGMA journal_mode = WAL');
db.exec(`
CREATE TABLE IF NOT EXISTS vitals (
id TEXT PRIMARY KEY,
occurred_at INTEGER NOT NULL,
key TEXT NOT NULL,
hash TEXT,
meta TEXT
)
`);
db.exec(`
INSERT INTO vitals (id, occurred_at, key, hash, meta)
VALUES ('vit-001', 1000, 'system', 'abc123', '{}')
`);
db.close();
}
describe('migrateToScoped', () => {
let tmpDir: string;
beforeEach(() => {
tmpDir = mkdtempSync(join(tmpdir(), 'pulse-migrate-test-'));
mkdirSync(tmpDir, { recursive: true });
});
afterEach(() => {
rmSync(tmpDir, { recursive: true, force: true });
});
it('migrates events.db → scopes/_system.db', () => {
createLegacyEventsDb(join(tmpDir, 'events.db'));
const config = makeConfig(tmpDir);
migrateToScoped(config);
const systemDbPath = join(tmpDir, 'scopes', '_system.db');
expect(existsSync(systemDbPath)).toBe(true);
const db = new Database(systemDbPath);
const rows = db.prepare('SELECT * FROM events').all() as Array<{
id: string;
}>;
db.close();
expect(rows.length).toBe(1);
expect(rows[0].id).toBe('evt-001');
});
it('migrates vitals.db → scopes/_vitals.db as events', () => {
createLegacyEventsDb(join(tmpDir, 'events.db'));
createLegacyVitalsDb(join(tmpDir, 'vitals.db'));
const config = makeConfig(tmpDir);
migrateToScoped(config);
const vitalsDbPath = join(tmpDir, 'scopes', '_vitals.db');
expect(existsSync(vitalsDbPath)).toBe(true);
const db = new Database(vitalsDbPath);
const rows = db
.prepare('SELECT * FROM events WHERE kind = ?')
.all('vital') as Array<{
id: string;
kind: string;
key: string;
hash: string | null;
}>;
db.close();
expect(rows.length).toBe(1);
expect(rows[0].kind).toBe('vital');
expect(rows[0].key).toBe('system');
expect(rows[0].hash).toBe('abc123');
});
it('preserves old files as backups', () => {
const legacyEventsPath = join(tmpDir, 'events.db');
const legacyVitalsPath = join(tmpDir, 'vitals.db');
createLegacyEventsDb(legacyEventsPath);
createLegacyVitalsDb(legacyVitalsPath);
const config = makeConfig(tmpDir);
migrateToScoped(config);
expect(existsSync(legacyEventsPath)).toBe(true);
expect(existsSync(legacyVitalsPath)).toBe(true);
});
it('repeated migration is a no-op', () => {
createLegacyEventsDb(join(tmpDir, 'events.db'));
const config = makeConfig(tmpDir);
migrateToScoped(config);
const systemDbPath = join(tmpDir, 'scopes', '_system.db');
// Record modification time
const stat1 = Bun.file(systemDbPath).lastModified;
// Migrate again — should be a no-op
migrateToScoped(config);
const stat2 = Bun.file(systemDbPath).lastModified;
expect(stat2).toBe(stat1);
});
it('skips when no legacy files exist', () => {
const config = makeConfig(tmpDir);
migrateToScoped(config);
expect(existsSync(join(tmpDir, 'scopes'))).toBe(false);
});
it('copies WAL and SHM files if they exist', () => {
createLegacyEventsDb(join(tmpDir, 'events.db'));
writeFileSync(join(tmpDir, 'events.db-wal'), 'wal-data');
writeFileSync(join(tmpDir, 'events.db-shm'), 'shm-data');
const config = makeConfig(tmpDir);
migrateToScoped(config);
const systemDbPath = join(tmpDir, 'scopes', '_system.db');
expect(existsSync(`${systemDbPath}-wal`)).toBe(true);
expect(existsSync(`${systemDbPath}-shm`)).toBe(true);
expect(readFileSync(`${systemDbPath}-wal`, 'utf-8')).toBe('wal-data');
expect(readFileSync(`${systemDbPath}-shm`, 'utf-8')).toBe('shm-data');
});
});
-79
View File
@@ -1,79 +0,0 @@
/**
* migrate.ts — Automatic data migration from legacy flat layout to scoped layout.
*
* Legacy: ~/.upulse/events.db + ~/.upulse/vitals.db
* Scoped: ~/.upulse/scopes/_system.db + ~/.upulse/scopes/_vitals.db (events table)
*
* Vitals records are converted to events with kind='vital'.
* Old files are preserved as backups — they are never deleted.
*/
import { Database } from 'bun:sqlite';
import { copyFileSync, existsSync, mkdirSync } from 'node:fs';
import { join } from 'node:path';
import { createScopedStore } from '@uncaged/pulse';
import type { UpulseConfig } from './config.js';
export function migrateToScoped(config: UpulseConfig): void {
const scopesDir = config.store.scopesDir;
const systemDbPath = join(scopesDir, '_system.db');
// Already migrated — skip
if (existsSync(systemDbPath)) return;
const legacyEvents = join(config.dir, 'events.db');
const legacyVitals = join(config.dir, 'vitals.db');
// Nothing to migrate
if (!existsSync(legacyEvents) && !existsSync(legacyVitals)) return;
mkdirSync(scopesDir, { recursive: true });
if (existsSync(legacyEvents)) {
copyFileSync(legacyEvents, systemDbPath);
for (const ext of ['-wal', '-shm']) {
if (existsSync(legacyEvents + ext)) {
copyFileSync(legacyEvents + ext, systemDbPath + ext);
}
}
console.log('Migrated events.db → scopes/_system.db');
}
if (existsSync(legacyVitals)) {
const vitalsDbPath = join(scopesDir, '_vitals.db');
if (!existsSync(vitalsDbPath)) {
const oldDb = new Database(legacyVitals, { readonly: true });
const rows = oldDb
.query(
'SELECT id, occurred_at, key, hash, meta FROM vitals ORDER BY occurred_at',
)
.all() as Array<{
id: string;
occurred_at: number;
key: string;
hash: string | null;
meta: string | null;
}>;
oldDb.close();
const scopedStore = createScopedStore({
basePath: scopesDir,
objectsDir: config.store.objectsDir,
});
const vitalsStore = scopedStore.scope('_vitals');
for (const row of rows) {
vitalsStore.appendEvent({
occurredAt: row.occurred_at,
kind: 'vital',
key: row.key,
hash: row.hash ?? undefined,
meta: row.meta ?? undefined,
});
}
scopedStore.close();
console.log(
`Migrated vitals.db → scopes/_vitals.db (${rows.length} records as events)`,
);
}
}
}
+29 -1
View File
@@ -5,13 +5,16 @@
* Vitals use ScopedStore → scope('_vitals') with kind='vital' events.
*/
import { existsSync } from 'node:fs';
import { existsSync, mkdirSync } from 'node:fs';
import { dirname, join } from 'node:path';
import {
createScopedStore,
createStore,
type EventRecord,
type PulseStore,
type ScopedStore,
} from '@uncaged/pulse';
import type { UpulseConfig } from './config.js';
export type { EventRecord, PulseStore, ScopedStore };
@@ -65,3 +68,28 @@ export function openOrCreateStore(
const scoped = openOrCreateScopedStore(scopesDir, objectsDir);
return scoped.scope('_system');
}
/** Paths for workflow daemon / CLI (workflows.db + scopes/objects). */
export function workflowDataPaths(config: UpulseConfig): {
eventsDbPath: string;
objectsDir: string;
} {
return {
eventsDbPath: join(config.store.scopesDir, 'workflows.db'),
objectsDir: join(config.store.scopesDir, 'objects'),
};
}
/** Open workflow event store if workflows.db exists (aligns with workflow-daemon). */
export function openWorkflowsStore(config: UpulseConfig): PulseStore | null {
const { eventsDbPath, objectsDir } = workflowDataPaths(config);
if (!existsSync(eventsDbPath)) return null;
return createStore({ eventsDbPath, objectsDir });
}
/** Create dirs and open workflow store (for submit). */
export function openOrCreateWorkflowsStore(config: UpulseConfig): PulseStore {
const { eventsDbPath, objectsDir } = workflowDataPaths(config);
mkdirSync(dirname(eventsDbPath), { recursive: true });
return createStore({ eventsDbPath, objectsDir });
}
+3 -3
View File
@@ -610,7 +610,7 @@ function renderEvents(events) {
const list = document.getElementById('eventList');
if (!events || events.length === 0) {
list.innerHTML = '<div class="empty"><div class="empty-icon">◇</div>' +
'<div class="empty-text">No events yet. Start the daemon to begin collecting data.</div></div>';
'<div class="empty-text">No events yet. Start the pulse engine (e.g. systemctl --user start pulse.service) to begin collecting data.</div></div>';
return;
}
@@ -658,7 +658,7 @@ async function loadSenseKeys() {
const tabs = document.getElementById('vitalsTabs');
if (!data.keys || data.keys.length === 0) {
tabs.innerHTML = '<div class="empty-text" style="color:var(--text-dim)">No sense keys found. Start the daemon to collect data.</div>';
tabs.innerHTML = '<div class="empty-text" style="color:var(--text-dim)">No sense keys found. Start the pulse engine (e.g. systemctl --user start pulse.service) to collect data.</div>';
return;
}
@@ -839,7 +839,7 @@ async function loadDeploys() {
if (!data.deploys || data.deploys.length === 0) {
list.innerHTML = '<div class="empty"><div class="empty-icon">↑</div>' +
'<div class="empty-text">No deploy history. Use "upulse deploy promote" to create one.</div></div>';
'<div class="empty-text">No deploy history yet. Promote/rollback events appear here when recorded by the engine.</div></div>';
return;
}