Phase 7: Structured Logging System #15
@@ -0,0 +1,117 @@
|
||||
import { mkdtempSync, rmSync } from "node:fs";
|
||||
import { tmpdir } from "node:os";
|
||||
import { join } from "node:path";
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
|
||||
import type { NerveConfig, Signal } from "@uncaged/nerve-core";
|
||||
import { createLogStore } from "../log-store.js";
|
||||
import type { LogStore } from "../log-store.js";
|
||||
import { createReflexScheduler } from "../reflex-scheduler.js";
|
||||
import { createSignalBus } from "../signal-bus.js";
|
||||
|
||||
describe("LogStore + ReflexScheduler integration", () => {
|
||||
let tmpDir: string;
|
||||
let logStore: LogStore;
|
||||
|
||||
beforeEach(() => {
|
||||
tmpDir = mkdtempSync(join(tmpdir(), "nerve-log-int-"));
|
||||
logStore = createLogStore(join(tmpDir, "logs.db"));
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
logStore.close();
|
||||
rmSync(tmpDir, { recursive: true, force: true });
|
||||
});
|
||||
|
||||
it("logs run_start when reflex triggers a compute", () => {
|
||||
const config: NerveConfig = {
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: null, on: ["cpu-usage"] }],
|
||||
workflows: null,
|
||||
};
|
||||
const bus = createSignalBus();
|
||||
const triggered: string[] = [];
|
||||
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name), {
|
||||
logStore,
|
||||
});
|
||||
|
||||
const signal: Signal = { id: 1, senseId: "cpu-usage", payload: 42, ts: Date.now() };
|
||||
bus.emit(signal);
|
||||
|
||||
const logs = logStore.query({ source: "reflex", type: "run_start" });
|
||||
expect(logs).toHaveLength(1);
|
||||
expect(logs[0].refId).toBe("cpu-usage");
|
||||
expect(triggered).toHaveLength(1);
|
||||
|
||||
scheduler.stop();
|
||||
});
|
||||
|
||||
it("interval reflex produces run_start logs", () => {
|
||||
vi.useFakeTimers();
|
||||
|
||||
const config: NerveConfig = {
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: 1000, on: null }],
|
||||
workflows: null,
|
||||
};
|
||||
const bus = createSignalBus();
|
||||
const ref: { scheduler: ReturnType<typeof createReflexScheduler> | null } = { scheduler: null };
|
||||
const scheduler = createReflexScheduler(
|
||||
config,
|
||||
bus,
|
||||
(name) => {
|
||||
ref.scheduler?.onComputeComplete(name);
|
||||
},
|
||||
{ logStore },
|
||||
);
|
||||
ref.scheduler = scheduler;
|
||||
|
||||
vi.advanceTimersByTime(3000);
|
||||
|
||||
const logs = logStore.query({ source: "reflex", type: "run_start" });
|
||||
expect(logs.length).toBeGreaterThanOrEqual(3);
|
||||
expect(logs.every((l) => l.refId === "cpu-usage")).toBe(true);
|
||||
|
||||
scheduler.stop();
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
it("logs cannot trigger reflexes (architectural constraint)", () => {
|
||||
const config: NerveConfig = {
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: null, on: ["cpu-usage"] }],
|
||||
workflows: null,
|
||||
};
|
||||
const bus = createSignalBus();
|
||||
const triggered: string[] = [];
|
||||
const scheduler = createReflexScheduler(
|
||||
config,
|
||||
bus,
|
||||
(name) => {
|
||||
triggered.push(name);
|
||||
scheduler.onComputeComplete(name);
|
||||
},
|
||||
{ logStore },
|
||||
);
|
||||
|
||||
logStore.append({
|
||||
source: "reflex",
|
||||
type: "run_complete",
|
||||
refId: "cpu-usage",
|
||||
payload: '{"v":99}',
|
||||
ts: Date.now(),
|
||||
});
|
||||
|
||||
// Writing to the log store should NOT trigger any reflex.
|
||||
// Only bus.emit(signal) triggers reflexes.
|
||||
expect(triggered).toHaveLength(0);
|
||||
|
||||
scheduler.stop();
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,214 @@
|
||||
import { mkdtempSync, rmSync } from "node:fs";
|
||||
import { tmpdir } from "node:os";
|
||||
import { join } from "node:path";
|
||||
import { afterEach, beforeEach, describe, expect, it } from "vitest";
|
||||
|
||||
import { createLogStore } from "../log-store.js";
|
||||
import type { LogStore } from "../log-store.js";
|
||||
|
||||
describe("LogStore", () => {
|
||||
let tmpDir: string;
|
||||
let store: LogStore;
|
||||
|
||||
beforeEach(() => {
|
||||
tmpDir = mkdtempSync(join(tmpdir(), "nerve-log-test-"));
|
||||
store = createLogStore(join(tmpDir, "data", "logs.db"));
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
store.close();
|
||||
rmSync(tmpDir, { recursive: true, force: true });
|
||||
});
|
||||
|
||||
describe("append + query", () => {
|
||||
it("appends an entry and returns it with an id", () => {
|
||||
const entry = store.append({
|
||||
source: "system",
|
||||
type: "start",
|
||||
refId: null,
|
||||
payload: null,
|
||||
ts: 1000,
|
||||
});
|
||||
|
||||
expect(entry.id).toBe(1);
|
||||
expect(entry.source).toBe("system");
|
||||
expect(entry.type).toBe("start");
|
||||
});
|
||||
|
||||
it("auto-increments ids", () => {
|
||||
const e1 = store.append({
|
||||
source: "system",
|
||||
type: "start",
|
||||
refId: null,
|
||||
payload: null,
|
||||
ts: 1000,
|
||||
});
|
||||
const e2 = store.append({
|
||||
source: "system",
|
||||
type: "stop",
|
||||
refId: null,
|
||||
payload: null,
|
||||
ts: 2000,
|
||||
});
|
||||
|
||||
expect(e2.id).toBe((e1.id ?? 0) + 1);
|
||||
});
|
||||
|
||||
it("returns all entries when queried with no filter", () => {
|
||||
store.append({ source: "system", type: "start", refId: null, payload: null, ts: 1000 });
|
||||
store.append({ source: "reflex", type: "run_start", refId: "cpu", payload: null, ts: 2000 });
|
||||
store.append({
|
||||
source: "reflex",
|
||||
type: "run_complete",
|
||||
refId: "cpu",
|
||||
payload: '{"v":42}',
|
||||
ts: 3000,
|
||||
});
|
||||
|
||||
const all = store.query();
|
||||
expect(all).toHaveLength(3);
|
||||
});
|
||||
});
|
||||
|
||||
describe("query filters", () => {
|
||||
beforeEach(() => {
|
||||
store.append({ source: "system", type: "start", refId: null, payload: null, ts: 1000 });
|
||||
store.append({ source: "reflex", type: "run_start", refId: "cpu", payload: null, ts: 2000 });
|
||||
store.append({
|
||||
source: "reflex",
|
||||
type: "run_complete",
|
||||
refId: "cpu",
|
||||
payload: '{"v":42}',
|
||||
ts: 3000,
|
||||
});
|
||||
store.append({
|
||||
source: "system",
|
||||
type: "error",
|
||||
refId: "disk",
|
||||
payload: '{"error":"fail"}',
|
||||
ts: 4000,
|
||||
});
|
||||
store.append({ source: "system", type: "stop", refId: null, payload: null, ts: 5000 });
|
||||
});
|
||||
|
||||
it("filters by source", () => {
|
||||
const results = store.query({ source: "reflex" });
|
||||
expect(results).toHaveLength(2);
|
||||
expect(results.every((r) => r.source === "reflex")).toBe(true);
|
||||
});
|
||||
|
||||
it("filters by type", () => {
|
||||
const results = store.query({ type: "error" });
|
||||
expect(results).toHaveLength(1);
|
||||
expect(results[0].refId).toBe("disk");
|
||||
});
|
||||
|
||||
it("filters by refId", () => {
|
||||
const results = store.query({ refId: "cpu" });
|
||||
expect(results).toHaveLength(2);
|
||||
});
|
||||
|
||||
it("filters by since (inclusive)", () => {
|
||||
const results = store.query({ since: 3000 });
|
||||
expect(results).toHaveLength(3);
|
||||
expect(results[0].ts).toBe(3000);
|
||||
});
|
||||
|
||||
it("filters by until (inclusive)", () => {
|
||||
const results = store.query({ until: 2000 });
|
||||
expect(results).toHaveLength(2);
|
||||
});
|
||||
|
||||
it("filters by since + until range", () => {
|
||||
const results = store.query({ since: 2000, until: 4000 });
|
||||
expect(results).toHaveLength(3);
|
||||
});
|
||||
|
||||
it("applies limit", () => {
|
||||
const results = store.query({ limit: 2 });
|
||||
expect(results).toHaveLength(2);
|
||||
expect(results[0].type).toBe("start");
|
||||
expect(results[1].type).toBe("run_start");
|
||||
});
|
||||
|
||||
it("combines multiple filters", () => {
|
||||
const results = store.query({ source: "system", since: 4000 });
|
||||
expect(results).toHaveLength(2);
|
||||
expect(results[0].type).toBe("error");
|
||||
expect(results[1].type).toBe("stop");
|
||||
});
|
||||
|
||||
it("returns empty array when no matches", () => {
|
||||
const results = store.query({ source: "workflow" });
|
||||
expect(results).toHaveLength(0);
|
||||
});
|
||||
});
|
||||
|
||||
describe("query ordering", () => {
|
||||
it("returns entries in insertion order (ascending id)", () => {
|
||||
store.append({ source: "system", type: "start", refId: null, payload: null, ts: 5000 });
|
||||
store.append({ source: "reflex", type: "run_start", refId: "a", payload: null, ts: 1000 });
|
||||
|
||||
const all = store.query();
|
||||
expect(all[0].ts).toBe(5000);
|
||||
expect(all[1].ts).toBe(1000);
|
||||
});
|
||||
});
|
||||
|
||||
describe("meta table", () => {
|
||||
it("returns null for nonexistent key", () => {
|
||||
expect(store.getMeta("missing")).toBeNull();
|
||||
});
|
||||
|
||||
it("sets and gets a value", () => {
|
||||
store.setMeta("archived_up_to", "2026-03-22");
|
||||
expect(store.getMeta("archived_up_to")).toBe("2026-03-22");
|
||||
});
|
||||
|
||||
it("upserts on duplicate key", () => {
|
||||
store.setMeta("version", "1");
|
||||
store.setMeta("version", "2");
|
||||
expect(store.getMeta("version")).toBe("2");
|
||||
});
|
||||
|
||||
it("supports multiple keys", () => {
|
||||
store.setMeta("a", "1");
|
||||
store.setMeta("b", "2");
|
||||
expect(store.getMeta("a")).toBe("1");
|
||||
expect(store.getMeta("b")).toBe("2");
|
||||
});
|
||||
});
|
||||
|
||||
describe("append-only semantics", () => {
|
||||
it("ids are always increasing", () => {
|
||||
const entries = Array.from({ length: 10 }, (_, i) =>
|
||||
store.append({ source: "system", type: "test", refId: null, payload: null, ts: i }),
|
||||
);
|
||||
|
||||
for (let i = 1; i < entries.length; i++) {
|
||||
expect(entries[i].id).toBeGreaterThan(entries[i - 1].id ?? 0);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
describe("payload JSON round-trip", () => {
|
||||
it("preserves JSON payload", () => {
|
||||
const payload = JSON.stringify({ cpu: 95, host: "node-1" });
|
||||
store.append({ source: "reflex", type: "run_complete", refId: "cpu", payload, ts: 1000 });
|
||||
|
||||
const results = store.query({ refId: "cpu" });
|
||||
expect(results).toHaveLength(1);
|
||||
expect(JSON.parse(results[0].payload ?? "null")).toEqual({ cpu: 95, host: "node-1" });
|
||||
});
|
||||
});
|
||||
|
||||
describe("creates parent directories", () => {
|
||||
it("creates nested directory structure for db path", () => {
|
||||
const deepPath = join(tmpDir, "a", "b", "c", "test.db");
|
||||
const deepStore = createLogStore(deepPath);
|
||||
deepStore.append({ source: "system", type: "start", refId: null, payload: null, ts: 1000 });
|
||||
expect(deepStore.query()).toHaveLength(1);
|
||||
deepStore.close();
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -27,3 +27,6 @@ export type { Kernel, KernelOptions, KernelHealth } from "./kernel.js";
|
||||
|
||||
export { createFileWatcher } from "./file-watcher.js";
|
||||
export type { FileWatcher, FileChange, FileChangeHandler } from "./file-watcher.js";
|
||||
|
||||
export { createLogStore } from "./log-store.js";
|
||||
export type { LogStore, LogEntry, LogQuery } from "./log-store.js";
|
||||
|
||||
@@ -25,6 +25,8 @@ import { createFileWatcher } from "./file-watcher.js";
|
||||
import type { FileWatcher } from "./file-watcher.js";
|
||||
import type { ComputeMessage, ShutdownMessage } from "./ipc.js";
|
||||
import { parseWorkerMessage } from "./ipc.js";
|
||||
import { createLogStore } from "./log-store.js";
|
||||
import type { LogStore } from "./log-store.js";
|
||||
import { createReflexScheduler } from "./reflex-scheduler.js";
|
||||
import type { ReflexScheduler } from "./reflex-scheduler.js";
|
||||
import { createSignalBus } from "./signal-bus.js";
|
||||
@@ -43,6 +45,7 @@ export type Kernel = {
|
||||
groups: Set<string>;
|
||||
senseCount: number;
|
||||
bus: SignalBus;
|
||||
logStore: LogStore;
|
||||
/** Resolves when all workers have sent their initial "ready" message. */
|
||||
ready: Promise<void>;
|
||||
/** Returns the PID of the worker process for a given group, or null if not found. */
|
||||
@@ -106,6 +109,8 @@ function groupForSense(config: NerveConfig, senseName: string): string | null {
|
||||
export type KernelOptions = {
|
||||
workerScript: string;
|
||||
enableFileWatcher?: boolean;
|
||||
/** Override the LogStore instance (useful for testing). */
|
||||
logStore?: LogStore;
|
||||
};
|
||||
|
||||
function defaultKernelOptions(): KernelOptions {
|
||||
@@ -120,6 +125,15 @@ export function createKernel(
|
||||
const bus: SignalBus = createSignalBus();
|
||||
const workerScript = options.workerScript;
|
||||
const startTime = Date.now();
|
||||
const logStore: LogStore = options.logStore ?? createLogStore(join(nerveRoot, "data", "logs.db"));
|
||||
|
||||
logStore.append({
|
||||
source: "system",
|
||||
type: "start",
|
||||
refId: null,
|
||||
payload: null,
|
||||
ts: startTime,
|
||||
});
|
||||
|
||||
let config = initialConfig;
|
||||
|
||||
@@ -168,6 +182,13 @@ export function createKernel(
|
||||
|
||||
if (msg.type === "error") {
|
||||
process.stderr.write(`[kernel] sense "${msg.sense}" error: ${msg.error}\n`);
|
||||
logStore.append({
|
||||
source: "system",
|
||||
type: "error",
|
||||
refId: msg.sense,
|
||||
payload: JSON.stringify({ error: msg.error }),
|
||||
ts: Date.now(),
|
||||
});
|
||||
scheduler.onComputeComplete(msg.sense);
|
||||
return;
|
||||
}
|
||||
@@ -179,6 +200,13 @@ export function createKernel(
|
||||
payload: msg.payload,
|
||||
ts: Date.now(),
|
||||
};
|
||||
logStore.append({
|
||||
source: "reflex",
|
||||
type: "run_complete",
|
||||
refId: msg.sense,
|
||||
payload: JSON.stringify(msg.payload),
|
||||
ts: signal.ts,
|
||||
});
|
||||
bus.emit(signal);
|
||||
scheduler.onComputeComplete(msg.sense);
|
||||
}
|
||||
@@ -239,7 +267,7 @@ export function createKernel(
|
||||
sendCompute(entry.process, senseName);
|
||||
}
|
||||
|
||||
scheduler = createReflexScheduler(config, bus, triggerFn);
|
||||
scheduler = createReflexScheduler(config, bus, triggerFn, { logStore });
|
||||
|
||||
if (groups.size === 0) {
|
||||
readyResolve?.();
|
||||
@@ -322,7 +350,7 @@ export function createKernel(
|
||||
// Note: pending/throttled computes in the old scheduler are silently dropped here.
|
||||
// In-flight state is not preserved across reloadConfig.
|
||||
scheduler.stop();
|
||||
scheduler = createReflexScheduler(config, bus, triggerFn);
|
||||
scheduler = createReflexScheduler(config, bus, triggerFn, { logStore });
|
||||
const newGroups = collectGroups(newConfig);
|
||||
removeStaleGroups(oldGroups, newGroups);
|
||||
addNewGroups(oldGroups, newGroups);
|
||||
@@ -360,6 +388,13 @@ export function createKernel(
|
||||
process.stderr.write(
|
||||
`[kernel] sense file changed: "${senseName}", restarting group "${sc.group}"\n`,
|
||||
);
|
||||
logStore.append({
|
||||
source: "system",
|
||||
type: "sense_reload",
|
||||
refId: senseName,
|
||||
payload: null,
|
||||
ts: Date.now(),
|
||||
});
|
||||
restartGroup(sc.group).catch((e) => {
|
||||
const msg = e instanceof Error ? e.message : String(e);
|
||||
process.stderr.write(`[kernel] restartGroup error: ${msg}\n`);
|
||||
@@ -368,6 +403,13 @@ export function createKernel(
|
||||
|
||||
function handleConfigFileChange(): void {
|
||||
process.stderr.write("[kernel] nerve.yaml changed, reloading config\n");
|
||||
logStore.append({
|
||||
source: "system",
|
||||
type: "config_reload",
|
||||
refId: null,
|
||||
payload: null,
|
||||
ts: Date.now(),
|
||||
});
|
||||
try {
|
||||
const raw = readFileSync(join(nerveRoot, "nerve.yaml"), "utf8");
|
||||
const parseResult = parseNerveConfig(raw);
|
||||
@@ -405,6 +447,14 @@ export function createKernel(
|
||||
exitPromises.push(waitForExit(entry.process, 5000));
|
||||
}
|
||||
await Promise.all(exitPromises);
|
||||
logStore.append({
|
||||
source: "system",
|
||||
type: "stop",
|
||||
refId: null,
|
||||
payload: null,
|
||||
ts: Date.now(),
|
||||
});
|
||||
logStore.close();
|
||||
}
|
||||
|
||||
function getWorkerPid(group: string): number | null {
|
||||
@@ -418,6 +468,7 @@ export function createKernel(
|
||||
groups,
|
||||
senseCount,
|
||||
bus,
|
||||
logStore,
|
||||
ready,
|
||||
getWorkerPid,
|
||||
triggerCompute: triggerFn,
|
||||
|
||||
@@ -0,0 +1,150 @@
|
||||
/**
|
||||
* Log Store — append-only structured log storage backed by SQLite.
|
||||
*
|
||||
* Stores system, reflex, and workflow log entries in a single table.
|
||||
* Logs are data assets for audit/analysis — they MUST NOT trigger reflexes.
|
||||
*
|
||||
* Also provides a `meta` key-value table for bookkeeping (e.g. archive watermarks).
|
||||
*/
|
||||
|
||||
import { mkdirSync } from "node:fs";
|
||||
import { dirname } from "node:path";
|
||||
import Database from "better-sqlite3";
|
||||
import type BetterSqlite3 from "better-sqlite3";
|
||||
|
||||
export type LogEntry = {
|
||||
id?: number;
|
||||
source: string;
|
||||
type: string;
|
||||
refId: string | null;
|
||||
payload: string | null;
|
||||
ts: number;
|
||||
};
|
||||
|
||||
export type LogQuery = {
|
||||
source?: string;
|
||||
type?: string;
|
||||
refId?: string;
|
||||
since?: number;
|
||||
until?: number;
|
||||
limit?: number;
|
||||
};
|
||||
|
||||
export type LogStore = {
|
||||
append: (entry: Omit<LogEntry, "id">) => LogEntry;
|
||||
query: (filter?: LogQuery) => LogEntry[];
|
||||
getMeta: (key: string) => string | null;
|
||||
setMeta: (key: string, value: string) => void;
|
||||
close: () => void;
|
||||
};
|
||||
|
||||
const SCHEMA_SQL = `
|
||||
CREATE TABLE IF NOT EXISTS logs (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
source TEXT NOT NULL,
|
||||
type TEXT NOT NULL,
|
||||
ref_id TEXT,
|
||||
payload TEXT,
|
||||
ts INTEGER NOT NULL
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_logs_source_type ON logs(source, type);
|
||||
CREATE INDEX IF NOT EXISTS idx_logs_ts ON logs(ts);
|
||||
CREATE INDEX IF NOT EXISTS idx_logs_ref_id ON logs(ref_id);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS meta (
|
||||
key TEXT PRIMARY KEY,
|
||||
value TEXT NOT NULL
|
||||
);
|
||||
`;
|
||||
|
||||
export function createLogStore(dbPath: string): LogStore {
|
||||
mkdirSync(dirname(dbPath), { recursive: true });
|
||||
|
||||
const sqlite: BetterSqlite3.Database = new Database(dbPath);
|
||||
sqlite.pragma("journal_mode = WAL");
|
||||
sqlite.exec(SCHEMA_SQL);
|
||||
|
||||
const insertStmt = sqlite.prepare(
|
||||
"INSERT INTO logs (source, type, ref_id, payload, ts) VALUES (@source, @type, @refId, @payload, @ts)",
|
||||
);
|
||||
|
||||
const getMetaStmt = sqlite.prepare("SELECT value FROM meta WHERE key = ?");
|
||||
const setMetaStmt = sqlite.prepare(
|
||||
"INSERT INTO meta (key, value) VALUES (@key, @value) ON CONFLICT(key) DO UPDATE SET value = @value",
|
||||
);
|
||||
|
||||
function append(entry: Omit<LogEntry, "id">): LogEntry {
|
||||
const info = insertStmt.run({
|
||||
source: entry.source,
|
||||
type: entry.type,
|
||||
refId: entry.refId,
|
||||
payload: entry.payload,
|
||||
ts: entry.ts,
|
||||
});
|
||||
return { ...entry, id: Number(info.lastInsertRowid) };
|
||||
}
|
||||
|
||||
function query(filter: LogQuery = {}): LogEntry[] {
|
||||
const conditions: string[] = [];
|
||||
const params: Record<string, unknown> = {};
|
||||
|
||||
if (filter.source !== undefined) {
|
||||
conditions.push("source = @source");
|
||||
params.source = filter.source;
|
||||
}
|
||||
if (filter.type !== undefined) {
|
||||
conditions.push("type = @type");
|
||||
params.type = filter.type;
|
||||
}
|
||||
if (filter.refId !== undefined) {
|
||||
conditions.push("ref_id = @refId");
|
||||
params.refId = filter.refId;
|
||||
}
|
||||
if (filter.since !== undefined) {
|
||||
conditions.push("ts >= @since");
|
||||
params.since = filter.since;
|
||||
}
|
||||
if (filter.until !== undefined) {
|
||||
conditions.push("ts <= @until");
|
||||
params.until = filter.until;
|
||||
}
|
||||
|
||||
const where = conditions.length > 0 ? `WHERE ${conditions.join(" AND ")}` : "";
|
||||
const limit = filter.limit !== undefined ? `LIMIT ${filter.limit}` : "";
|
||||
const sql = `SELECT id, source, type, ref_id, payload, ts FROM logs ${where} ORDER BY id ASC ${limit}`;
|
||||
|
||||
const rows = sqlite.prepare(sql).all(params) as Array<{
|
||||
id: number;
|
||||
source: string;
|
||||
type: string;
|
||||
ref_id: string | null;
|
||||
payload: string | null;
|
||||
ts: number;
|
||||
}>;
|
||||
|
||||
return rows.map((r) => ({
|
||||
id: r.id,
|
||||
source: r.source,
|
||||
type: r.type,
|
||||
refId: r.ref_id,
|
||||
payload: r.payload,
|
||||
ts: r.ts,
|
||||
}));
|
||||
}
|
||||
|
||||
function getMeta(key: string): string | null {
|
||||
const row = getMetaStmt.get(key) as { value: string } | undefined;
|
||||
return row?.value ?? null;
|
||||
}
|
||||
|
||||
function setMeta(key: string, value: string): void {
|
||||
setMetaStmt.run({ key, value });
|
||||
}
|
||||
|
||||
function close(): void {
|
||||
sqlite.close();
|
||||
}
|
||||
|
||||
return { append, query, getMeta, setMeta, close };
|
||||
}
|
||||
@@ -10,6 +10,7 @@
|
||||
*/
|
||||
|
||||
import type { NerveConfig } from "@uncaged/nerve-core";
|
||||
import type { LogStore } from "./log-store.js";
|
||||
import type { SignalBus, Unsubscribe } from "./signal-bus.js";
|
||||
|
||||
/** Sends a compute message to the worker responsible for the given sense. */
|
||||
@@ -34,19 +35,26 @@ function makeSenseState(): SenseState {
|
||||
return { lastComputeAt: 0, inFlight: false, pending: false, deferredTimer: null };
|
||||
}
|
||||
|
||||
export type ReflexSchedulerOptions = {
|
||||
logStore?: LogStore;
|
||||
};
|
||||
|
||||
/**
|
||||
* Create and start a reflex scheduler.
|
||||
*
|
||||
* @param config Full NerveConfig (reads senses for throttle/timeout, reflexes for schedule).
|
||||
* @param bus SignalBus to subscribe for event-driven reflexes.
|
||||
* @param triggerFn Called with the sense name when a compute should be dispatched.
|
||||
* @param opts Optional: logStore for structured logging.
|
||||
* @returns ReflexScheduler with stop() and onComputeComplete() methods.
|
||||
*/
|
||||
export function createReflexScheduler(
|
||||
config: NerveConfig,
|
||||
bus: SignalBus,
|
||||
triggerFn: TriggerFn,
|
||||
opts?: ReflexSchedulerOptions,
|
||||
): ReflexScheduler {
|
||||
const logStore = opts?.logStore;
|
||||
const intervals: ReturnType<typeof setInterval>[] = [];
|
||||
const unsubscribers: Unsubscribe[] = [];
|
||||
const states = new Map<string, SenseState>();
|
||||
@@ -65,6 +73,13 @@ export function createReflexScheduler(
|
||||
state.inFlight = true;
|
||||
state.pending = false;
|
||||
state.lastComputeAt = Date.now();
|
||||
logStore?.append({
|
||||
source: "reflex",
|
||||
type: "run_start",
|
||||
refId: senseName,
|
||||
payload: null,
|
||||
ts: Date.now(),
|
||||
});
|
||||
triggerFn(senseName);
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user