test(cli): add e2e test for nerve store archive #174
@@ -52,6 +52,7 @@ import { logsCommand } from "../commands/logs.js";
|
||||
import { senseCommand } from "../commands/sense.js";
|
||||
import { statusCommand } from "../commands/status.js";
|
||||
import { stopCommand } from "../commands/stop.js";
|
||||
import { storeCommand } from "../commands/store.js";
|
||||
import { threadCommand } from "../commands/thread.js";
|
||||
import { workflowCommand } from "../commands/workflow.js";
|
||||
|
||||
@@ -105,6 +106,25 @@ export default {
|
||||
};
|
||||
`;
|
||||
|
||||
const nerveYamlWithNoopWorkflow = `senses:
|
||||
counter:
|
||||
group: e2e
|
||||
|
||||
reflexes: []
|
||||
|
||||
workflows:
|
||||
noop:
|
||||
concurrency: 1
|
||||
overflow: drop
|
||||
|
||||
max_rounds: 10
|
||||
|
||||
api:
|
||||
port: null
|
||||
token: null
|
||||
host: 127.0.0.1
|
||||
`;
|
||||
|
||||
/** Empty migration — counter sense uses only `_signals` (auto-created by daemon). */
|
||||
const counterMigration = `-- no-op migration for e2e counter sense
|
||||
SELECT 1;
|
||||
@@ -122,6 +142,28 @@ export async function compute(_db, _peers, _options) {
|
||||
}
|
||||
`;
|
||||
|
||||
/** First trigger launches local noop workflow; later triggers emit a plain signal. */
|
||||
const counterIndexJsWithNoopWorkflow = `let _launched = false;
|
||||
export async function compute(_db, _peers, _options) {
|
||||
if (!_launched) {
|
||||
_launched = true;
|
||||
return { workflow: "noop|3|e2e-archive" };
|
||||
}
|
||||
return { idle: true };
|
||||
}
|
||||
`;
|
||||
|
||||
/** Minimal workflow: moderator ends immediately (no role rounds). */
|
||||
const noopWorkflowIndexJs = `const END = "__end__";
|
||||
export default {
|
||||
name: "noop",
|
||||
roles: {
|
||||
bot: async () => ({ content: "ok", meta: {} }),
|
||||
},
|
||||
moderator: () => END,
|
||||
};
|
||||
`;
|
||||
|
||||
const e2eRootCommand = defineCommand({
|
||||
meta: { name: "nerve", description: "e2e" },
|
||||
subCommands: {
|
||||
@@ -131,46 +173,52 @@ const e2eRootCommand = defineCommand({
|
||||
status: statusCommand,
|
||||
stop: stopCommand,
|
||||
workflow: workflowCommand,
|
||||
store: storeCommand,
|
||||
thread: threadCommand,
|
||||
},
|
||||
});
|
||||
|
||||
function defaultTestConfig(): NerveConfig {
|
||||
function defaultTestConfig(withNoopWorkflow: boolean): NerveConfig {
|
||||
return {
|
||||
senses: {
|
||||
counter: { group: "e2e", throttle: null, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [],
|
||||
workflows: {
|
||||
echo: { concurrency: 1, overflow: "queue", maxQueue: 10 },
|
||||
echo: { concurrency: 1, overflow: "queue" as const, maxQueue: 10 },
|
||||
...(withNoopWorkflow ? { noop: { concurrency: 1, overflow: "drop" as const } } : {}),
|
||||
},
|
||||
maxRounds: 10,
|
||||
api: { port: null, token: null, host: "127.0.0.1" },
|
||||
};
|
||||
}
|
||||
|
||||
/** Symlink monorepo `@uncaged/nerve-daemon` so `loadDaemonModule` (thread/workflow store) resolves under the temp workspace. */
|
||||
function linkWorkspaceDaemonPackage(nerveRoot: string): void {
|
||||
const scopeDir = join(nerveRoot, "node_modules", "@uncaged");
|
||||
mkdirSync(scopeDir, { recursive: true });
|
||||
const linkPath = join(scopeDir, "nerve-daemon");
|
||||
symlinkSync(nerveDaemonRoot, linkPath);
|
||||
}
|
||||
|
||||
function writeWorkspaceLayout(nerveRoot: string): void {
|
||||
function writeWorkspaceLayout(nerveRoot: string, withNoopWorkflow: boolean): void {
|
||||
mkdirSync(join(nerveRoot, "data", "senses"), { recursive: true });
|
||||
mkdirSync(join(nerveRoot, "data", "blobs"), { recursive: true });
|
||||
mkdirSync(join(nerveRoot, "senses", "counter", "migrations"), { recursive: true });
|
||||
mkdirSync(join(nerveRoot, "workflows", "echo"), { recursive: true });
|
||||
writeFileSync(join(nerveRoot, "nerve.yaml"), nerveYamlTemplate, "utf8");
|
||||
writeFileSync(
|
||||
join(nerveRoot, "nerve.yaml"),
|
||||
withNoopWorkflow ? nerveYamlWithNoopWorkflow : nerveYamlTemplate,
|
||||
"utf8",
|
||||
);
|
||||
writeFileSync(
|
||||
join(nerveRoot, "senses", "counter", "migrations", "001.sql"),
|
||||
counterMigration,
|
||||
"utf8",
|
||||
);
|
||||
writeFileSync(join(nerveRoot, "senses", "counter", "index.js"), counterIndexJs, "utf8");
|
||||
writeFileSync(
|
||||
join(nerveRoot, "senses", "counter", "index.js"),
|
||||
withNoopWorkflow ? counterIndexJsWithNoopWorkflow : counterIndexJs,
|
||||
"utf8",
|
||||
);
|
||||
writeFileSync(join(nerveRoot, "workflows", "echo", "index.js"), echoWorkflowIndexJs, "utf8");
|
||||
linkWorkspaceDaemonPackage(nerveRoot);
|
||||
if (withNoopWorkflow) {
|
||||
mkdirSync(join(nerveRoot, "workflows", "noop", "migrations"), { recursive: true });
|
||||
writeFileSync(join(nerveRoot, "workflows", "noop", "index.js"), noopWorkflowIndexJs, "utf8");
|
||||
}
|
||||
linkWorkspaceDaemonIntoNerveRoot(nerveRoot);
|
||||
}
|
||||
|
||||
export type TestDaemonHandle = {
|
||||
@@ -180,8 +228,30 @@ export type TestDaemonHandle = {
|
||||
kernel: Kernel;
|
||||
};
|
||||
|
||||
/** Reserved for future overrides; pass `null` today. */
|
||||
export type StartTestDaemonOpts = null;
|
||||
export type StartTestDaemonOpts = {
|
||||
/**
|
||||
* When true, counter sense's first compute launches a local `noop` workflow (real
|
||||
* workflow-worker child). Requires built `workflow-worker.js` next to `sense-worker.js`.
|
||||
*/
|
||||
withNoopWorkflow: boolean;
|
||||
} | null;
|
||||
|
||||
function useNoopWorkflow(opts: StartTestDaemonOpts): boolean {
|
||||
return opts !== null && opts.withNoopWorkflow === true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Symlink workspace `@uncaged/nerve-daemon` into `<nerveRoot>/node_modules` so
|
||||
* `loadDaemonModule(nerveRoot)` resolves for `nerve store` / `nerve thread` in e2e.
|
||||
*/
|
||||
export function linkWorkspaceDaemonIntoNerveRoot(nerveRoot: string): void {
|
||||
const daemonPkgRoot = dirname(require.resolve("@uncaged/nerve-daemon/package.json"));
|
||||
const linkDir = join(nerveRoot, "node_modules", "@uncaged");
|
||||
const linkPath = join(linkDir, "nerve-daemon");
|
||||
mkdirSync(linkDir, { recursive: true });
|
||||
if (existsSync(linkPath)) return;
|
||||
symlinkSync(daemonPkgRoot, linkPath);
|
||||
}
|
||||
|
||||
/**
|
||||
* Poll until predicate returns true, or reject after `timeoutMs`.
|
||||
@@ -216,7 +286,7 @@ export async function pollUntil(
|
||||
export async function startTestDaemon(
|
||||
_opts: StartTestDaemonOpts = null,
|
||||
): Promise<TestDaemonHandle> {
|
||||
void _opts;
|
||||
const withNoop = useNoopWorkflow(_opts);
|
||||
if (!existsSync(senseWorkerScript)) {
|
||||
throw new Error(
|
||||
`Missing "${senseWorkerScript}". Run \`pnpm --filter @uncaged/nerve-daemon build\` (cli package "pretest" runs this automatically).`,
|
||||
@@ -230,9 +300,9 @@ export async function startTestDaemon(
|
||||
|
||||
const fakeHome = mkdtempSync(join(tmpdir(), "nerve-cli-e2e-"));
|
||||
const nerveRoot = join(fakeHome, ".uncaged-nerve");
|
||||
writeWorkspaceLayout(nerveRoot);
|
||||
writeWorkspaceLayout(nerveRoot, withNoop);
|
||||
|
||||
const config = defaultTestConfig();
|
||||
const config = defaultTestConfig(withNoop);
|
||||
const socketPath = join(nerveRoot, "nerve.sock");
|
||||
const kernel = createKernel(config, nerveRoot, {
|
||||
workerScript: senseWorkerScript,
|
||||
@@ -306,8 +376,8 @@ function patchWriteStream(stream: NodeJS.WriteStream, sink: string[]): () => voi
|
||||
|
||||
/**
|
||||
* Runs `nerve <args>` for the subset wired in `e2eRootCommand` (`sense`, `logs`, `daemon`,
|
||||
* `status`, `stop`, `workflow`, `thread`), with `process.env.HOME` pointing at `handle.fakeHome` so
|
||||
* `getNerveRoot()` resolves to the test workspace. Captures stdout/stderr; sets `exitCode`
|
||||
* `status`, `stop`, `store`, `workflow`, `thread`), with `process.env.HOME` pointing at `handle.fakeHome`
|
||||
* so `getNerveRoot()` resolves to the test workspace. Captures stdout/stderr; sets `exitCode`
|
||||
* when `process.exit` is invoked or on thrown errors.
|
||||
*/
|
||||
export async function runCli(handle: TestDaemonHandle, args: string[]): Promise<CliRunResult> {
|
||||
|
||||
@@ -0,0 +1,132 @@
|
||||
/**
|
||||
* E2E: `nerve store archive` against a real daemon + logs.db (issue #163).
|
||||
*
|
||||
* Archive eligibility is by `logs.timestamp` (ms; there is no `created_at` column);
|
||||
* RFC-001 §5.4 cold-archive uses UTC days.
|
||||
*/
|
||||
|
||||
import { existsSync, readFileSync, readdirSync } from "node:fs";
|
||||
import { join } from "node:path";
|
||||
import { DatabaseSync } from "node:sqlite";
|
||||
|
||||
import { afterEach, describe, expect, it } from "vitest";
|
||||
|
||||
import {
|
||||
type TestDaemonHandle,
|
||||
linkWorkspaceDaemonIntoNerveRoot,
|
||||
pollUntil,
|
||||
runCli,
|
||||
startTestDaemon,
|
||||
stopTestDaemon,
|
||||
} from "./e2e-harness.js";
|
||||
|
||||
/** Wall time safely outside the 30-day hot window (RFC-001 archive). */
|
||||
const ARCHIVED_TEST_TS = Date.UTC(2020, 5, 15, 12, 0, 0);
|
||||
const EXPECTED_ARCHIVE_DAY = "2020-06-15";
|
||||
|
||||
describe("e2e store archive", () => {
|
||||
let daemon: TestDaemonHandle | null = null;
|
||||
|
||||
afterEach(async () => {
|
||||
const h = daemon;
|
||||
daemon = null;
|
||||
if (h === null) return;
|
||||
await Promise.race([
|
||||
stopTestDaemon(h),
|
||||
new Promise<never>((_, reject) =>
|
||||
setTimeout(() => reject(new Error("stopTestDaemon timed out after 10s")), 10_000),
|
||||
),
|
||||
]);
|
||||
});
|
||||
|
||||
it(
|
||||
"archives old workflow logs to JSONL, removes rows from logs, thread list still reads workflow_runs",
|
||||
{ timeout: 30_000 },
|
||||
async () => {
|
||||
daemon = await startTestDaemon({ withNoopWorkflow: true });
|
||||
linkWorkspaceDaemonIntoNerveRoot(daemon.nerveRoot);
|
||||
|
||||
const triggerResult = await runCli(daemon, ["sense", "trigger", "counter"]);
|
||||
expect(triggerResult.exitCode).toBe(0);
|
||||
|
||||
const logsDb = join(daemon.nerveRoot, "data", "logs.db");
|
||||
await pollUntil(() => {
|
||||
if (!existsSync(logsDb)) return false;
|
||||
try {
|
||||
const db = new DatabaseSync(logsDb, { readOnly: true });
|
||||
const row = db
|
||||
.prepare(
|
||||
"SELECT COUNT(*) AS c FROM logs WHERE source = 'workflow' AND type = 'completed'",
|
||||
)
|
||||
.get() as { c: number } | undefined;
|
||||
db.close();
|
||||
return (row?.c ?? 0) > 0;
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
}, 20_000);
|
||||
|
||||
const dbMut = new DatabaseSync(logsDb);
|
||||
dbMut.exec(`UPDATE logs SET timestamp = ${String(ARCHIVED_TEST_TS)} WHERE 1=1`);
|
||||
dbMut.close();
|
||||
|
||||
const archiveResult = await runCli(daemon, ["store", "archive"]);
|
||||
expect(archiveResult.exitCode).toBe(0);
|
||||
expect(archiveResult.stdout).toContain("✅ Archived");
|
||||
expect(archiveResult.stdout).toContain("rows=");
|
||||
expect(archiveResult.stdout).toContain(EXPECTED_ARCHIVE_DAY);
|
||||
|
||||
const dbAfter = new DatabaseSync(logsDb, { readOnly: true });
|
||||
const logCountRow = dbAfter.prepare("SELECT COUNT(*) AS c FROM logs").get() as { c: number };
|
||||
dbAfter.close();
|
||||
expect(logCountRow.c).toBe(0);
|
||||
|
||||
const archiveDir = join(daemon.nerveRoot, "data", "archive", "logs");
|
||||
expect(existsSync(archiveDir)).toBe(true);
|
||||
const names = readdirSync(archiveDir);
|
||||
expect(names.some((n) => n === `${EXPECTED_ARCHIVE_DAY}.jsonl`)).toBe(true);
|
||||
const jsonlPath = join(archiveDir, `${EXPECTED_ARCHIVE_DAY}.jsonl`);
|
||||
const jsonl = readFileSync(jsonlPath, "utf8");
|
||||
expect(jsonl.length).toBeGreaterThan(0);
|
||||
expect(jsonl).toContain('"source":"workflow"');
|
||||
|
||||
const listResult = await runCli(daemon, ["thread", "list", "--all"]);
|
||||
expect(listResult.exitCode).toBe(0);
|
||||
// workflow_runs is not pruned by archive — list may still show completed runs; hot logs are empty.
|
||||
expect(listResult.stdout).toContain("noop");
|
||||
},
|
||||
);
|
||||
|
||||
it("store archive --vacuum completes VACUUM after archiving", { timeout: 30_000 }, async () => {
|
||||
daemon = await startTestDaemon({ withNoopWorkflow: true });
|
||||
linkWorkspaceDaemonIntoNerveRoot(daemon.nerveRoot);
|
||||
|
||||
const triggerResult = await runCli(daemon, ["sense", "trigger", "counter"]);
|
||||
expect(triggerResult.exitCode).toBe(0);
|
||||
|
||||
const logsDb = join(daemon.nerveRoot, "data", "logs.db");
|
||||
await pollUntil(() => {
|
||||
if (!existsSync(logsDb)) return false;
|
||||
try {
|
||||
const db = new DatabaseSync(logsDb, { readOnly: true });
|
||||
const row = db
|
||||
.prepare(
|
||||
"SELECT COUNT(*) AS c FROM logs WHERE source = 'workflow' AND type = 'completed'",
|
||||
)
|
||||
.get() as { c: number } | undefined;
|
||||
db.close();
|
||||
return (row?.c ?? 0) > 0;
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
}, 20_000);
|
||||
|
||||
const dbMut = new DatabaseSync(logsDb);
|
||||
dbMut.exec(`UPDATE logs SET timestamp = ${String(ARCHIVED_TEST_TS)} WHERE 1=1`);
|
||||
dbMut.close();
|
||||
|
||||
const archiveVac = await runCli(daemon, ["store", "archive", "--vacuum"]);
|
||||
expect(archiveVac.exitCode).toBe(0);
|
||||
expect(archiveVac.stdout).toContain("VACUUM completed.");
|
||||
});
|
||||
});
|
||||
Reference in New Issue
Block a user