Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 978b1680a3 |
@@ -4,15 +4,21 @@
|
||||
*/
|
||||
|
||||
import type {
|
||||
ArchiveLogsDayResult as DaemonArchiveLogsDayResult,
|
||||
ArchiveLogsOptions as DaemonArchiveLogsOptions,
|
||||
ArchiveLogsResult as DaemonArchiveLogsResult,
|
||||
LogEntry as DaemonLogEntry,
|
||||
LogQuery as DaemonLogQuery,
|
||||
LogStore as DaemonLogStore,
|
||||
WorkflowRun as DaemonWorkflowRun,
|
||||
WorkflowRunStatus as DaemonWorkflowRunStatus,
|
||||
} from "@uncaged/nerve-daemon";
|
||||
import { describe, it, expectTypeOf } from "vitest";
|
||||
import { describe, expectTypeOf, it } from "vitest";
|
||||
|
||||
import type {
|
||||
ArchiveLogsDayResult,
|
||||
ArchiveLogsOptions,
|
||||
ArchiveLogsResult,
|
||||
LogEntry,
|
||||
LogQuery,
|
||||
LogStore,
|
||||
@@ -42,6 +48,26 @@ describe("daemon-types drift guard", () => {
|
||||
});
|
||||
|
||||
it("LogStore has all required methods", () => {
|
||||
expectTypeOf<LogStore>().toMatchTypeOf<Pick<DaemonLogStore, "query" | "getWorkflowRun" | "getActiveWorkflowRuns" | "getAllWorkflowRuns" | "upsertWorkflowRun" | "close">>();
|
||||
expectTypeOf<LogStore>().toMatchTypeOf<
|
||||
Pick<
|
||||
DaemonLogStore,
|
||||
| "query"
|
||||
| "getWorkflowRun"
|
||||
| "getActiveWorkflowRuns"
|
||||
| "getAllWorkflowRuns"
|
||||
| "upsertWorkflowRun"
|
||||
| "archiveLogs"
|
||||
| "close"
|
||||
>
|
||||
>();
|
||||
});
|
||||
|
||||
it("ArchiveLogs types match daemon", () => {
|
||||
expectTypeOf<ArchiveLogsOptions>().toMatchTypeOf<DaemonArchiveLogsOptions>();
|
||||
expectTypeOf<DaemonArchiveLogsOptions>().toMatchTypeOf<ArchiveLogsOptions>();
|
||||
expectTypeOf<ArchiveLogsResult>().toMatchTypeOf<DaemonArchiveLogsResult>();
|
||||
expectTypeOf<DaemonArchiveLogsResult>().toMatchTypeOf<ArchiveLogsResult>();
|
||||
expectTypeOf<ArchiveLogsDayResult>().toMatchTypeOf<DaemonArchiveLogsDayResult>();
|
||||
expectTypeOf<DaemonArchiveLogsDayResult>().toMatchTypeOf<ArchiveLogsDayResult>();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -6,6 +6,7 @@ import { senseCommand } from "./commands/sense.js";
|
||||
import { startCommand } from "./commands/start.js";
|
||||
import { statusCommand } from "./commands/status.js";
|
||||
import { stopCommand } from "./commands/stop.js";
|
||||
import { storeCommand } from "./commands/store.js";
|
||||
import { validateCommand } from "./commands/validate.js";
|
||||
import { workflowCommand } from "./commands/workflow.js";
|
||||
|
||||
@@ -22,6 +23,7 @@ const main = defineCommand({
|
||||
logs: logsCommand,
|
||||
validate: validateCommand,
|
||||
sense: senseCommand,
|
||||
store: storeCommand,
|
||||
workflow: workflowCommand,
|
||||
},
|
||||
});
|
||||
|
||||
@@ -219,23 +219,6 @@ const initWorkspaceCommand = defineCommand({
|
||||
},
|
||||
});
|
||||
|
||||
async function tryRequireSqlite(nerveRoot: string): Promise<boolean> {
|
||||
try {
|
||||
const modulePath = join(nerveRoot, "node_modules", "better-sqlite3");
|
||||
// Use a child process to test if the native module loads
|
||||
const { execFile } = await import("node:child_process");
|
||||
const { promisify } = await import("node:util");
|
||||
const execFileAsync = promisify(execFile);
|
||||
await execFileAsync("node", ["-e", `require(${JSON.stringify(modulePath)})`], {
|
||||
cwd: nerveRoot,
|
||||
timeout: 10_000,
|
||||
});
|
||||
return true;
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
async function runInitWorkspace(force: boolean): Promise<void> {
|
||||
const nerveRoot = getNerveRoot();
|
||||
|
||||
@@ -259,36 +242,20 @@ async function runInitWorkspace(force: boolean): Promise<void> {
|
||||
);
|
||||
|
||||
process.stdout.write("Installing dependencies…\n");
|
||||
const { cmd, installArgs } = await detectPackageManager();
|
||||
try {
|
||||
const { cmd, installArgs } = await detectPackageManager();
|
||||
await runCommand(cmd, installArgs, nerveRoot);
|
||||
} catch {
|
||||
process.stdout.write(
|
||||
`⚠️ Install failed. Try manually:\n cd ${nerveRoot} && ${cmd} ${installArgs.join(" ")}\n`,
|
||||
);
|
||||
}
|
||||
|
||||
// Verify better-sqlite3 native module — rebuild up to 2 times if broken
|
||||
const sqlitePath = join(nerveRoot, "node_modules", "better-sqlite3");
|
||||
if (existsSync(sqlitePath)) {
|
||||
for (let attempt = 1; attempt <= 2; attempt++) {
|
||||
if (await tryRequireSqlite(nerveRoot)) break;
|
||||
process.stdout.write("Rebuilding native module better-sqlite3…\n");
|
||||
try {
|
||||
await runCommand(cmd, ["rebuild", "better-sqlite3"], nerveRoot);
|
||||
} catch {
|
||||
process.stdout.write(
|
||||
`${attempt === 1 ? "Building" : "Retrying build of"} native module better-sqlite3 (attempt ${attempt}/2)…\n`,
|
||||
);
|
||||
try {
|
||||
await runCommand(cmd, ["rebuild", "better-sqlite3"], nerveRoot);
|
||||
} catch {
|
||||
// will be caught by the verify below
|
||||
}
|
||||
}
|
||||
if (!(await tryRequireSqlite(nerveRoot))) {
|
||||
process.stdout.write(
|
||||
`⚠️ better-sqlite3 native module is not working. The daemon will fail to start.\n` +
|
||||
` Fix: cd ${nerveRoot} && ${cmd} rebuild better-sqlite3\n` +
|
||||
` Or: npm install --build-from-source better-sqlite3\n`,
|
||||
"⚠️ rebuild better-sqlite3 failed — if the daemon fails to start, reinstall from the workspace directory.\n",
|
||||
);
|
||||
}
|
||||
} catch {
|
||||
process.stdout.write("⚠️ Install failed — you may need to install dependencies manually.\n");
|
||||
}
|
||||
|
||||
if (!existsSync(join(nerveRoot, ".git"))) {
|
||||
|
||||
@@ -0,0 +1,70 @@
|
||||
import { existsSync } from "node:fs";
|
||||
import { join } from "node:path";
|
||||
|
||||
import { defineCommand } from "citty";
|
||||
|
||||
import { loadDaemonModule } from "../workspace-daemon.js";
|
||||
import { getNerveRoot } from "../workspace.js";
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// nerve store archive
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
const storeArchiveCommand = defineCommand({
|
||||
meta: {
|
||||
name: "archive",
|
||||
description:
|
||||
"Export logs older than 30 days from logs.db to data/archive/logs/YYYY-MM-DD.jsonl and delete those rows (RFC-001 §5.4)",
|
||||
},
|
||||
args: {
|
||||
vacuum: {
|
||||
type: "boolean",
|
||||
description: "Run SQLite VACUUM after archiving",
|
||||
default: false,
|
||||
},
|
||||
},
|
||||
async run({ args }) {
|
||||
const nerveRoot = getNerveRoot();
|
||||
const dbPath = join(nerveRoot, "data", "logs.db");
|
||||
if (!existsSync(dbPath)) {
|
||||
process.stderr.write("❌ No data/logs.db found — start the daemon at least once.\n");
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
const { createLogStore } = await loadDaemonModule(nerveRoot);
|
||||
const store = createLogStore(dbPath);
|
||||
|
||||
try {
|
||||
const result = store.archiveLogs({ vacuum: args.vacuum });
|
||||
if (result.days.length === 0) {
|
||||
process.stdout.write(
|
||||
"✅ Nothing to archive (no eligible UTC days beyond the 30-day window).\n",
|
||||
);
|
||||
} else {
|
||||
process.stdout.write(`✅ Archived ${result.days.length} day(s):\n`);
|
||||
for (const d of result.days) {
|
||||
process.stdout.write(` ${d.day} rows=${d.rowCount} ${d.filePath}\n`);
|
||||
}
|
||||
}
|
||||
if (result.vacuumed) {
|
||||
process.stdout.write(" VACUUM completed.\n");
|
||||
}
|
||||
} finally {
|
||||
store.close();
|
||||
}
|
||||
},
|
||||
});
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// nerve store
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
export const storeCommand = defineCommand({
|
||||
meta: {
|
||||
name: "store",
|
||||
description: "Maintain local Nerve SQLite stores (log cold-archive, …)",
|
||||
},
|
||||
subCommands: {
|
||||
archive: storeArchiveCommand,
|
||||
},
|
||||
});
|
||||
@@ -40,6 +40,24 @@ export type LogQuery = {
|
||||
limit?: number;
|
||||
};
|
||||
|
||||
export type ArchiveLogsOptions = {
|
||||
now?: number;
|
||||
vacuum?: boolean;
|
||||
maxDays?: number;
|
||||
retentionMs?: number;
|
||||
};
|
||||
|
||||
export type ArchiveLogsDayResult = {
|
||||
day: string;
|
||||
rowCount: number;
|
||||
filePath: string;
|
||||
};
|
||||
|
||||
export type ArchiveLogsResult = {
|
||||
days: ArchiveLogsDayResult[];
|
||||
vacuumed: boolean;
|
||||
};
|
||||
|
||||
/** Subset of daemon LogStore used by the CLI workflow commands. */
|
||||
export type LogStore = {
|
||||
query: (filter?: LogQuery) => LogEntry[];
|
||||
@@ -47,5 +65,6 @@ export type LogStore = {
|
||||
getActiveWorkflowRuns: (workflowName?: string) => WorkflowRun[];
|
||||
getAllWorkflowRuns: (workflowName: string | null) => WorkflowRun[];
|
||||
upsertWorkflowRun: (entry: Omit<LogEntry, "id">, run: WorkflowRun) => LogEntry;
|
||||
archiveLogs: (options?: ArchiveLogsOptions) => ArchiveLogsResult;
|
||||
close: () => void;
|
||||
};
|
||||
|
||||
@@ -91,6 +91,7 @@ function makeLogStore(
|
||||
}),
|
||||
getTriggerPayload: vi.fn(() => ({ value: 42 })),
|
||||
getThreadEvents: vi.fn(() => [{ type: "thread_start", triggerPayload: {} }]),
|
||||
archiveLogs: vi.fn(() => ({ days: [], vacuumed: false })),
|
||||
close: vi.fn(),
|
||||
};
|
||||
return store;
|
||||
|
||||
@@ -77,6 +77,7 @@ function makeLogStore() {
|
||||
getActiveWorkflowRuns: vi.fn(() => []),
|
||||
getTriggerPayload: vi.fn(() => null),
|
||||
getThreadEvents: vi.fn(() => []),
|
||||
archiveLogs: vi.fn(() => ({ days: [], vacuumed: false })),
|
||||
close: vi.fn(),
|
||||
};
|
||||
}
|
||||
|
||||
@@ -74,6 +74,7 @@ function makeMockLogStore() {
|
||||
getAllWorkflowRuns: vi.fn(() => []),
|
||||
getTriggerPayload: vi.fn(() => null),
|
||||
getThreadEvents: vi.fn(() => []),
|
||||
archiveLogs: vi.fn(() => ({ days: [], vacuumed: false })),
|
||||
close: vi.fn(),
|
||||
};
|
||||
}
|
||||
|
||||
@@ -80,6 +80,7 @@ function makeLogStore() {
|
||||
getActiveWorkflowRuns: vi.fn(() => []),
|
||||
getTriggerPayload: vi.fn(() => null),
|
||||
getThreadEvents: vi.fn(() => []),
|
||||
archiveLogs: vi.fn(() => ({ days: [], vacuumed: false })),
|
||||
close: vi.fn(),
|
||||
};
|
||||
}
|
||||
|
||||
@@ -0,0 +1,40 @@
|
||||
import { describe, expect, it } from "vitest";
|
||||
|
||||
import {
|
||||
assertValidUtcDay,
|
||||
compareIsoDays,
|
||||
lastArchivableUtcDay,
|
||||
nextUtcDay,
|
||||
prevUtcDay,
|
||||
utcDateStringFromMs,
|
||||
utcDayEndExclusiveMs,
|
||||
utcDayStartMs,
|
||||
} from "../log-archive.js";
|
||||
|
||||
describe("log-archive UTC helpers", () => {
|
||||
it("lastArchivableUtcDay matches RFC-style boundary (exclusive end of day ≤ boundary)", () => {
|
||||
const boundary = Date.UTC(2026, 1, 2, 12, 0, 0); // 2026-02-02 12:00 UTC
|
||||
expect(lastArchivableUtcDay(boundary)).toBe("2026-02-01");
|
||||
});
|
||||
|
||||
it("round-trips UTC day bounds", () => {
|
||||
expect(utcDayStartMs("2026-02-01")).toBe(Date.UTC(2026, 1, 1));
|
||||
expect(utcDayEndExclusiveMs("2026-02-01")).toBe(Date.UTC(2026, 1, 2));
|
||||
expect(utcDateStringFromMs(Date.UTC(2026, 1, 1, 23, 59))).toBe("2026-02-01");
|
||||
});
|
||||
|
||||
it("nextUtcDay / prevUtcDay", () => {
|
||||
expect(nextUtcDay("2026-02-01")).toBe("2026-02-02");
|
||||
expect(prevUtcDay("2026-02-01")).toBe("2026-01-31");
|
||||
});
|
||||
|
||||
it("compareIsoDays sorts lexicographically for YYYY-MM-DD", () => {
|
||||
expect(compareIsoDays("2026-01-01", "2026-02-01")).toBeLessThan(0);
|
||||
expect(compareIsoDays("2026-02-01", "2026-02-01")).toBe(0);
|
||||
});
|
||||
|
||||
it("assertValidUtcDay rejects invalid calendars", () => {
|
||||
expect(() => assertValidUtcDay("2026-02-31")).toThrow();
|
||||
expect(() => assertValidUtcDay("bad")).toThrow();
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,139 @@
|
||||
import { mkdtempSync, readFileSync, rmSync } from "node:fs";
|
||||
import { tmpdir } from "node:os";
|
||||
import { join } from "node:path";
|
||||
import { afterEach, beforeEach, describe, expect, it } from "vitest";
|
||||
|
||||
import { LOG_ARCHIVE_META_KEY, createLogStore } from "../log-store.js";
|
||||
import type { LogStore } from "../log-store.js";
|
||||
|
||||
const DAY_MS = 86_400_000;
|
||||
|
||||
/** `now` such that 2026-02-01 is the last archivable UTC day under a 30-day window. */
|
||||
function nowForLastArchivableFeb1(): number {
|
||||
const boundary = Date.UTC(2026, 1, 2, 12, 0, 0);
|
||||
return boundary + 30 * DAY_MS;
|
||||
}
|
||||
|
||||
describe("LogStore — cold archive (RFC-001 §5.4)", () => {
|
||||
let tmpDir: string;
|
||||
let store: LogStore;
|
||||
|
||||
beforeEach(() => {
|
||||
tmpDir = mkdtempSync(join(tmpdir(), "nerve-archive-"));
|
||||
store = createLogStore(join(tmpDir, "data", "logs.db"));
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
store.close();
|
||||
rmSync(tmpDir, { recursive: true, force: true });
|
||||
});
|
||||
|
||||
it("exports one UTC day to JSONL, deletes rows, advances archived_up_to", () => {
|
||||
const ts = Date.UTC(2026, 1, 1, 10, 0, 0);
|
||||
store.append({ source: "system", type: "x", refId: null, payload: '{"a":1}', ts });
|
||||
store.append({ source: "reflex", type: "y", refId: "z", payload: null, ts: ts + 1 });
|
||||
|
||||
const now = nowForLastArchivableFeb1();
|
||||
const result = store.archiveLogs({ now, retentionMs: 30 * DAY_MS });
|
||||
expect(result.days).toHaveLength(1);
|
||||
expect(result.days[0].day).toBe("2026-02-01");
|
||||
expect(result.days[0].rowCount).toBe(2);
|
||||
|
||||
const jsonlPath = join(tmpDir, "data", "archive", "logs", "2026-02-01.jsonl");
|
||||
expect(result.days[0].filePath).toBe(jsonlPath);
|
||||
|
||||
const lines = readFileSync(jsonlPath, "utf8").trim().split("\n");
|
||||
expect(lines).toHaveLength(2);
|
||||
const o = JSON.parse(lines[0] ?? "{}") as Record<string, unknown>;
|
||||
expect(o.source).toBe("system");
|
||||
expect(o.refId).toBeNull();
|
||||
|
||||
expect(store.query()).toHaveLength(0);
|
||||
expect(store.getMeta(LOG_ARCHIVE_META_KEY)).toBe("2026-02-01");
|
||||
});
|
||||
|
||||
it("returns nothing for an empty logs table", () => {
|
||||
const r = store.archiveLogs({ now: nowForLastArchivableFeb1(), retentionMs: 30 * DAY_MS });
|
||||
expect(r.days).toHaveLength(0);
|
||||
expect(store.getMeta(LOG_ARCHIVE_META_KEY)).toBeNull();
|
||||
});
|
||||
|
||||
it("does nothing when all logs are inside the hot window", () => {
|
||||
const now = Date.UTC(2026, 3, 23, 12, 0, 0);
|
||||
const ts = now - 5 * DAY_MS;
|
||||
store.append({ source: "system", type: "warm", refId: null, payload: null, ts });
|
||||
const r = store.archiveLogs({ now, retentionMs: 30 * DAY_MS });
|
||||
expect(r.days).toHaveLength(0);
|
||||
expect(store.query()).toHaveLength(1);
|
||||
});
|
||||
|
||||
it("second archive with same clock is a no-op (watermark already caught up)", () => {
|
||||
const ts = Date.UTC(2026, 1, 1, 10, 0, 0);
|
||||
store.append({ source: "system", type: "x", refId: null, payload: null, ts });
|
||||
const now = nowForLastArchivableFeb1();
|
||||
store.archiveLogs({ now, retentionMs: 30 * DAY_MS });
|
||||
const path = join(tmpDir, "data", "archive", "logs", "2026-02-01.jsonl");
|
||||
const first = readFileSync(path, "utf8");
|
||||
|
||||
const second = store.archiveLogs({ now, retentionMs: 30 * DAY_MS });
|
||||
expect(second.days).toHaveLength(0);
|
||||
expect(readFileSync(path, "utf8")).toBe(first);
|
||||
});
|
||||
|
||||
it("overwrites JSONL when the same UTC day is archived again after watermark rewind", () => {
|
||||
const ts = Date.UTC(2026, 1, 1, 10, 0, 0);
|
||||
store.append({ source: "a", type: "1", refId: null, payload: null, ts });
|
||||
const now = nowForLastArchivableFeb1();
|
||||
store.archiveLogs({ now, retentionMs: 30 * DAY_MS });
|
||||
store.setMeta(LOG_ARCHIVE_META_KEY, "2026-01-31");
|
||||
store.append({ source: "b", type: "2", refId: null, payload: null, ts: ts + 100 });
|
||||
store.archiveLogs({ now, retentionMs: 30 * DAY_MS });
|
||||
|
||||
const path = join(tmpDir, "data", "archive", "logs", "2026-02-01.jsonl");
|
||||
const lines = readFileSync(path, "utf8").trim().split("\n");
|
||||
expect(lines).toHaveLength(1);
|
||||
expect(JSON.parse(lines[0] ?? "{}").source).toBe("b");
|
||||
});
|
||||
|
||||
it("respects maxDays across invocations", () => {
|
||||
const t1 = Date.UTC(2026, 1, 1, 10, 0, 0);
|
||||
const t2 = Date.UTC(2026, 1, 2, 10, 0, 0);
|
||||
store.append({ source: "system", type: "a", refId: null, payload: null, ts: t1 });
|
||||
store.append({ source: "system", type: "b", refId: null, payload: null, ts: t2 });
|
||||
|
||||
const now = Date.UTC(2027, 0, 1, 12, 0, 0);
|
||||
const r1 = store.archiveLogs({ now, retentionMs: 30 * DAY_MS, maxDays: 1 });
|
||||
expect(r1.days).toHaveLength(1);
|
||||
expect(r1.days[0].day).toBe("2026-02-01");
|
||||
|
||||
const r2 = store.archiveLogs({ now, retentionMs: 30 * DAY_MS, maxDays: 1 });
|
||||
expect(r2.days).toHaveLength(1);
|
||||
expect(r2.days[0].day).toBe("2026-02-02");
|
||||
expect(store.getMeta(LOG_ARCHIVE_META_KEY)).toBe("2026-02-02");
|
||||
expect(store.query()).toHaveLength(0);
|
||||
});
|
||||
|
||||
it("starts from earliest log day when it is before watermark+1", () => {
|
||||
store.setMeta(LOG_ARCHIVE_META_KEY, "2026-01-10");
|
||||
const ts = Date.UTC(2026, 1, 1, 10, 0, 0);
|
||||
store.append({ source: "x", type: "p", refId: null, payload: null, ts });
|
||||
const result = store.archiveLogs({ now: nowForLastArchivableFeb1(), retentionMs: 30 * DAY_MS });
|
||||
expect(result.days.map((d) => d.day)).toContain("2026-02-01");
|
||||
});
|
||||
|
||||
it("throws on invalid archived_up_to watermark", () => {
|
||||
store.setMeta(LOG_ARCHIVE_META_KEY, "not-a-date");
|
||||
expect(() => store.archiveLogs({ now: Date.now() })).toThrow();
|
||||
});
|
||||
|
||||
it("runs VACUUM when vacuum: true", () => {
|
||||
const ts = Date.UTC(2026, 1, 1, 10, 0, 0);
|
||||
store.append({ source: "system", type: "x", refId: null, payload: null, ts });
|
||||
const r = store.archiveLogs({
|
||||
now: nowForLastArchivableFeb1(),
|
||||
retentionMs: 30 * DAY_MS,
|
||||
vacuum: true,
|
||||
});
|
||||
expect(r.vacuumed).toBe(true);
|
||||
});
|
||||
});
|
||||
@@ -74,6 +74,7 @@ function makeLogStore() {
|
||||
getActiveWorkflowRuns: vi.fn(() => []),
|
||||
getTriggerPayload: vi.fn(() => null),
|
||||
getThreadEvents: vi.fn(() => []),
|
||||
archiveLogs: vi.fn(() => ({ days: [], vacuumed: false })),
|
||||
close: vi.fn(),
|
||||
};
|
||||
}
|
||||
|
||||
@@ -32,13 +32,16 @@ export type { Kernel, KernelOptions, KernelHealth } from "./kernel.js";
|
||||
export { createFileWatcher } from "./file-watcher.js";
|
||||
export type { FileWatcher, FileChange, FileChangeHandler } from "./file-watcher.js";
|
||||
|
||||
export { createLogStore } from "./log-store.js";
|
||||
export { createLogStore, LOG_ARCHIVE_META_KEY } from "./log-store.js";
|
||||
export type {
|
||||
LogStore,
|
||||
LogEntry,
|
||||
LogQuery,
|
||||
WorkflowRun,
|
||||
WorkflowRunStatus,
|
||||
ArchiveLogsDayResult,
|
||||
ArchiveLogsOptions,
|
||||
ArchiveLogsResult,
|
||||
} from "./log-store.js";
|
||||
|
||||
export { createWorkflowManager } from "./workflow-manager.js";
|
||||
|
||||
@@ -0,0 +1,78 @@
|
||||
/** Log cold-archive helpers (RFC-001 §5.4) — UTC calendar days, JSONL export. */
|
||||
|
||||
export const LOG_ARCHIVE_META_KEY = "archived_up_to";
|
||||
|
||||
export const DEFAULT_LOG_RETENTION_MS = 30 * 86_400_000;
|
||||
|
||||
export type ArchiveLogsOptions = {
|
||||
/** Wall clock for retention boundary (default: `Date.now()`). */
|
||||
now?: number;
|
||||
/** Run `VACUUM` after archiving (outside the per-day transaction). */
|
||||
vacuum?: boolean;
|
||||
/** Max UTC days to process in one call (default: unlimited). */
|
||||
maxDays?: number;
|
||||
/** Override default 30-day retention (tests). */
|
||||
retentionMs?: number;
|
||||
};
|
||||
|
||||
export type ArchiveLogsDayResult = {
|
||||
day: string;
|
||||
rowCount: number;
|
||||
filePath: string;
|
||||
};
|
||||
|
||||
export type ArchiveLogsResult = {
|
||||
days: ArchiveLogsDayResult[];
|
||||
vacuumed: boolean;
|
||||
};
|
||||
|
||||
export function utcDateStringFromMs(ms: number): string {
|
||||
return new Date(ms).toISOString().slice(0, 10);
|
||||
}
|
||||
|
||||
function parseUtcDayParts(day: string): [number, number, number] {
|
||||
const m = /^(\d{4})-(\d{2})-(\d{2})$/.exec(day);
|
||||
if (m === null) {
|
||||
throw new Error(`Invalid UTC day (expected YYYY-MM-DD): ${day}`);
|
||||
}
|
||||
const y = Number(m[1]);
|
||||
const mo = Number(m[2]);
|
||||
const d = Number(m[3]);
|
||||
const t = Date.UTC(y, mo - 1, d);
|
||||
if (utcDateStringFromMs(t) !== day) {
|
||||
throw new Error(`Invalid UTC calendar day: ${day}`);
|
||||
}
|
||||
return [y, mo, d];
|
||||
}
|
||||
|
||||
export function assertValidUtcDay(day: string): void {
|
||||
parseUtcDayParts(day);
|
||||
}
|
||||
|
||||
export function utcDayStartMs(day: string): number {
|
||||
const [y, mo, d] = parseUtcDayParts(day);
|
||||
return Date.UTC(y, mo - 1, d);
|
||||
}
|
||||
|
||||
export function utcDayEndExclusiveMs(day: string): number {
|
||||
return utcDayStartMs(day) + 86_400_000;
|
||||
}
|
||||
|
||||
export function prevUtcDay(day: string): string {
|
||||
return utcDateStringFromMs(utcDayStartMs(day) - 86_400_000);
|
||||
}
|
||||
|
||||
export function nextUtcDay(day: string): string {
|
||||
return utcDateStringFromMs(utcDayEndExclusiveMs(day));
|
||||
}
|
||||
|
||||
/** Last UTC calendar day D such that the exclusive end of D is ≤ boundaryMs. */
|
||||
export function lastArchivableUtcDay(boundaryMs: number): string {
|
||||
return prevUtcDay(utcDateStringFromMs(boundaryMs));
|
||||
}
|
||||
|
||||
export function compareIsoDays(a: string, b: string): number {
|
||||
if (a < b) return -1;
|
||||
if (a > b) return 1;
|
||||
return 0;
|
||||
}
|
||||
@@ -7,11 +7,27 @@
|
||||
* Also provides a `meta` key-value table for bookkeeping (e.g. archive watermarks).
|
||||
*/
|
||||
|
||||
import { mkdirSync } from "node:fs";
|
||||
import { dirname } from "node:path";
|
||||
import { mkdirSync, writeFileSync } from "node:fs";
|
||||
import { dirname, join } from "node:path";
|
||||
import Database from "better-sqlite3";
|
||||
import type BetterSqlite3 from "better-sqlite3";
|
||||
|
||||
import {
|
||||
DEFAULT_LOG_RETENTION_MS,
|
||||
LOG_ARCHIVE_META_KEY,
|
||||
assertValidUtcDay,
|
||||
compareIsoDays,
|
||||
lastArchivableUtcDay,
|
||||
nextUtcDay,
|
||||
utcDateStringFromMs,
|
||||
utcDayEndExclusiveMs,
|
||||
utcDayStartMs,
|
||||
} from "./log-archive.js";
|
||||
import type { ArchiveLogsDayResult, ArchiveLogsOptions, ArchiveLogsResult } from "./log-archive.js";
|
||||
|
||||
export { LOG_ARCHIVE_META_KEY } from "./log-archive.js";
|
||||
export type { ArchiveLogsDayResult, ArchiveLogsOptions, ArchiveLogsResult } from "./log-archive.js";
|
||||
|
||||
export type LogEntry = {
|
||||
id?: number;
|
||||
source: string;
|
||||
@@ -105,6 +121,12 @@ export type LogStore = {
|
||||
* Used for crash recovery to rebuild ThreadState.
|
||||
*/
|
||||
getThreadEvents: (runId: string) => Array<{ type: string; [key: string]: unknown }>;
|
||||
/**
|
||||
* Export logs older than the retention window to `data/archive/logs/YYYY-MM-DD.jsonl`,
|
||||
* then delete those rows and advance `meta.archived_up_to` in one transaction per day
|
||||
* (RFC-001 §5.4).
|
||||
*/
|
||||
archiveLogs: (options?: ArchiveLogsOptions) => ArchiveLogsResult;
|
||||
close: () => void;
|
||||
};
|
||||
|
||||
@@ -138,6 +160,78 @@ CREATE INDEX IF NOT EXISTS idx_workflow_runs_status ON workflow_runs(status);
|
||||
CREATE INDEX IF NOT EXISTS idx_workflow_runs_workflow ON workflow_runs(workflow);
|
||||
`;
|
||||
|
||||
type SqlLogRow = {
|
||||
id: number;
|
||||
source: string;
|
||||
type: string;
|
||||
ref_id: string | null;
|
||||
payload: string | null;
|
||||
ts: number;
|
||||
};
|
||||
|
||||
function buildJsonlBody(rows: SqlLogRow[]): string {
|
||||
if (rows.length === 0) return "";
|
||||
const lines = rows.map((r) =>
|
||||
JSON.stringify({
|
||||
id: r.id,
|
||||
source: r.source,
|
||||
type: r.type,
|
||||
refId: r.ref_id,
|
||||
payload: r.payload,
|
||||
ts: r.ts,
|
||||
}),
|
||||
);
|
||||
return `${lines.join("\n")}\n`;
|
||||
}
|
||||
|
||||
function runOptionalVacuum(sqlite: BetterSqlite3.Database, vacuum?: boolean): boolean {
|
||||
if (vacuum !== true) return false;
|
||||
sqlite.exec("VACUUM");
|
||||
return true;
|
||||
}
|
||||
|
||||
function resolveArchiveStartDay(watermark: string | null, minDay: string): string {
|
||||
if (watermark === null) return minDay;
|
||||
const afterWatermark = nextUtcDay(watermark);
|
||||
return compareIsoDays(minDay, afterWatermark) < 0 ? minDay : afterWatermark;
|
||||
}
|
||||
|
||||
function runArchiveDayLoop(
|
||||
dbPath: string,
|
||||
options: ArchiveLogsOptions,
|
||||
selectLogsForDayStmt: BetterSqlite3.Statement,
|
||||
archiveDayTx: (day: string, start: number, endExclusive: number) => void,
|
||||
startDay: string,
|
||||
lastDay: string,
|
||||
): ArchiveLogsDayResult[] {
|
||||
const archiveDir = join(dirname(dbPath), "archive", "logs");
|
||||
mkdirSync(archiveDir, { recursive: true });
|
||||
|
||||
const days: ArchiveLogsDayResult[] = [];
|
||||
let d = startDay;
|
||||
let processed = 0;
|
||||
|
||||
while (compareIsoDays(d, lastDay) <= 0) {
|
||||
if (options.maxDays !== undefined && processed >= options.maxDays) {
|
||||
break;
|
||||
}
|
||||
|
||||
const start = utcDayStartMs(d);
|
||||
const endExclusive = utcDayEndExclusiveMs(d);
|
||||
const rows = selectLogsForDayStmt.all({ start, endExclusive }) as SqlLogRow[];
|
||||
|
||||
const filePath = join(archiveDir, `${d}.jsonl`);
|
||||
writeFileSync(filePath, buildJsonlBody(rows), "utf8");
|
||||
archiveDayTx(d, start, endExclusive);
|
||||
|
||||
days.push({ day: d, rowCount: rows.length, filePath });
|
||||
processed += 1;
|
||||
d = nextUtcDay(d);
|
||||
}
|
||||
|
||||
return days;
|
||||
}
|
||||
|
||||
export function createLogStore(dbPath: string): LogStore {
|
||||
mkdirSync(dirname(dbPath), { recursive: true });
|
||||
|
||||
@@ -186,6 +280,14 @@ export function createLogStore(dbPath: string): LogStore {
|
||||
"SELECT run_id, workflow, status, ts FROM workflow_runs WHERE workflow = ? ORDER BY ts DESC",
|
||||
);
|
||||
|
||||
const minLogTsStmt = sqlite.prepare("SELECT MIN(ts) AS m FROM logs");
|
||||
const selectLogsForDayStmt = sqlite.prepare(
|
||||
"SELECT id, source, type, ref_id, payload, ts FROM logs WHERE ts >= @start AND ts < @endExclusive ORDER BY id ASC",
|
||||
);
|
||||
const deleteLogsForDayStmt = sqlite.prepare(
|
||||
"DELETE FROM logs WHERE ts >= @start AND ts < @endExclusive",
|
||||
);
|
||||
|
||||
const upsertWorkflowRunTx = sqlite.transaction(
|
||||
(entry: Omit<LogEntry, "id">, run: WorkflowRun) => {
|
||||
const info = insertStmt.run({
|
||||
@@ -358,6 +460,52 @@ export function createLogStore(dbPath: string): LogStore {
|
||||
return result;
|
||||
}
|
||||
|
||||
const archiveDayTx = sqlite.transaction((day: string, start: number, endExclusive: number) => {
|
||||
deleteLogsForDayStmt.run({ start, endExclusive });
|
||||
setMetaStmt.run({ key: LOG_ARCHIVE_META_KEY, value: day });
|
||||
});
|
||||
|
||||
function readWatermark(): string | null {
|
||||
const raw = getMeta(LOG_ARCHIVE_META_KEY);
|
||||
if (raw === null) return null;
|
||||
assertValidUtcDay(raw);
|
||||
return raw;
|
||||
}
|
||||
|
||||
function firstLogUtcDay(): string | null {
|
||||
const row = minLogTsStmt.get() as { m: number | null } | undefined;
|
||||
const m = row?.m;
|
||||
if (m === null || m === undefined) return null;
|
||||
return utcDateStringFromMs(m);
|
||||
}
|
||||
|
||||
function archiveLogs(options: ArchiveLogsOptions = {}): ArchiveLogsResult {
|
||||
const now = options.now ?? Date.now();
|
||||
const retentionMs = options.retentionMs ?? DEFAULT_LOG_RETENTION_MS;
|
||||
const lastDay = lastArchivableUtcDay(now - retentionMs);
|
||||
|
||||
const watermark = readWatermark();
|
||||
const minDay = firstLogUtcDay();
|
||||
if (minDay === null) {
|
||||
return { days: [], vacuumed: runOptionalVacuum(sqlite, options.vacuum) };
|
||||
}
|
||||
|
||||
const startDay = resolveArchiveStartDay(watermark, minDay);
|
||||
if (compareIsoDays(startDay, lastDay) > 0) {
|
||||
return { days: [], vacuumed: runOptionalVacuum(sqlite, options.vacuum) };
|
||||
}
|
||||
|
||||
const days = runArchiveDayLoop(
|
||||
dbPath,
|
||||
options,
|
||||
selectLogsForDayStmt,
|
||||
archiveDayTx,
|
||||
startDay,
|
||||
lastDay,
|
||||
);
|
||||
return { days, vacuumed: runOptionalVacuum(sqlite, options.vacuum) };
|
||||
}
|
||||
|
||||
function close(): void {
|
||||
sqlite.close();
|
||||
}
|
||||
@@ -374,6 +522,7 @@ export function createLogStore(dbPath: string): LogStore {
|
||||
getAllWorkflowRuns,
|
||||
getTriggerPayload,
|
||||
getThreadEvents,
|
||||
archiveLogs,
|
||||
close,
|
||||
};
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user