diff --git a/packages/cli/src/__tests__/daemon-types.test.ts b/packages/cli/src/__tests__/daemon-types.test.ts index 79594a8..9766507 100644 --- a/packages/cli/src/__tests__/daemon-types.test.ts +++ b/packages/cli/src/__tests__/daemon-types.test.ts @@ -4,15 +4,21 @@ */ import type { + ArchiveLogsDayResult as DaemonArchiveLogsDayResult, + ArchiveLogsOptions as DaemonArchiveLogsOptions, + ArchiveLogsResult as DaemonArchiveLogsResult, LogEntry as DaemonLogEntry, LogQuery as DaemonLogQuery, LogStore as DaemonLogStore, WorkflowRun as DaemonWorkflowRun, WorkflowRunStatus as DaemonWorkflowRunStatus, } from "@uncaged/nerve-daemon"; -import { describe, it, expectTypeOf } from "vitest"; +import { describe, expectTypeOf, it } from "vitest"; import type { + ArchiveLogsDayResult, + ArchiveLogsOptions, + ArchiveLogsResult, LogEntry, LogQuery, LogStore, @@ -42,6 +48,26 @@ describe("daemon-types drift guard", () => { }); it("LogStore has all required methods", () => { - expectTypeOf().toMatchTypeOf>(); + expectTypeOf().toMatchTypeOf< + Pick< + DaemonLogStore, + | "query" + | "getWorkflowRun" + | "getActiveWorkflowRuns" + | "getAllWorkflowRuns" + | "upsertWorkflowRun" + | "archiveLogs" + | "close" + > + >(); + }); + + it("ArchiveLogs types match daemon", () => { + expectTypeOf().toMatchTypeOf(); + expectTypeOf().toMatchTypeOf(); + expectTypeOf().toMatchTypeOf(); + expectTypeOf().toMatchTypeOf(); + expectTypeOf().toMatchTypeOf(); + expectTypeOf().toMatchTypeOf(); }); }); diff --git a/packages/cli/src/cli.ts b/packages/cli/src/cli.ts index ae0e4dc..7289a94 100644 --- a/packages/cli/src/cli.ts +++ b/packages/cli/src/cli.ts @@ -6,6 +6,7 @@ import { senseCommand } from "./commands/sense.js"; import { startCommand } from "./commands/start.js"; import { statusCommand } from "./commands/status.js"; import { stopCommand } from "./commands/stop.js"; +import { storeCommand } from "./commands/store.js"; import { validateCommand } from "./commands/validate.js"; import { workflowCommand } from "./commands/workflow.js"; @@ -22,6 +23,7 @@ const main = defineCommand({ logs: logsCommand, validate: validateCommand, sense: senseCommand, + store: storeCommand, workflow: workflowCommand, }, }); diff --git a/packages/cli/src/commands/store.ts b/packages/cli/src/commands/store.ts new file mode 100644 index 0000000..98c9048 --- /dev/null +++ b/packages/cli/src/commands/store.ts @@ -0,0 +1,70 @@ +import { existsSync } from "node:fs"; +import { join } from "node:path"; + +import { defineCommand } from "citty"; + +import { loadDaemonModule } from "../workspace-daemon.js"; +import { getNerveRoot } from "../workspace.js"; + +// --------------------------------------------------------------------------- +// nerve store archive +// --------------------------------------------------------------------------- + +const storeArchiveCommand = defineCommand({ + meta: { + name: "archive", + description: + "Export logs older than 30 days from logs.db to data/archive/logs/YYYY-MM-DD.jsonl and delete those rows (RFC-001 §5.4)", + }, + args: { + vacuum: { + type: "boolean", + description: "Run SQLite VACUUM after archiving", + default: false, + }, + }, + async run({ args }) { + const nerveRoot = getNerveRoot(); + const dbPath = join(nerveRoot, "data", "logs.db"); + if (!existsSync(dbPath)) { + process.stderr.write("❌ No data/logs.db found — start the daemon at least once.\n"); + process.exit(1); + } + + const { createLogStore } = await loadDaemonModule(nerveRoot); + const store = createLogStore(dbPath); + + try { + const result = store.archiveLogs({ vacuum: args.vacuum }); + if (result.days.length === 0) { + process.stdout.write( + "✅ Nothing to archive (no eligible UTC days beyond the 30-day window).\n", + ); + } else { + process.stdout.write(`✅ Archived ${result.days.length} day(s):\n`); + for (const d of result.days) { + process.stdout.write(` ${d.day} rows=${d.rowCount} ${d.filePath}\n`); + } + } + if (result.vacuumed) { + process.stdout.write(" VACUUM completed.\n"); + } + } finally { + store.close(); + } + }, +}); + +// --------------------------------------------------------------------------- +// nerve store +// --------------------------------------------------------------------------- + +export const storeCommand = defineCommand({ + meta: { + name: "store", + description: "Maintain local Nerve SQLite stores (log cold-archive, …)", + }, + subCommands: { + archive: storeArchiveCommand, + }, +}); diff --git a/packages/cli/src/daemon-types.ts b/packages/cli/src/daemon-types.ts index 351c3a8..f03854a 100644 --- a/packages/cli/src/daemon-types.ts +++ b/packages/cli/src/daemon-types.ts @@ -40,6 +40,24 @@ export type LogQuery = { limit?: number; }; +export type ArchiveLogsOptions = { + now?: number; + vacuum?: boolean; + maxDays?: number; + retentionMs?: number; +}; + +export type ArchiveLogsDayResult = { + day: string; + rowCount: number; + filePath: string; +}; + +export type ArchiveLogsResult = { + days: ArchiveLogsDayResult[]; + vacuumed: boolean; +}; + /** Subset of daemon LogStore used by the CLI workflow commands. */ export type LogStore = { query: (filter?: LogQuery) => LogEntry[]; @@ -47,5 +65,6 @@ export type LogStore = { getActiveWorkflowRuns: (workflowName?: string) => WorkflowRun[]; getAllWorkflowRuns: (workflowName: string | null) => WorkflowRun[]; upsertWorkflowRun: (entry: Omit, run: WorkflowRun) => LogEntry; + archiveLogs: (options?: ArchiveLogsOptions) => ArchiveLogsResult; close: () => void; }; diff --git a/packages/daemon/src/__tests__/crash-recovery.test.ts b/packages/daemon/src/__tests__/crash-recovery.test.ts index 137dce7..faf4a23 100644 --- a/packages/daemon/src/__tests__/crash-recovery.test.ts +++ b/packages/daemon/src/__tests__/crash-recovery.test.ts @@ -91,6 +91,7 @@ function makeLogStore( }), getTriggerPayload: vi.fn(() => ({ value: 42 })), getThreadEvents: vi.fn(() => [{ type: "thread_start", triggerPayload: {} }]), + archiveLogs: vi.fn(() => ({ days: [], vacuumed: false })), close: vi.fn(), }; return store; diff --git a/packages/daemon/src/__tests__/hot-reload.test.ts b/packages/daemon/src/__tests__/hot-reload.test.ts index 5c307db..12318d6 100644 --- a/packages/daemon/src/__tests__/hot-reload.test.ts +++ b/packages/daemon/src/__tests__/hot-reload.test.ts @@ -77,6 +77,7 @@ function makeLogStore() { getActiveWorkflowRuns: vi.fn(() => []), getTriggerPayload: vi.fn(() => null), getThreadEvents: vi.fn(() => []), + archiveLogs: vi.fn(() => ({ days: [], vacuumed: false })), close: vi.fn(), }; } diff --git a/packages/daemon/src/__tests__/kernel-trigger-sense.test.ts b/packages/daemon/src/__tests__/kernel-trigger-sense.test.ts index 086709b..a224ca3 100644 --- a/packages/daemon/src/__tests__/kernel-trigger-sense.test.ts +++ b/packages/daemon/src/__tests__/kernel-trigger-sense.test.ts @@ -74,6 +74,7 @@ function makeMockLogStore() { getAllWorkflowRuns: vi.fn(() => []), getTriggerPayload: vi.fn(() => null), getThreadEvents: vi.fn(() => []), + archiveLogs: vi.fn(() => ({ days: [], vacuumed: false })), close: vi.fn(), }; } diff --git a/packages/daemon/src/__tests__/kernel-workflow-integration.test.ts b/packages/daemon/src/__tests__/kernel-workflow-integration.test.ts index 3c5a0a5..7991398 100644 --- a/packages/daemon/src/__tests__/kernel-workflow-integration.test.ts +++ b/packages/daemon/src/__tests__/kernel-workflow-integration.test.ts @@ -80,6 +80,7 @@ function makeLogStore() { getActiveWorkflowRuns: vi.fn(() => []), getTriggerPayload: vi.fn(() => null), getThreadEvents: vi.fn(() => []), + archiveLogs: vi.fn(() => ({ days: [], vacuumed: false })), close: vi.fn(), }; } diff --git a/packages/daemon/src/__tests__/log-archive.test.ts b/packages/daemon/src/__tests__/log-archive.test.ts new file mode 100644 index 0000000..7613847 --- /dev/null +++ b/packages/daemon/src/__tests__/log-archive.test.ts @@ -0,0 +1,40 @@ +import { describe, expect, it } from "vitest"; + +import { + assertValidUtcDay, + compareIsoDays, + lastArchivableUtcDay, + nextUtcDay, + prevUtcDay, + utcDateStringFromMs, + utcDayEndExclusiveMs, + utcDayStartMs, +} from "../log-archive.js"; + +describe("log-archive UTC helpers", () => { + it("lastArchivableUtcDay matches RFC-style boundary (exclusive end of day ≤ boundary)", () => { + const boundary = Date.UTC(2026, 1, 2, 12, 0, 0); // 2026-02-02 12:00 UTC + expect(lastArchivableUtcDay(boundary)).toBe("2026-02-01"); + }); + + it("round-trips UTC day bounds", () => { + expect(utcDayStartMs("2026-02-01")).toBe(Date.UTC(2026, 1, 1)); + expect(utcDayEndExclusiveMs("2026-02-01")).toBe(Date.UTC(2026, 1, 2)); + expect(utcDateStringFromMs(Date.UTC(2026, 1, 1, 23, 59))).toBe("2026-02-01"); + }); + + it("nextUtcDay / prevUtcDay", () => { + expect(nextUtcDay("2026-02-01")).toBe("2026-02-02"); + expect(prevUtcDay("2026-02-01")).toBe("2026-01-31"); + }); + + it("compareIsoDays sorts lexicographically for YYYY-MM-DD", () => { + expect(compareIsoDays("2026-01-01", "2026-02-01")).toBeLessThan(0); + expect(compareIsoDays("2026-02-01", "2026-02-01")).toBe(0); + }); + + it("assertValidUtcDay rejects invalid calendars", () => { + expect(() => assertValidUtcDay("2026-02-31")).toThrow(); + expect(() => assertValidUtcDay("bad")).toThrow(); + }); +}); diff --git a/packages/daemon/src/__tests__/log-store-archive.test.ts b/packages/daemon/src/__tests__/log-store-archive.test.ts new file mode 100644 index 0000000..d3cf8e3 --- /dev/null +++ b/packages/daemon/src/__tests__/log-store-archive.test.ts @@ -0,0 +1,139 @@ +import { mkdtempSync, readFileSync, rmSync } from "node:fs"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import { afterEach, beforeEach, describe, expect, it } from "vitest"; + +import { LOG_ARCHIVE_META_KEY, createLogStore } from "../log-store.js"; +import type { LogStore } from "../log-store.js"; + +const DAY_MS = 86_400_000; + +/** `now` such that 2026-02-01 is the last archivable UTC day under a 30-day window. */ +function nowForLastArchivableFeb1(): number { + const boundary = Date.UTC(2026, 1, 2, 12, 0, 0); + return boundary + 30 * DAY_MS; +} + +describe("LogStore — cold archive (RFC-001 §5.4)", () => { + let tmpDir: string; + let store: LogStore; + + beforeEach(() => { + tmpDir = mkdtempSync(join(tmpdir(), "nerve-archive-")); + store = createLogStore(join(tmpDir, "data", "logs.db")); + }); + + afterEach(() => { + store.close(); + rmSync(tmpDir, { recursive: true, force: true }); + }); + + it("exports one UTC day to JSONL, deletes rows, advances archived_up_to", () => { + const ts = Date.UTC(2026, 1, 1, 10, 0, 0); + store.append({ source: "system", type: "x", refId: null, payload: '{"a":1}', ts }); + store.append({ source: "reflex", type: "y", refId: "z", payload: null, ts: ts + 1 }); + + const now = nowForLastArchivableFeb1(); + const result = store.archiveLogs({ now, retentionMs: 30 * DAY_MS }); + expect(result.days).toHaveLength(1); + expect(result.days[0].day).toBe("2026-02-01"); + expect(result.days[0].rowCount).toBe(2); + + const jsonlPath = join(tmpDir, "data", "archive", "logs", "2026-02-01.jsonl"); + expect(result.days[0].filePath).toBe(jsonlPath); + + const lines = readFileSync(jsonlPath, "utf8").trim().split("\n"); + expect(lines).toHaveLength(2); + const o = JSON.parse(lines[0] ?? "{}") as Record; + expect(o.source).toBe("system"); + expect(o.refId).toBeNull(); + + expect(store.query()).toHaveLength(0); + expect(store.getMeta(LOG_ARCHIVE_META_KEY)).toBe("2026-02-01"); + }); + + it("returns nothing for an empty logs table", () => { + const r = store.archiveLogs({ now: nowForLastArchivableFeb1(), retentionMs: 30 * DAY_MS }); + expect(r.days).toHaveLength(0); + expect(store.getMeta(LOG_ARCHIVE_META_KEY)).toBeNull(); + }); + + it("does nothing when all logs are inside the hot window", () => { + const now = Date.UTC(2026, 3, 23, 12, 0, 0); + const ts = now - 5 * DAY_MS; + store.append({ source: "system", type: "warm", refId: null, payload: null, ts }); + const r = store.archiveLogs({ now, retentionMs: 30 * DAY_MS }); + expect(r.days).toHaveLength(0); + expect(store.query()).toHaveLength(1); + }); + + it("second archive with same clock is a no-op (watermark already caught up)", () => { + const ts = Date.UTC(2026, 1, 1, 10, 0, 0); + store.append({ source: "system", type: "x", refId: null, payload: null, ts }); + const now = nowForLastArchivableFeb1(); + store.archiveLogs({ now, retentionMs: 30 * DAY_MS }); + const path = join(tmpDir, "data", "archive", "logs", "2026-02-01.jsonl"); + const first = readFileSync(path, "utf8"); + + const second = store.archiveLogs({ now, retentionMs: 30 * DAY_MS }); + expect(second.days).toHaveLength(0); + expect(readFileSync(path, "utf8")).toBe(first); + }); + + it("overwrites JSONL when the same UTC day is archived again after watermark rewind", () => { + const ts = Date.UTC(2026, 1, 1, 10, 0, 0); + store.append({ source: "a", type: "1", refId: null, payload: null, ts }); + const now = nowForLastArchivableFeb1(); + store.archiveLogs({ now, retentionMs: 30 * DAY_MS }); + store.setMeta(LOG_ARCHIVE_META_KEY, "2026-01-31"); + store.append({ source: "b", type: "2", refId: null, payload: null, ts: ts + 100 }); + store.archiveLogs({ now, retentionMs: 30 * DAY_MS }); + + const path = join(tmpDir, "data", "archive", "logs", "2026-02-01.jsonl"); + const lines = readFileSync(path, "utf8").trim().split("\n"); + expect(lines).toHaveLength(1); + expect(JSON.parse(lines[0] ?? "{}").source).toBe("b"); + }); + + it("respects maxDays across invocations", () => { + const t1 = Date.UTC(2026, 1, 1, 10, 0, 0); + const t2 = Date.UTC(2026, 1, 2, 10, 0, 0); + store.append({ source: "system", type: "a", refId: null, payload: null, ts: t1 }); + store.append({ source: "system", type: "b", refId: null, payload: null, ts: t2 }); + + const now = Date.UTC(2027, 0, 1, 12, 0, 0); + const r1 = store.archiveLogs({ now, retentionMs: 30 * DAY_MS, maxDays: 1 }); + expect(r1.days).toHaveLength(1); + expect(r1.days[0].day).toBe("2026-02-01"); + + const r2 = store.archiveLogs({ now, retentionMs: 30 * DAY_MS, maxDays: 1 }); + expect(r2.days).toHaveLength(1); + expect(r2.days[0].day).toBe("2026-02-02"); + expect(store.getMeta(LOG_ARCHIVE_META_KEY)).toBe("2026-02-02"); + expect(store.query()).toHaveLength(0); + }); + + it("starts from earliest log day when it is before watermark+1", () => { + store.setMeta(LOG_ARCHIVE_META_KEY, "2026-01-10"); + const ts = Date.UTC(2026, 1, 1, 10, 0, 0); + store.append({ source: "x", type: "p", refId: null, payload: null, ts }); + const result = store.archiveLogs({ now: nowForLastArchivableFeb1(), retentionMs: 30 * DAY_MS }); + expect(result.days.map((d) => d.day)).toContain("2026-02-01"); + }); + + it("throws on invalid archived_up_to watermark", () => { + store.setMeta(LOG_ARCHIVE_META_KEY, "not-a-date"); + expect(() => store.archiveLogs({ now: Date.now() })).toThrow(); + }); + + it("runs VACUUM when vacuum: true", () => { + const ts = Date.UTC(2026, 1, 1, 10, 0, 0); + store.append({ source: "system", type: "x", refId: null, payload: null, ts }); + const r = store.archiveLogs({ + now: nowForLastArchivableFeb1(), + retentionMs: 30 * DAY_MS, + vacuum: true, + }); + expect(r.vacuumed).toBe(true); + }); +}); diff --git a/packages/daemon/src/__tests__/workflow-manager.test.ts b/packages/daemon/src/__tests__/workflow-manager.test.ts index 969ed3b..c0cd2e3 100644 --- a/packages/daemon/src/__tests__/workflow-manager.test.ts +++ b/packages/daemon/src/__tests__/workflow-manager.test.ts @@ -74,6 +74,7 @@ function makeLogStore() { getActiveWorkflowRuns: vi.fn(() => []), getTriggerPayload: vi.fn(() => null), getThreadEvents: vi.fn(() => []), + archiveLogs: vi.fn(() => ({ days: [], vacuumed: false })), close: vi.fn(), }; } diff --git a/packages/daemon/src/index.ts b/packages/daemon/src/index.ts index 61da3bd..ed758a9 100644 --- a/packages/daemon/src/index.ts +++ b/packages/daemon/src/index.ts @@ -32,13 +32,16 @@ export type { Kernel, KernelOptions, KernelHealth } from "./kernel.js"; export { createFileWatcher } from "./file-watcher.js"; export type { FileWatcher, FileChange, FileChangeHandler } from "./file-watcher.js"; -export { createLogStore } from "./log-store.js"; +export { createLogStore, LOG_ARCHIVE_META_KEY } from "./log-store.js"; export type { LogStore, LogEntry, LogQuery, WorkflowRun, WorkflowRunStatus, + ArchiveLogsDayResult, + ArchiveLogsOptions, + ArchiveLogsResult, } from "./log-store.js"; export { createWorkflowManager } from "./workflow-manager.js"; diff --git a/packages/daemon/src/log-archive.ts b/packages/daemon/src/log-archive.ts new file mode 100644 index 0000000..17dff7a --- /dev/null +++ b/packages/daemon/src/log-archive.ts @@ -0,0 +1,78 @@ +/** Log cold-archive helpers (RFC-001 §5.4) — UTC calendar days, JSONL export. */ + +export const LOG_ARCHIVE_META_KEY = "archived_up_to"; + +export const DEFAULT_LOG_RETENTION_MS = 30 * 86_400_000; + +export type ArchiveLogsOptions = { + /** Wall clock for retention boundary (default: `Date.now()`). */ + now?: number; + /** Run `VACUUM` after archiving (outside the per-day transaction). */ + vacuum?: boolean; + /** Max UTC days to process in one call (default: unlimited). */ + maxDays?: number; + /** Override default 30-day retention (tests). */ + retentionMs?: number; +}; + +export type ArchiveLogsDayResult = { + day: string; + rowCount: number; + filePath: string; +}; + +export type ArchiveLogsResult = { + days: ArchiveLogsDayResult[]; + vacuumed: boolean; +}; + +export function utcDateStringFromMs(ms: number): string { + return new Date(ms).toISOString().slice(0, 10); +} + +function parseUtcDayParts(day: string): [number, number, number] { + const m = /^(\d{4})-(\d{2})-(\d{2})$/.exec(day); + if (m === null) { + throw new Error(`Invalid UTC day (expected YYYY-MM-DD): ${day}`); + } + const y = Number(m[1]); + const mo = Number(m[2]); + const d = Number(m[3]); + const t = Date.UTC(y, mo - 1, d); + if (utcDateStringFromMs(t) !== day) { + throw new Error(`Invalid UTC calendar day: ${day}`); + } + return [y, mo, d]; +} + +export function assertValidUtcDay(day: string): void { + parseUtcDayParts(day); +} + +export function utcDayStartMs(day: string): number { + const [y, mo, d] = parseUtcDayParts(day); + return Date.UTC(y, mo - 1, d); +} + +export function utcDayEndExclusiveMs(day: string): number { + return utcDayStartMs(day) + 86_400_000; +} + +export function prevUtcDay(day: string): string { + return utcDateStringFromMs(utcDayStartMs(day) - 86_400_000); +} + +export function nextUtcDay(day: string): string { + return utcDateStringFromMs(utcDayEndExclusiveMs(day)); +} + +/** Last UTC calendar day D such that the exclusive end of D is ≤ boundaryMs. */ +export function lastArchivableUtcDay(boundaryMs: number): string { + return prevUtcDay(utcDateStringFromMs(boundaryMs)); +} + +export function compareIsoDays(a: string, b: string): number { + if (a < b) return -1; + if (a > b) return 1; + return 0; +} diff --git a/packages/daemon/src/log-store.ts b/packages/daemon/src/log-store.ts index ff1dc5e..521c161 100644 --- a/packages/daemon/src/log-store.ts +++ b/packages/daemon/src/log-store.ts @@ -7,11 +7,27 @@ * Also provides a `meta` key-value table for bookkeeping (e.g. archive watermarks). */ -import { mkdirSync } from "node:fs"; -import { dirname } from "node:path"; +import { mkdirSync, writeFileSync } from "node:fs"; +import { dirname, join } from "node:path"; import Database from "better-sqlite3"; import type BetterSqlite3 from "better-sqlite3"; +import { + DEFAULT_LOG_RETENTION_MS, + LOG_ARCHIVE_META_KEY, + assertValidUtcDay, + compareIsoDays, + lastArchivableUtcDay, + nextUtcDay, + utcDateStringFromMs, + utcDayEndExclusiveMs, + utcDayStartMs, +} from "./log-archive.js"; +import type { ArchiveLogsDayResult, ArchiveLogsOptions, ArchiveLogsResult } from "./log-archive.js"; + +export { LOG_ARCHIVE_META_KEY } from "./log-archive.js"; +export type { ArchiveLogsDayResult, ArchiveLogsOptions, ArchiveLogsResult } from "./log-archive.js"; + export type LogEntry = { id?: number; source: string; @@ -105,6 +121,12 @@ export type LogStore = { * Used for crash recovery to rebuild ThreadState. */ getThreadEvents: (runId: string) => Array<{ type: string; [key: string]: unknown }>; + /** + * Export logs older than the retention window to `data/archive/logs/YYYY-MM-DD.jsonl`, + * then delete those rows and advance `meta.archived_up_to` in one transaction per day + * (RFC-001 §5.4). + */ + archiveLogs: (options?: ArchiveLogsOptions) => ArchiveLogsResult; close: () => void; }; @@ -138,6 +160,78 @@ CREATE INDEX IF NOT EXISTS idx_workflow_runs_status ON workflow_runs(status); CREATE INDEX IF NOT EXISTS idx_workflow_runs_workflow ON workflow_runs(workflow); `; +type SqlLogRow = { + id: number; + source: string; + type: string; + ref_id: string | null; + payload: string | null; + ts: number; +}; + +function buildJsonlBody(rows: SqlLogRow[]): string { + if (rows.length === 0) return ""; + const lines = rows.map((r) => + JSON.stringify({ + id: r.id, + source: r.source, + type: r.type, + refId: r.ref_id, + payload: r.payload, + ts: r.ts, + }), + ); + return `${lines.join("\n")}\n`; +} + +function runOptionalVacuum(sqlite: BetterSqlite3.Database, vacuum?: boolean): boolean { + if (vacuum !== true) return false; + sqlite.exec("VACUUM"); + return true; +} + +function resolveArchiveStartDay(watermark: string | null, minDay: string): string { + if (watermark === null) return minDay; + const afterWatermark = nextUtcDay(watermark); + return compareIsoDays(minDay, afterWatermark) < 0 ? minDay : afterWatermark; +} + +function runArchiveDayLoop( + dbPath: string, + options: ArchiveLogsOptions, + selectLogsForDayStmt: BetterSqlite3.Statement, + archiveDayTx: (day: string, start: number, endExclusive: number) => void, + startDay: string, + lastDay: string, +): ArchiveLogsDayResult[] { + const archiveDir = join(dirname(dbPath), "archive", "logs"); + mkdirSync(archiveDir, { recursive: true }); + + const days: ArchiveLogsDayResult[] = []; + let d = startDay; + let processed = 0; + + while (compareIsoDays(d, lastDay) <= 0) { + if (options.maxDays !== undefined && processed >= options.maxDays) { + break; + } + + const start = utcDayStartMs(d); + const endExclusive = utcDayEndExclusiveMs(d); + const rows = selectLogsForDayStmt.all({ start, endExclusive }) as SqlLogRow[]; + + const filePath = join(archiveDir, `${d}.jsonl`); + writeFileSync(filePath, buildJsonlBody(rows), "utf8"); + archiveDayTx(d, start, endExclusive); + + days.push({ day: d, rowCount: rows.length, filePath }); + processed += 1; + d = nextUtcDay(d); + } + + return days; +} + export function createLogStore(dbPath: string): LogStore { mkdirSync(dirname(dbPath), { recursive: true }); @@ -186,6 +280,14 @@ export function createLogStore(dbPath: string): LogStore { "SELECT run_id, workflow, status, ts FROM workflow_runs WHERE workflow = ? ORDER BY ts DESC", ); + const minLogTsStmt = sqlite.prepare("SELECT MIN(ts) AS m FROM logs"); + const selectLogsForDayStmt = sqlite.prepare( + "SELECT id, source, type, ref_id, payload, ts FROM logs WHERE ts >= @start AND ts < @endExclusive ORDER BY id ASC", + ); + const deleteLogsForDayStmt = sqlite.prepare( + "DELETE FROM logs WHERE ts >= @start AND ts < @endExclusive", + ); + const upsertWorkflowRunTx = sqlite.transaction( (entry: Omit, run: WorkflowRun) => { const info = insertStmt.run({ @@ -358,6 +460,52 @@ export function createLogStore(dbPath: string): LogStore { return result; } + const archiveDayTx = sqlite.transaction((day: string, start: number, endExclusive: number) => { + deleteLogsForDayStmt.run({ start, endExclusive }); + setMetaStmt.run({ key: LOG_ARCHIVE_META_KEY, value: day }); + }); + + function readWatermark(): string | null { + const raw = getMeta(LOG_ARCHIVE_META_KEY); + if (raw === null) return null; + assertValidUtcDay(raw); + return raw; + } + + function firstLogUtcDay(): string | null { + const row = minLogTsStmt.get() as { m: number | null } | undefined; + const m = row?.m; + if (m === null || m === undefined) return null; + return utcDateStringFromMs(m); + } + + function archiveLogs(options: ArchiveLogsOptions = {}): ArchiveLogsResult { + const now = options.now ?? Date.now(); + const retentionMs = options.retentionMs ?? DEFAULT_LOG_RETENTION_MS; + const lastDay = lastArchivableUtcDay(now - retentionMs); + + const watermark = readWatermark(); + const minDay = firstLogUtcDay(); + if (minDay === null) { + return { days: [], vacuumed: runOptionalVacuum(sqlite, options.vacuum) }; + } + + const startDay = resolveArchiveStartDay(watermark, minDay); + if (compareIsoDays(startDay, lastDay) > 0) { + return { days: [], vacuumed: runOptionalVacuum(sqlite, options.vacuum) }; + } + + const days = runArchiveDayLoop( + dbPath, + options, + selectLogsForDayStmt, + archiveDayTx, + startDay, + lastDay, + ); + return { days, vacuumed: runOptionalVacuum(sqlite, options.vacuum) }; + } + function close(): void { sqlite.close(); } @@ -374,6 +522,7 @@ export function createLogStore(dbPath: string): LogStore { getAllWorkflowRuns, getTriggerPayload, getThreadEvents, + archiveLogs, close, }; }