diff --git a/packages/daemon/src/__tests__/log-store-integration.test.ts b/packages/daemon/src/__tests__/log-store-integration.test.ts new file mode 100644 index 0000000..57f10e9 --- /dev/null +++ b/packages/daemon/src/__tests__/log-store-integration.test.ts @@ -0,0 +1,117 @@ +import { mkdtempSync, rmSync } from "node:fs"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; + +import type { NerveConfig, Signal } from "@uncaged/nerve-core"; +import { createLogStore } from "../log-store.js"; +import type { LogStore } from "../log-store.js"; +import { createReflexScheduler } from "../reflex-scheduler.js"; +import { createSignalBus } from "../signal-bus.js"; + +describe("LogStore + ReflexScheduler integration", () => { + let tmpDir: string; + let logStore: LogStore; + + beforeEach(() => { + tmpDir = mkdtempSync(join(tmpdir(), "nerve-log-int-")); + logStore = createLogStore(join(tmpDir, "logs.db")); + }); + + afterEach(() => { + logStore.close(); + rmSync(tmpDir, { recursive: true, force: true }); + }); + + it("logs run_start when reflex triggers a compute", () => { + const config: NerveConfig = { + senses: { + "cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null }, + }, + reflexes: [{ kind: "sense", sense: "cpu-usage", interval: null, on: ["cpu-usage"] }], + workflows: null, + }; + const bus = createSignalBus(); + const triggered: string[] = []; + const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name), { + logStore, + }); + + const signal: Signal = { id: 1, senseId: "cpu-usage", payload: 42, ts: Date.now() }; + bus.emit(signal); + + const logs = logStore.query({ source: "reflex", type: "run_start" }); + expect(logs).toHaveLength(1); + expect(logs[0].refId).toBe("cpu-usage"); + expect(triggered).toHaveLength(1); + + scheduler.stop(); + }); + + it("interval reflex produces run_start logs", () => { + vi.useFakeTimers(); + + const config: NerveConfig = { + senses: { + "cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null }, + }, + reflexes: [{ kind: "sense", sense: "cpu-usage", interval: 1000, on: null }], + workflows: null, + }; + const bus = createSignalBus(); + const ref: { scheduler: ReturnType | null } = { scheduler: null }; + const scheduler = createReflexScheduler( + config, + bus, + (name) => { + ref.scheduler?.onComputeComplete(name); + }, + { logStore }, + ); + ref.scheduler = scheduler; + + vi.advanceTimersByTime(3000); + + const logs = logStore.query({ source: "reflex", type: "run_start" }); + expect(logs.length).toBeGreaterThanOrEqual(3); + expect(logs.every((l) => l.refId === "cpu-usage")).toBe(true); + + scheduler.stop(); + vi.useRealTimers(); + }); + + it("logs cannot trigger reflexes (architectural constraint)", () => { + const config: NerveConfig = { + senses: { + "cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null }, + }, + reflexes: [{ kind: "sense", sense: "cpu-usage", interval: null, on: ["cpu-usage"] }], + workflows: null, + }; + const bus = createSignalBus(); + const triggered: string[] = []; + const scheduler = createReflexScheduler( + config, + bus, + (name) => { + triggered.push(name); + scheduler.onComputeComplete(name); + }, + { logStore }, + ); + + logStore.append({ + source: "reflex", + type: "run_complete", + refId: "cpu-usage", + payload: '{"v":99}', + ts: Date.now(), + }); + + // Writing to the log store should NOT trigger any reflex. + // Only bus.emit(signal) triggers reflexes. + expect(triggered).toHaveLength(0); + + scheduler.stop(); + }); +}); diff --git a/packages/daemon/src/__tests__/log-store.test.ts b/packages/daemon/src/__tests__/log-store.test.ts new file mode 100644 index 0000000..7e75b72 --- /dev/null +++ b/packages/daemon/src/__tests__/log-store.test.ts @@ -0,0 +1,214 @@ +import { mkdtempSync, rmSync } from "node:fs"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import { afterEach, beforeEach, describe, expect, it } from "vitest"; + +import { createLogStore } from "../log-store.js"; +import type { LogStore } from "../log-store.js"; + +describe("LogStore", () => { + let tmpDir: string; + let store: LogStore; + + beforeEach(() => { + tmpDir = mkdtempSync(join(tmpdir(), "nerve-log-test-")); + store = createLogStore(join(tmpDir, "data", "logs.db")); + }); + + afterEach(() => { + store.close(); + rmSync(tmpDir, { recursive: true, force: true }); + }); + + describe("append + query", () => { + it("appends an entry and returns it with an id", () => { + const entry = store.append({ + source: "system", + type: "start", + refId: null, + payload: null, + ts: 1000, + }); + + expect(entry.id).toBe(1); + expect(entry.source).toBe("system"); + expect(entry.type).toBe("start"); + }); + + it("auto-increments ids", () => { + const e1 = store.append({ + source: "system", + type: "start", + refId: null, + payload: null, + ts: 1000, + }); + const e2 = store.append({ + source: "system", + type: "stop", + refId: null, + payload: null, + ts: 2000, + }); + + expect(e2.id).toBe((e1.id ?? 0) + 1); + }); + + it("returns all entries when queried with no filter", () => { + store.append({ source: "system", type: "start", refId: null, payload: null, ts: 1000 }); + store.append({ source: "reflex", type: "run_start", refId: "cpu", payload: null, ts: 2000 }); + store.append({ + source: "reflex", + type: "run_complete", + refId: "cpu", + payload: '{"v":42}', + ts: 3000, + }); + + const all = store.query(); + expect(all).toHaveLength(3); + }); + }); + + describe("query filters", () => { + beforeEach(() => { + store.append({ source: "system", type: "start", refId: null, payload: null, ts: 1000 }); + store.append({ source: "reflex", type: "run_start", refId: "cpu", payload: null, ts: 2000 }); + store.append({ + source: "reflex", + type: "run_complete", + refId: "cpu", + payload: '{"v":42}', + ts: 3000, + }); + store.append({ + source: "system", + type: "error", + refId: "disk", + payload: '{"error":"fail"}', + ts: 4000, + }); + store.append({ source: "system", type: "stop", refId: null, payload: null, ts: 5000 }); + }); + + it("filters by source", () => { + const results = store.query({ source: "reflex" }); + expect(results).toHaveLength(2); + expect(results.every((r) => r.source === "reflex")).toBe(true); + }); + + it("filters by type", () => { + const results = store.query({ type: "error" }); + expect(results).toHaveLength(1); + expect(results[0].refId).toBe("disk"); + }); + + it("filters by refId", () => { + const results = store.query({ refId: "cpu" }); + expect(results).toHaveLength(2); + }); + + it("filters by since (inclusive)", () => { + const results = store.query({ since: 3000 }); + expect(results).toHaveLength(3); + expect(results[0].ts).toBe(3000); + }); + + it("filters by until (inclusive)", () => { + const results = store.query({ until: 2000 }); + expect(results).toHaveLength(2); + }); + + it("filters by since + until range", () => { + const results = store.query({ since: 2000, until: 4000 }); + expect(results).toHaveLength(3); + }); + + it("applies limit", () => { + const results = store.query({ limit: 2 }); + expect(results).toHaveLength(2); + expect(results[0].type).toBe("start"); + expect(results[1].type).toBe("run_start"); + }); + + it("combines multiple filters", () => { + const results = store.query({ source: "system", since: 4000 }); + expect(results).toHaveLength(2); + expect(results[0].type).toBe("error"); + expect(results[1].type).toBe("stop"); + }); + + it("returns empty array when no matches", () => { + const results = store.query({ source: "workflow" }); + expect(results).toHaveLength(0); + }); + }); + + describe("query ordering", () => { + it("returns entries in insertion order (ascending id)", () => { + store.append({ source: "system", type: "start", refId: null, payload: null, ts: 5000 }); + store.append({ source: "reflex", type: "run_start", refId: "a", payload: null, ts: 1000 }); + + const all = store.query(); + expect(all[0].ts).toBe(5000); + expect(all[1].ts).toBe(1000); + }); + }); + + describe("meta table", () => { + it("returns null for nonexistent key", () => { + expect(store.getMeta("missing")).toBeNull(); + }); + + it("sets and gets a value", () => { + store.setMeta("archived_up_to", "2026-03-22"); + expect(store.getMeta("archived_up_to")).toBe("2026-03-22"); + }); + + it("upserts on duplicate key", () => { + store.setMeta("version", "1"); + store.setMeta("version", "2"); + expect(store.getMeta("version")).toBe("2"); + }); + + it("supports multiple keys", () => { + store.setMeta("a", "1"); + store.setMeta("b", "2"); + expect(store.getMeta("a")).toBe("1"); + expect(store.getMeta("b")).toBe("2"); + }); + }); + + describe("append-only semantics", () => { + it("ids are always increasing", () => { + const entries = Array.from({ length: 10 }, (_, i) => + store.append({ source: "system", type: "test", refId: null, payload: null, ts: i }), + ); + + for (let i = 1; i < entries.length; i++) { + expect(entries[i].id).toBeGreaterThan(entries[i - 1].id ?? 0); + } + }); + }); + + describe("payload JSON round-trip", () => { + it("preserves JSON payload", () => { + const payload = JSON.stringify({ cpu: 95, host: "node-1" }); + store.append({ source: "reflex", type: "run_complete", refId: "cpu", payload, ts: 1000 }); + + const results = store.query({ refId: "cpu" }); + expect(results).toHaveLength(1); + expect(JSON.parse(results[0].payload ?? "null")).toEqual({ cpu: 95, host: "node-1" }); + }); + }); + + describe("creates parent directories", () => { + it("creates nested directory structure for db path", () => { + const deepPath = join(tmpDir, "a", "b", "c", "test.db"); + const deepStore = createLogStore(deepPath); + deepStore.append({ source: "system", type: "start", refId: null, payload: null, ts: 1000 }); + expect(deepStore.query()).toHaveLength(1); + deepStore.close(); + }); + }); +}); diff --git a/packages/daemon/src/index.ts b/packages/daemon/src/index.ts index 86f4bb6..dba5af2 100644 --- a/packages/daemon/src/index.ts +++ b/packages/daemon/src/index.ts @@ -27,3 +27,6 @@ export type { Kernel, KernelOptions, KernelHealth } from "./kernel.js"; export { createFileWatcher } from "./file-watcher.js"; export type { FileWatcher, FileChange, FileChangeHandler } from "./file-watcher.js"; + +export { createLogStore } from "./log-store.js"; +export type { LogStore, LogEntry, LogQuery } from "./log-store.js"; diff --git a/packages/daemon/src/kernel.ts b/packages/daemon/src/kernel.ts index 2a854f7..6ef4d04 100644 --- a/packages/daemon/src/kernel.ts +++ b/packages/daemon/src/kernel.ts @@ -25,6 +25,8 @@ import { createFileWatcher } from "./file-watcher.js"; import type { FileWatcher } from "./file-watcher.js"; import type { ComputeMessage, ShutdownMessage } from "./ipc.js"; import { parseWorkerMessage } from "./ipc.js"; +import { createLogStore } from "./log-store.js"; +import type { LogStore } from "./log-store.js"; import { createReflexScheduler } from "./reflex-scheduler.js"; import type { ReflexScheduler } from "./reflex-scheduler.js"; import { createSignalBus } from "./signal-bus.js"; @@ -43,6 +45,7 @@ export type Kernel = { groups: Set; senseCount: number; bus: SignalBus; + logStore: LogStore; /** Resolves when all workers have sent their initial "ready" message. */ ready: Promise; /** Returns the PID of the worker process for a given group, or null if not found. */ @@ -106,6 +109,8 @@ function groupForSense(config: NerveConfig, senseName: string): string | null { export type KernelOptions = { workerScript: string; enableFileWatcher?: boolean; + /** Override the LogStore instance (useful for testing). */ + logStore?: LogStore; }; function defaultKernelOptions(): KernelOptions { @@ -120,6 +125,15 @@ export function createKernel( const bus: SignalBus = createSignalBus(); const workerScript = options.workerScript; const startTime = Date.now(); + const logStore: LogStore = options.logStore ?? createLogStore(join(nerveRoot, "data", "logs.db")); + + logStore.append({ + source: "system", + type: "start", + refId: null, + payload: null, + ts: startTime, + }); let config = initialConfig; @@ -168,6 +182,13 @@ export function createKernel( if (msg.type === "error") { process.stderr.write(`[kernel] sense "${msg.sense}" error: ${msg.error}\n`); + logStore.append({ + source: "system", + type: "error", + refId: msg.sense, + payload: JSON.stringify({ error: msg.error }), + ts: Date.now(), + }); scheduler.onComputeComplete(msg.sense); return; } @@ -179,6 +200,13 @@ export function createKernel( payload: msg.payload, ts: Date.now(), }; + logStore.append({ + source: "reflex", + type: "run_complete", + refId: msg.sense, + payload: JSON.stringify(msg.payload), + ts: signal.ts, + }); bus.emit(signal); scheduler.onComputeComplete(msg.sense); } @@ -239,7 +267,7 @@ export function createKernel( sendCompute(entry.process, senseName); } - scheduler = createReflexScheduler(config, bus, triggerFn); + scheduler = createReflexScheduler(config, bus, triggerFn, { logStore }); if (groups.size === 0) { readyResolve?.(); @@ -322,7 +350,7 @@ export function createKernel( // Note: pending/throttled computes in the old scheduler are silently dropped here. // In-flight state is not preserved across reloadConfig. scheduler.stop(); - scheduler = createReflexScheduler(config, bus, triggerFn); + scheduler = createReflexScheduler(config, bus, triggerFn, { logStore }); const newGroups = collectGroups(newConfig); removeStaleGroups(oldGroups, newGroups); addNewGroups(oldGroups, newGroups); @@ -360,6 +388,13 @@ export function createKernel( process.stderr.write( `[kernel] sense file changed: "${senseName}", restarting group "${sc.group}"\n`, ); + logStore.append({ + source: "system", + type: "sense_reload", + refId: senseName, + payload: null, + ts: Date.now(), + }); restartGroup(sc.group).catch((e) => { const msg = e instanceof Error ? e.message : String(e); process.stderr.write(`[kernel] restartGroup error: ${msg}\n`); @@ -368,6 +403,13 @@ export function createKernel( function handleConfigFileChange(): void { process.stderr.write("[kernel] nerve.yaml changed, reloading config\n"); + logStore.append({ + source: "system", + type: "config_reload", + refId: null, + payload: null, + ts: Date.now(), + }); try { const raw = readFileSync(join(nerveRoot, "nerve.yaml"), "utf8"); const parseResult = parseNerveConfig(raw); @@ -405,6 +447,14 @@ export function createKernel( exitPromises.push(waitForExit(entry.process, 5000)); } await Promise.all(exitPromises); + logStore.append({ + source: "system", + type: "stop", + refId: null, + payload: null, + ts: Date.now(), + }); + logStore.close(); } function getWorkerPid(group: string): number | null { @@ -418,6 +468,7 @@ export function createKernel( groups, senseCount, bus, + logStore, ready, getWorkerPid, triggerCompute: triggerFn, diff --git a/packages/daemon/src/log-store.ts b/packages/daemon/src/log-store.ts new file mode 100644 index 0000000..9ed3c3a --- /dev/null +++ b/packages/daemon/src/log-store.ts @@ -0,0 +1,150 @@ +/** + * Log Store — append-only structured log storage backed by SQLite. + * + * Stores system, reflex, and workflow log entries in a single table. + * Logs are data assets for audit/analysis — they MUST NOT trigger reflexes. + * + * Also provides a `meta` key-value table for bookkeeping (e.g. archive watermarks). + */ + +import { mkdirSync } from "node:fs"; +import { dirname } from "node:path"; +import Database from "better-sqlite3"; +import type BetterSqlite3 from "better-sqlite3"; + +export type LogEntry = { + id?: number; + source: string; + type: string; + refId: string | null; + payload: string | null; + ts: number; +}; + +export type LogQuery = { + source?: string; + type?: string; + refId?: string; + since?: number; + until?: number; + limit?: number; +}; + +export type LogStore = { + append: (entry: Omit) => LogEntry; + query: (filter?: LogQuery) => LogEntry[]; + getMeta: (key: string) => string | null; + setMeta: (key: string, value: string) => void; + close: () => void; +}; + +const SCHEMA_SQL = ` +CREATE TABLE IF NOT EXISTS logs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + source TEXT NOT NULL, + type TEXT NOT NULL, + ref_id TEXT, + payload TEXT, + ts INTEGER NOT NULL +); + +CREATE INDEX IF NOT EXISTS idx_logs_source_type ON logs(source, type); +CREATE INDEX IF NOT EXISTS idx_logs_ts ON logs(ts); +CREATE INDEX IF NOT EXISTS idx_logs_ref_id ON logs(ref_id); + +CREATE TABLE IF NOT EXISTS meta ( + key TEXT PRIMARY KEY, + value TEXT NOT NULL +); +`; + +export function createLogStore(dbPath: string): LogStore { + mkdirSync(dirname(dbPath), { recursive: true }); + + const sqlite: BetterSqlite3.Database = new Database(dbPath); + sqlite.pragma("journal_mode = WAL"); + sqlite.exec(SCHEMA_SQL); + + const insertStmt = sqlite.prepare( + "INSERT INTO logs (source, type, ref_id, payload, ts) VALUES (@source, @type, @refId, @payload, @ts)", + ); + + const getMetaStmt = sqlite.prepare("SELECT value FROM meta WHERE key = ?"); + const setMetaStmt = sqlite.prepare( + "INSERT INTO meta (key, value) VALUES (@key, @value) ON CONFLICT(key) DO UPDATE SET value = @value", + ); + + function append(entry: Omit): LogEntry { + const info = insertStmt.run({ + source: entry.source, + type: entry.type, + refId: entry.refId, + payload: entry.payload, + ts: entry.ts, + }); + return { ...entry, id: Number(info.lastInsertRowid) }; + } + + function query(filter: LogQuery = {}): LogEntry[] { + const conditions: string[] = []; + const params: Record = {}; + + if (filter.source !== undefined) { + conditions.push("source = @source"); + params.source = filter.source; + } + if (filter.type !== undefined) { + conditions.push("type = @type"); + params.type = filter.type; + } + if (filter.refId !== undefined) { + conditions.push("ref_id = @refId"); + params.refId = filter.refId; + } + if (filter.since !== undefined) { + conditions.push("ts >= @since"); + params.since = filter.since; + } + if (filter.until !== undefined) { + conditions.push("ts <= @until"); + params.until = filter.until; + } + + const where = conditions.length > 0 ? `WHERE ${conditions.join(" AND ")}` : ""; + const limit = filter.limit !== undefined ? `LIMIT ${filter.limit}` : ""; + const sql = `SELECT id, source, type, ref_id, payload, ts FROM logs ${where} ORDER BY id ASC ${limit}`; + + const rows = sqlite.prepare(sql).all(params) as Array<{ + id: number; + source: string; + type: string; + ref_id: string | null; + payload: string | null; + ts: number; + }>; + + return rows.map((r) => ({ + id: r.id, + source: r.source, + type: r.type, + refId: r.ref_id, + payload: r.payload, + ts: r.ts, + })); + } + + function getMeta(key: string): string | null { + const row = getMetaStmt.get(key) as { value: string } | undefined; + return row?.value ?? null; + } + + function setMeta(key: string, value: string): void { + setMetaStmt.run({ key, value }); + } + + function close(): void { + sqlite.close(); + } + + return { append, query, getMeta, setMeta, close }; +} diff --git a/packages/daemon/src/reflex-scheduler.ts b/packages/daemon/src/reflex-scheduler.ts index 327876e..d1c416d 100644 --- a/packages/daemon/src/reflex-scheduler.ts +++ b/packages/daemon/src/reflex-scheduler.ts @@ -10,6 +10,7 @@ */ import type { NerveConfig } from "@uncaged/nerve-core"; +import type { LogStore } from "./log-store.js"; import type { SignalBus, Unsubscribe } from "./signal-bus.js"; /** Sends a compute message to the worker responsible for the given sense. */ @@ -34,19 +35,26 @@ function makeSenseState(): SenseState { return { lastComputeAt: 0, inFlight: false, pending: false, deferredTimer: null }; } +export type ReflexSchedulerOptions = { + logStore?: LogStore; +}; + /** * Create and start a reflex scheduler. * * @param config Full NerveConfig (reads senses for throttle/timeout, reflexes for schedule). * @param bus SignalBus to subscribe for event-driven reflexes. * @param triggerFn Called with the sense name when a compute should be dispatched. + * @param opts Optional: logStore for structured logging. * @returns ReflexScheduler with stop() and onComputeComplete() methods. */ export function createReflexScheduler( config: NerveConfig, bus: SignalBus, triggerFn: TriggerFn, + opts?: ReflexSchedulerOptions, ): ReflexScheduler { + const logStore = opts?.logStore; const intervals: ReturnType[] = []; const unsubscribers: Unsubscribe[] = []; const states = new Map(); @@ -65,6 +73,13 @@ export function createReflexScheduler( state.inFlight = true; state.pending = false; state.lastComputeAt = Date.now(); + logStore?.append({ + source: "reflex", + type: "run_start", + refId: senseName, + payload: null, + ts: Date.now(), + }); triggerFn(senseName); }