Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 8c9adf08c5 | |||
| 08e8020cb6 | |||
| e287e07dab | |||
| 239dfffb28 |
@@ -20,6 +20,7 @@
|
||||
"test": "vitest run"
|
||||
},
|
||||
"dependencies": {
|
||||
"drizzle-orm": "1.0.0-beta.23-c10d10c",
|
||||
"yaml": "^2.8.3"
|
||||
},
|
||||
"devDependencies": {
|
||||
|
||||
@@ -12,6 +12,7 @@ export type {
|
||||
ComputeResult,
|
||||
} from "./config.js";
|
||||
export type { Signal, SenseInfo } from "./sense.js";
|
||||
export type { SenseComputeFn, SenseModule } from "./sense-contract.js";
|
||||
export { labelSenseTrigger, senseTriggerLabels } from "./sense-trigger-labels.js";
|
||||
export type {
|
||||
WorkflowMessage,
|
||||
|
||||
@@ -0,0 +1,20 @@
|
||||
import type { SQLiteTable } from "drizzle-orm/sqlite-core";
|
||||
|
||||
/**
|
||||
* The function signature every sense `src/index.ts` must export as a named
|
||||
* `compute` export.
|
||||
*
|
||||
* Pure: receives only an AbortSignal. No DB, no peers.
|
||||
* Return `null` to stay silent, or a value `T` to emit a Signal.
|
||||
* The runtime handles persistence via `db.insert(table).values(result)`.
|
||||
*/
|
||||
export type SenseComputeFn<T = unknown> = (signal: AbortSignal) => Promise<T | null>;
|
||||
|
||||
/**
|
||||
* The full shape a sense module (`src/index.ts`) must export.
|
||||
* `compute` provides the data; `table` tells the runtime where to persist it.
|
||||
*/
|
||||
export type SenseModule<T = unknown> = {
|
||||
compute: SenseComputeFn<T>;
|
||||
table: SQLiteTable;
|
||||
};
|
||||
@@ -7,10 +7,9 @@ import { drizzle } from "drizzle-orm/node-sqlite";
|
||||
import { integer, real, sqliteTable } from "drizzle-orm/sqlite-core";
|
||||
import { describe, expect, it } from "vitest";
|
||||
|
||||
import { createBlobStore } from "@uncaged/nerve-store";
|
||||
import { parseParentMessage } from "../ipc.js";
|
||||
import { executeCompute, openPeerDb, openSenseDb, runMigrations } from "../sense-runtime.js";
|
||||
import type { ComputeFn, DrizzleDB, PeerMap, SenseRuntime } from "../sense-runtime.js";
|
||||
import { executeCompute, openSenseDb, runMigrations } from "../sense-runtime.js";
|
||||
import type { DrizzleDB, SenseRuntime } from "../sense-runtime.js";
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Helpers
|
||||
@@ -151,76 +150,50 @@ describe("openSenseDb", () => {
|
||||
});
|
||||
});
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// openPeerDb
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
describe("openPeerDb", () => {
|
||||
it("opens an existing db in read-only mode", () => {
|
||||
// Create a writable db first
|
||||
const dbPath = makeTempDbPath();
|
||||
const sqlite = new DatabaseSync(dbPath);
|
||||
sqlite.exec(INIT_SQL);
|
||||
sqlite.prepare("INSERT INTO samples (ts, value) VALUES (1, 42.0)").run();
|
||||
sqlite.close();
|
||||
|
||||
const result = openPeerDb(dbPath);
|
||||
expect(result.ok).toBe(true);
|
||||
if (!result.ok) return;
|
||||
|
||||
// Should be able to read
|
||||
const peerDb = result.value;
|
||||
const rows = peerDb.select().from(samples).all();
|
||||
expect(rows).toHaveLength(1);
|
||||
expect(rows[0].value).toBe(42.0);
|
||||
});
|
||||
|
||||
it("returns err when db path does not exist", () => {
|
||||
const result = openPeerDb("/nonexistent/path/to/peer.db");
|
||||
expect(result.ok).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// executeCompute
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
describe("executeCompute", () => {
|
||||
function makeRuntime(computeFn: ComputeFn): {
|
||||
runtime: SenseRuntime;
|
||||
sqlite: DatabaseSync;
|
||||
} {
|
||||
const sqlite = new DatabaseSync(":memory:");
|
||||
sqlite.exec(INIT_SQL);
|
||||
const db = drizzle({ client: sqlite }) as DrizzleDB;
|
||||
function makeRuntime(
|
||||
computeFn: SenseRuntime["compute"],
|
||||
sqlite?: DatabaseSync,
|
||||
): { runtime: SenseRuntime; sqlite: DatabaseSync } {
|
||||
const db_sqlite = sqlite ?? new DatabaseSync(":memory:");
|
||||
if (!sqlite) db_sqlite.exec(INIT_SQL);
|
||||
const db = drizzle({ client: db_sqlite }) as DrizzleDB;
|
||||
return {
|
||||
runtime: { name: "test-sense", db, compute: computeFn, persistSignal: () => {} },
|
||||
sqlite,
|
||||
runtime: {
|
||||
name: "test-sense",
|
||||
db,
|
||||
compute: computeFn,
|
||||
table: samples,
|
||||
persistSignal: () => {},
|
||||
},
|
||||
sqlite: db_sqlite,
|
||||
};
|
||||
}
|
||||
|
||||
const emptyPeers: PeerMap = {};
|
||||
it("returns non-null and inserts into table when compute returns data", async () => {
|
||||
const { runtime, sqlite } = makeRuntime(async () => ({
|
||||
ts: 1000,
|
||||
value: 0.5,
|
||||
}));
|
||||
|
||||
it("returns the compute result when compute returns a non-null value", async () => {
|
||||
const { runtime, sqlite } = makeRuntime(async (db) => {
|
||||
await db.insert(samples).values({ ts: Date.now(), value: 0.5 });
|
||||
return { signal: 0.5, workflow: null };
|
||||
});
|
||||
|
||||
const result = await executeCompute(runtime, emptyPeers);
|
||||
const result = await executeCompute(runtime);
|
||||
expect(result.ok).toBe(true);
|
||||
if (!result.ok) return;
|
||||
expect(result.value).toEqual({ signal: 0.5, workflow: null });
|
||||
expect(result.value).toEqual({ ts: 1000, value: 0.5 });
|
||||
|
||||
const rows = sqlite.prepare("SELECT * FROM samples").all();
|
||||
expect(rows).toHaveLength(1);
|
||||
sqlite.close();
|
||||
});
|
||||
|
||||
it("returns null (no signal) when compute returns null", async () => {
|
||||
it("returns null and does not insert when compute returns null", async () => {
|
||||
const { runtime, sqlite } = makeRuntime(async () => null);
|
||||
|
||||
const result = await executeCompute(runtime, emptyPeers);
|
||||
const result = await executeCompute(runtime);
|
||||
expect(result.ok).toBe(true);
|
||||
if (!result.ok) return;
|
||||
expect(result.value).toBeNull();
|
||||
@@ -235,60 +208,14 @@ describe("executeCompute", () => {
|
||||
throw new Error("something went wrong");
|
||||
});
|
||||
|
||||
const result = await executeCompute(runtime, emptyPeers);
|
||||
const result = await executeCompute(runtime);
|
||||
expect(result.ok).toBe(false);
|
||||
if (result.ok) return;
|
||||
expect(result.error.message).toContain("something went wrong");
|
||||
sqlite.close();
|
||||
});
|
||||
|
||||
it("compute can read from peers", async () => {
|
||||
// Set up a peer db with data
|
||||
const peerSqlite = new DatabaseSync(":memory:");
|
||||
peerSqlite.exec(INIT_SQL);
|
||||
peerSqlite.prepare("INSERT INTO samples (ts, value) VALUES (100, 3.14)").run();
|
||||
const peerDb = drizzle({ client: peerSqlite }) as DrizzleDB;
|
||||
|
||||
const peers: PeerMap = { "other-sense": peerDb };
|
||||
|
||||
const { runtime, sqlite } = makeRuntime(async (_db, p) => {
|
||||
const rows = await p["other-sense"].select().from(samples).all();
|
||||
return rows.length > 0 ? { signal: rows[0].value, workflow: null } : null;
|
||||
});
|
||||
|
||||
const result = await executeCompute(runtime, peers);
|
||||
expect(result.ok).toBe(true);
|
||||
if (!result.ok) return;
|
||||
expect(result.value).toEqual({ signal: 3.14, workflow: null });
|
||||
|
||||
peerSqlite.close();
|
||||
sqlite.close();
|
||||
});
|
||||
|
||||
it("write to own db does not affect peer db (isolation)", async () => {
|
||||
const peerSqlite = new DatabaseSync(":memory:");
|
||||
peerSqlite.exec(INIT_SQL);
|
||||
const peerDb = drizzle({ client: peerSqlite }) as DrizzleDB;
|
||||
const peers: PeerMap = { "peer-sense": peerDb };
|
||||
|
||||
const { runtime, sqlite } = makeRuntime(async (db) => {
|
||||
await db.insert(samples).values({ ts: 999, value: 9.9 });
|
||||
return { signal: 9.9, workflow: null };
|
||||
});
|
||||
|
||||
await executeCompute(runtime, peers);
|
||||
|
||||
const peerRows = peerSqlite.prepare("SELECT * FROM samples").all();
|
||||
expect(peerRows).toHaveLength(0);
|
||||
|
||||
const ownRows = sqlite.prepare("SELECT * FROM samples").all();
|
||||
expect(ownRows).toHaveLength(1);
|
||||
|
||||
peerSqlite.close();
|
||||
sqlite.close();
|
||||
});
|
||||
|
||||
it("inserts correctly into the sense db directory path", async () => {
|
||||
it("inserts correctly into the sense db from openSenseDb", async () => {
|
||||
const dbPath = makeTempDbPath();
|
||||
const migrationsDir = makeTempMigrationsDir(INIT_SQL);
|
||||
const dbResult = openSenseDb(dbPath, migrationsDir);
|
||||
@@ -301,14 +228,12 @@ describe("executeCompute", () => {
|
||||
const runtime: SenseRuntime = {
|
||||
name: "cpu-usage",
|
||||
db,
|
||||
compute: async (d) => {
|
||||
await d.insert(samples).values({ ts: 1000, value: 1.23 });
|
||||
return { signal: 1.23, workflow: null };
|
||||
},
|
||||
compute: async () => ({ ts: 1000, value: 1.23 }),
|
||||
table: samples,
|
||||
persistSignal: () => {},
|
||||
};
|
||||
|
||||
const result = await executeCompute(runtime, {});
|
||||
const result = await executeCompute(runtime);
|
||||
expect(result.ok).toBe(true);
|
||||
|
||||
const rows = dbSqlite.prepare("SELECT * FROM samples").all() as Array<{
|
||||
@@ -323,17 +248,17 @@ describe("executeCompute", () => {
|
||||
|
||||
it("returns err when compute exceeds timeoutMs", async () => {
|
||||
const { runtime, sqlite } = makeRuntime(
|
||||
(_db, _peers, options) =>
|
||||
(signal) =>
|
||||
new Promise<null>((resolve, reject) => {
|
||||
const t = setTimeout(() => resolve(null), 5_000);
|
||||
options?.signal.addEventListener("abort", () => {
|
||||
signal.addEventListener("abort", () => {
|
||||
clearTimeout(t);
|
||||
reject(new Error("aborted"));
|
||||
});
|
||||
}),
|
||||
);
|
||||
|
||||
const result = await executeCompute(runtime, emptyPeers, 50);
|
||||
const result = await executeCompute(runtime, 50);
|
||||
expect(result.ok).toBe(false);
|
||||
if (result.ok) return;
|
||||
expect(result.error.message).toMatch(/timed out/i);
|
||||
@@ -341,39 +266,25 @@ describe("executeCompute", () => {
|
||||
});
|
||||
|
||||
it("completes within timeout when compute is fast", async () => {
|
||||
const { runtime, sqlite } = makeRuntime(async () => ({ signal: 42, workflow: null }));
|
||||
const result = await executeCompute(runtime, emptyPeers, 5_000);
|
||||
const { runtime, sqlite } = makeRuntime(async () => ({ ts: 1, value: 42 }));
|
||||
const result = await executeCompute(runtime, 5_000);
|
||||
expect(result.ok).toBe(true);
|
||||
if (!result.ok) return;
|
||||
expect(result.value).toEqual({ signal: 42, workflow: null });
|
||||
expect(result.value).toEqual({ ts: 1, value: 42 });
|
||||
sqlite.close();
|
||||
});
|
||||
|
||||
it("passes AbortSignal to compute fn", async () => {
|
||||
let capturedSignal: AbortSignal | undefined;
|
||||
const { runtime, sqlite } = makeRuntime(async (_db, _peers, options) => {
|
||||
capturedSignal = options?.signal;
|
||||
const { runtime, sqlite } = makeRuntime(async (signal) => {
|
||||
capturedSignal = signal;
|
||||
return null;
|
||||
});
|
||||
|
||||
await executeCompute(runtime, emptyPeers, 1_000);
|
||||
await executeCompute(runtime, 1_000);
|
||||
expect(capturedSignal).toBeInstanceOf(AbortSignal);
|
||||
sqlite.close();
|
||||
});
|
||||
|
||||
it("passes BlobStore as options.blobs when blobStore argument is provided", async () => {
|
||||
const blobsRoot = mkdtempSync(join(tmpdir(), "nerve-blobs-"));
|
||||
const blobStore = createBlobStore(blobsRoot);
|
||||
let seen: ReturnType<typeof createBlobStore> | undefined;
|
||||
const { runtime, sqlite } = makeRuntime(async (_db, _peers, options) => {
|
||||
seen = options?.blobs;
|
||||
return null;
|
||||
});
|
||||
|
||||
await executeCompute(runtime, emptyPeers, undefined, blobStore);
|
||||
expect(seen).toBe(blobStore);
|
||||
sqlite.close();
|
||||
});
|
||||
});
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
@@ -429,19 +340,14 @@ describe("runMigrations journal", () => {
|
||||
const first = runMigrations(sqlite, dir);
|
||||
expect(first.ok).toBe(true);
|
||||
|
||||
// Insert a row so we can verify second run doesn't fail on CREATE TABLE
|
||||
sqlite.exec("INSERT INTO samples (ts, value) VALUES (1, 1.0)");
|
||||
|
||||
// Run again — migration must NOT re-run (would fail without IF NOT EXISTS but
|
||||
// the journal prevents it even for non-idempotent SQL)
|
||||
const nonIdempotentSql = "CREATE TABLE samples2 (id INTEGER PRIMARY KEY)";
|
||||
writeFileSync(join(dir, "0002_samples2.sql"), nonIdempotentSql);
|
||||
|
||||
// First time: creates samples2
|
||||
const second = runMigrations(sqlite, dir);
|
||||
expect(second.ok).toBe(true);
|
||||
|
||||
// Second time: 0002 already in journal, must not re-run
|
||||
const third = runMigrations(sqlite, dir);
|
||||
expect(third.ok).toBe(true);
|
||||
|
||||
|
||||
@@ -4,43 +4,20 @@ import { DatabaseSync } from "node:sqlite";
|
||||
|
||||
import { drizzle } from "drizzle-orm/node-sqlite";
|
||||
import type { NodeSQLiteDatabase } from "drizzle-orm/node-sqlite";
|
||||
import type { SQLiteTable } from "drizzle-orm/sqlite-core";
|
||||
|
||||
import type { ComputeResult, Result } from "@uncaged/nerve-core";
|
||||
import type { Result, SenseComputeFn } from "@uncaged/nerve-core";
|
||||
import { DEFAULT_SENSE_SIGNAL_RETENTION, err, isPlainRecord, ok } from "@uncaged/nerve-core";
|
||||
|
||||
import type { BlobStore } from "@uncaged/nerve-store";
|
||||
|
||||
/** A Drizzle DB instance (schema-generic) */
|
||||
export type DrizzleDB = NodeSQLiteDatabase<Record<string, never>>;
|
||||
|
||||
/** Read-only map of peer sense name → their Drizzle DB */
|
||||
export type PeerMap = Readonly<Record<string, DrizzleDB>>;
|
||||
|
||||
/** Options passed to a compute function */
|
||||
export type ComputeOptions = {
|
||||
signal: AbortSignal;
|
||||
/** CAS under `data/blobs/`; injected by the sense worker when available. */
|
||||
blobs?: BlobStore;
|
||||
};
|
||||
|
||||
/**
|
||||
* The shape every sense's index.ts must export.
|
||||
* Engine injects `db` (read-write), `peers` (read-only), and `options`
|
||||
* (`signal`, and `blobs` when running in the sense worker — RFC-001 §8 CAS).
|
||||
* Returns a structured result when a signal should be emitted (and optionally a workflow),
|
||||
* or null for silence.
|
||||
*/
|
||||
export type ComputeFn<T = unknown> = (
|
||||
db: DrizzleDB,
|
||||
peers: PeerMap,
|
||||
options?: ComputeOptions,
|
||||
) => Promise<ComputeResult<T>>;
|
||||
|
||||
/** All state held for one sense inside a worker */
|
||||
export type SenseRuntime = {
|
||||
name: string;
|
||||
db: DrizzleDB;
|
||||
compute: ComputeFn;
|
||||
compute: SenseComputeFn;
|
||||
table: SQLiteTable;
|
||||
persistSignal: (payload: unknown) => void;
|
||||
};
|
||||
|
||||
@@ -126,13 +103,13 @@ export function runMigrations(sqlite: DatabaseSync, migrationsDir: string): Resu
|
||||
return ok(undefined);
|
||||
}
|
||||
|
||||
/** Run `_signals` row prune after this many inserts (amortize DELETE cost). */
|
||||
const SIGNAL_INSERTS_PER_PRUNE = 100;
|
||||
|
||||
/**
|
||||
* Open (or create) the SQLite file at `dbPath`, run all migrations in
|
||||
* `migrationsDir`, and wrap with Drizzle ORM.
|
||||
*/
|
||||
/** Run `_signals` row prune after this many inserts (amortize DELETE cost). */
|
||||
const SIGNAL_INSERTS_PER_PRUNE = 100;
|
||||
|
||||
export function openSenseDb(
|
||||
dbPath: string,
|
||||
migrationsDir: string,
|
||||
@@ -184,27 +161,12 @@ export function openSenseDb(
|
||||
}
|
||||
|
||||
/**
|
||||
* Open a peer sense DB in read-only mode (no migrations).
|
||||
* Dynamically import the compute function and table from a sense's index.ts/js.
|
||||
* The module must export a named `compute` function and a named `table` (SQLiteTable).
|
||||
*/
|
||||
export function openPeerDb(dbPath: string): Result<DrizzleDB> {
|
||||
let sqlite: DatabaseSync;
|
||||
|
||||
try {
|
||||
sqlite = new DatabaseSync(dbPath, { readOnly: true });
|
||||
} catch (e) {
|
||||
const msg = e instanceof Error ? e.message : String(e);
|
||||
return err(new Error(`Failed to open peer database "${dbPath}" (readonly): ${msg}`));
|
||||
}
|
||||
|
||||
// Same schema-agnostic Drizzle wrapper as openSenseDb.
|
||||
return ok(drizzle({ client: sqlite }) as DrizzleDB);
|
||||
}
|
||||
|
||||
/**
|
||||
* Dynamically import the compute function from a sense's index.ts/js.
|
||||
* The module must export a named `compute` function.
|
||||
*/
|
||||
export async function loadComputeFn(senseIndexPath: string): Promise<Result<ComputeFn>> {
|
||||
export async function loadSenseModule(
|
||||
senseIndexPath: string,
|
||||
): Promise<Result<{ compute: SenseComputeFn; table: SQLiteTable }>> {
|
||||
let mod: unknown;
|
||||
|
||||
try {
|
||||
@@ -221,26 +183,32 @@ export async function loadComputeFn(senseIndexPath: string): Promise<Result<Comp
|
||||
);
|
||||
}
|
||||
|
||||
return ok(mod.compute as ComputeFn);
|
||||
if (!("table" in mod) || mod.table === null || typeof mod.table !== "object") {
|
||||
return err(
|
||||
new Error(
|
||||
`Sense module "${senseIndexPath}" must export a named "table" (drizzle SQLiteTable)`,
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
return ok({
|
||||
compute: mod.compute as SenseComputeFn,
|
||||
table: mod.table as SQLiteTable,
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute a sense's compute function with an optional soft timeout.
|
||||
* If timeoutMs is provided and compute takes longer, the AbortSignal is
|
||||
* triggered and an error Result is returned.
|
||||
* When `blobStore` is set, it is exposed as `options.blobs` (see RFC-001 §8).
|
||||
* When compute returns non-null, the result is persisted to the sense's table
|
||||
* via `db.insert(table).values(result)`.
|
||||
*/
|
||||
export async function executeCompute(
|
||||
runtime: SenseRuntime,
|
||||
peers: PeerMap,
|
||||
timeoutMs?: number,
|
||||
blobStore?: BlobStore,
|
||||
): Promise<Result<ComputeResult<unknown>>> {
|
||||
): Promise<Result<unknown | null>> {
|
||||
const controller = new AbortController();
|
||||
const options: ComputeOptions =
|
||||
blobStore !== undefined
|
||||
? { signal: controller.signal, blobs: blobStore }
|
||||
: { signal: controller.signal };
|
||||
|
||||
let timer: ReturnType<typeof setTimeout> | undefined;
|
||||
const timeoutPromise =
|
||||
@@ -254,10 +222,16 @@ export async function executeCompute(
|
||||
: null;
|
||||
|
||||
try {
|
||||
const computePromise = runtime.compute(runtime.db, peers, options);
|
||||
const computePromise = runtime.compute(controller.signal);
|
||||
const result = timeoutPromise
|
||||
? await Promise.race([computePromise, timeoutPromise])
|
||||
: await computePromise;
|
||||
|
||||
if (result !== null) {
|
||||
// Cast required: DrizzleDB is schema-agnostic; the sense module guarantees shape compatibility.
|
||||
await runtime.db.insert(runtime.table).values(result as Record<string, unknown>);
|
||||
}
|
||||
|
||||
return ok(result);
|
||||
} catch (e) {
|
||||
const msg = e instanceof Error ? e.message : String(e);
|
||||
|
||||
@@ -3,14 +3,13 @@
|
||||
*
|
||||
* Entry point for `nerve worker sense --group <name>`.
|
||||
* Receives the group name via CLI args, reads nerve.yaml, initialises one
|
||||
* SenseRuntime per sense in the group, builds peer read-only connections,
|
||||
* then signals ready and enters the IPC event loop.
|
||||
* SenseRuntime per sense in the group, then signals ready and enters the
|
||||
* IPC event loop.
|
||||
*
|
||||
* Layout assumptions (nerve user config at `~/.uncaged-nerve/`):
|
||||
* senses/<name>/index.js ← compiled compute
|
||||
* senses/<name>/migrations/ ← SQL migration files
|
||||
* data/senses/<name>.db ← SQLite data file
|
||||
* data/blobs/<aa>/<hashrest> ← CAS (sha256), via options.blobs in compute
|
||||
* nerve.yaml ← config
|
||||
*/
|
||||
|
||||
@@ -22,11 +21,10 @@ import { join, resolve } from "node:path";
|
||||
import { parseNerveConfig, routeSenseComputeOutput } from "@uncaged/nerve-core";
|
||||
import type { NerveConfig } from "@uncaged/nerve-core";
|
||||
|
||||
import { createBlobStore } from "@uncaged/nerve-store";
|
||||
import type { WorkerToParentMessage } from "./ipc.js";
|
||||
import { parseParentMessage } from "./ipc.js";
|
||||
import { executeCompute, loadComputeFn, openPeerDb, openSenseDb } from "./sense-runtime.js";
|
||||
import type { DrizzleDB, PeerMap, SenseRuntime } from "./sense-runtime.js";
|
||||
import { executeCompute, loadSenseModule, openSenseDb } from "./sense-runtime.js";
|
||||
import type { SenseRuntime } from "./sense-runtime.js";
|
||||
import { ignoreSessionBroadcastSignals } from "./worker-fork-support.js";
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
@@ -52,7 +50,7 @@ function sendError(sense: string, error: string): void {
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Initialisation helpers (each extracted to keep bootstrap complexity low)
|
||||
// Initialisation helpers
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
function readConfig(nerveRoot: string): NerveConfig {
|
||||
@@ -78,7 +76,7 @@ async function initSense(
|
||||
nerveRoot: string,
|
||||
senseName: string,
|
||||
retention: number,
|
||||
): Promise<{ db: DrizzleDB; runtime: SenseRuntime }> {
|
||||
): Promise<SenseRuntime> {
|
||||
const dbPath = join(nerveRoot, "data", "senses", `${senseName}.db`);
|
||||
const migrationsDir = join(nerveRoot, "senses", senseName, "migrations");
|
||||
const senseIndexPath = resolve(join(nerveRoot, "senses", senseName, "index.js"));
|
||||
@@ -88,55 +86,20 @@ async function initSense(
|
||||
throw new Error(`Failed to init DB for "${senseName}": ${dbResult.error.message}`);
|
||||
}
|
||||
|
||||
const computeResult = await loadComputeFn(senseIndexPath);
|
||||
if (!computeResult.ok) {
|
||||
throw new Error(`Failed to load compute for "${senseName}": ${computeResult.error.message}`);
|
||||
const moduleResult = await loadSenseModule(senseIndexPath);
|
||||
if (!moduleResult.ok) {
|
||||
throw new Error(`Failed to load module for "${senseName}": ${moduleResult.error.message}`);
|
||||
}
|
||||
|
||||
const { db } = dbResult.value;
|
||||
return {
|
||||
db,
|
||||
runtime: {
|
||||
name: senseName,
|
||||
db,
|
||||
compute: computeResult.value,
|
||||
persistSignal: dbResult.value.persistSignal,
|
||||
},
|
||||
name: senseName,
|
||||
db: dbResult.value.db,
|
||||
compute: moduleResult.value.compute,
|
||||
table: moduleResult.value.table,
|
||||
persistSignal: dbResult.value.persistSignal,
|
||||
};
|
||||
}
|
||||
|
||||
function buildPeers(
|
||||
nerveRoot: string,
|
||||
allSenseNames: string[],
|
||||
ownDbs: Map<string, DrizzleDB>,
|
||||
groupSenseNames: Set<string>,
|
||||
): PeerMap {
|
||||
const entries: [string, DrizzleDB][] = [];
|
||||
|
||||
for (const peerName of allSenseNames) {
|
||||
// Exclude senses that belong to this worker's own group — they are not peers
|
||||
if (groupSenseNames.has(peerName)) continue;
|
||||
|
||||
const own = ownDbs.get(peerName);
|
||||
if (own !== undefined) {
|
||||
entries.push([peerName, own]);
|
||||
continue;
|
||||
}
|
||||
|
||||
const peerDbPath = join(nerveRoot, "data", "senses", `${peerName}.db`);
|
||||
const peerResult = openPeerDb(peerDbPath);
|
||||
if (!peerResult.ok) {
|
||||
process.stderr.write(
|
||||
`[sense-worker] Warning: could not open peer DB for "${peerName}": ${peerResult.error.message}\n`,
|
||||
);
|
||||
continue;
|
||||
}
|
||||
entries.push([peerName, peerResult.value]);
|
||||
}
|
||||
|
||||
return Object.fromEntries(entries);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Grace period: hard kill after soft timeout
|
||||
//
|
||||
@@ -173,13 +136,11 @@ function clearGracePeriodTimer(senseName: string): void {
|
||||
async function runCompute(
|
||||
senseName: string,
|
||||
runtime: SenseRuntime,
|
||||
peers: PeerMap,
|
||||
timeoutMs: number,
|
||||
gracePeriodMs: number | null,
|
||||
blobStore: ReturnType<typeof createBlobStore>,
|
||||
): Promise<void> {
|
||||
try {
|
||||
const result = await executeCompute(runtime, peers, timeoutMs, blobStore);
|
||||
const result = await executeCompute(runtime, timeoutMs);
|
||||
if (!result.ok) {
|
||||
sendError(senseName, result.error.message);
|
||||
if (gracePeriodMs !== null && result.error.message.includes("timed out")) {
|
||||
@@ -210,11 +171,9 @@ async function runCompute(
|
||||
function handleMessage(
|
||||
raw: unknown,
|
||||
runtimes: Map<string, SenseRuntime>,
|
||||
peers: PeerMap,
|
||||
group: string,
|
||||
senseConfigs: Map<string, { timeout: number | null; gracePeriod: number | null }>,
|
||||
inFlight: Map<string, Promise<void>>,
|
||||
blobStore: ReturnType<typeof createBlobStore>,
|
||||
): void {
|
||||
const parseResult = parseParentMessage(raw);
|
||||
if (!parseResult.ok) {
|
||||
@@ -245,14 +204,13 @@ function handleMessage(
|
||||
return;
|
||||
}
|
||||
|
||||
// Look up timeout/gracePeriod per-sense at compute time (RFC §5.3: these are per-sense)
|
||||
const sc = senseConfigs.get(msg.sense);
|
||||
const timeoutMs = sc?.timeout ?? DEFAULT_TIMEOUT_MS;
|
||||
const gracePeriodMs = sc?.gracePeriod ?? null;
|
||||
|
||||
const previous = inFlight.get(msg.sense) ?? Promise.resolve();
|
||||
const next = previous
|
||||
.then(() => runCompute(msg.sense, runtime, peers, timeoutMs, gracePeriodMs, blobStore))
|
||||
.then(() => runCompute(msg.sense, runtime, timeoutMs, gracePeriodMs))
|
||||
.catch((e: unknown) => {
|
||||
const errMsg = e instanceof Error ? e.message : String(e);
|
||||
sendError(msg.sense, errMsg);
|
||||
@@ -279,14 +237,12 @@ async function bootstrap(nerveRoot: string, group: string): Promise<void> {
|
||||
}
|
||||
|
||||
const runtimes = new Map<string, SenseRuntime>();
|
||||
const ownDbs = new Map<string, DrizzleDB>();
|
||||
const failedSenses: string[] = [];
|
||||
|
||||
for (const senseName of groupSenses) {
|
||||
try {
|
||||
const retention = config.senses[senseName].retention;
|
||||
const { db, runtime } = await initSense(nerveRoot, senseName, retention);
|
||||
ownDbs.set(senseName, db);
|
||||
const runtime = await initSense(nerveRoot, senseName, retention);
|
||||
runtimes.set(senseName, runtime);
|
||||
} catch (e: unknown) {
|
||||
const eMsg = e instanceof Error ? e.message : String(e);
|
||||
@@ -297,16 +253,11 @@ async function bootstrap(nerveRoot: string, group: string): Promise<void> {
|
||||
}
|
||||
}
|
||||
|
||||
// If ALL senses failed, exit with error so kernel respawns
|
||||
if (runtimes.size === 0) {
|
||||
process.stderr.write(`[sense-worker] All senses in group "${group}" failed to load, exiting\n`);
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
const groupSenseNames = new Set(groupSenses);
|
||||
const peers = buildPeers(nerveRoot, Object.keys(config.senses), ownDbs, groupSenseNames);
|
||||
|
||||
// Build per-sense timeout/gracePeriod map (RFC §5.3: these are per-sense, not per-group)
|
||||
const senseConfigs = new Map<string, { timeout: number | null; gracePeriod: number | null }>();
|
||||
for (const senseName of groupSenses) {
|
||||
const sc = config.senses[senseName];
|
||||
@@ -317,12 +268,11 @@ async function bootstrap(nerveRoot: string, group: string): Promise<void> {
|
||||
}
|
||||
|
||||
const inFlight = new Map<string, Promise<void>>();
|
||||
const blobStore = createBlobStore(join(nerveRoot, "data", "blobs"));
|
||||
|
||||
sendReady();
|
||||
|
||||
process.on("message", (raw: unknown) => {
|
||||
handleMessage(raw, runtimes, peers, group, senseConfigs, inFlight, blobStore);
|
||||
handleMessage(raw, runtimes, group, senseConfigs, inFlight);
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
Generated
+4
@@ -86,6 +86,9 @@ importers:
|
||||
|
||||
packages/core:
|
||||
dependencies:
|
||||
drizzle-orm:
|
||||
specifier: 1.0.0-beta.23-c10d10c
|
||||
version: 1.0.0-beta.23-c10d10c(@cloudflare/workers-types@4.20260425.1)(@types/better-sqlite3@7.6.13)(@types/mssql@9.1.11(@azure/core-client@1.10.1))(better-sqlite3@11.10.0)(mssql@11.0.1(@azure/core-client@1.10.1))(sql.js@1.14.1)(zod@4.3.6)
|
||||
yaml:
|
||||
specifier: ^2.8.3
|
||||
version: 2.8.3
|
||||
@@ -2033,6 +2036,7 @@ packages:
|
||||
|
||||
uuid@8.3.2:
|
||||
resolution: {integrity: sha512-+NYs2QeMWy+GWFOEm9xnn6HCDp0l7QBD7ml8zLUmJ+93Q5NF0NocErnwkTkXVFNiX3/fpC6afS8Dhb/gz7R7eg==}
|
||||
deprecated: uuid@10 and below is no longer supported. For ESM codebases, update to uuid@latest. For CommonJS codebases, use uuid@11 (but be aware this version will likely be deprecated in 2028).
|
||||
hasBin: true
|
||||
|
||||
vite@8.0.9:
|
||||
|
||||
Reference in New Issue
Block a user