Compare commits

...

2 Commits

Author SHA1 Message Date
xiaoju 8c9adf08c5 refactor: pure sense compute — no db, no peers
SenseComputeFn is now (signal: AbortSignal) => Promise<T | null>.
sense-runtime handles db.insert when compute returns non-null.
Senses export { compute, table } — SenseModule type added to core.

Closes #264
Refs #260

— 小橘 🍊(NEKO Team)
2026-04-30 00:07:49 +00:00
scottwei 08e8020cb6 Merge pull request 'feat: add sense contract types to nerve-core' (#263) from feat/sense-contract into main
Reviewed-on: #263
2026-04-29 23:44:01 +00:00
7 changed files with 112 additions and 308 deletions
+1
View File
@@ -20,6 +20,7 @@
"test": "vitest run"
},
"dependencies": {
"drizzle-orm": "1.0.0-beta.23-c10d10c",
"yaml": "^2.8.3"
},
"devDependencies": {
+1 -6
View File
@@ -12,12 +12,7 @@ export type {
ComputeResult,
} from "./config.js";
export type { Signal, SenseInfo } from "./sense.js";
export type {
SenseBlobStore,
SenseComputeOptions,
SensePeerMap,
SenseComputeFn,
} from "./sense-contract.js";
export type { SenseComputeFn, SenseModule } from "./sense-contract.js";
export { labelSenseTrigger, senseTriggerLabels } from "./sense-trigger-labels.js";
export type {
WorkflowMessage,
+14 -40
View File
@@ -1,46 +1,20 @@
import type { ComputeResult } from "./config.js";
/**
* Minimal read/write/exists interface for the CAS blob store injected into
* sense compute functions (RFC-001 §8). Matches BlobStore from @uncaged/nerve-store.
*/
export type SenseBlobStore = {
write: (content: string | Uint8Array | Buffer) => string;
read: (hash: string) => Buffer | null;
exists: (hash: string) => boolean;
};
/**
* Options injected by the engine into every compute invocation.
* `signal` is always present; `blobs` is only available when running
* inside the sense worker (RFC-001 §8).
*/
export type SenseComputeOptions = {
signal: AbortSignal;
blobs: SenseBlobStore | null;
};
/**
* Read-only map of peer sense name → their Drizzle DB.
* `TDb` defaults to `unknown` so callers that don't need peer queries can
* leave it unspecified.
*/
export type SensePeerMap<TDb = unknown> = Readonly<Record<string, TDb>>;
import type { SQLiteTable } from "drizzle-orm/sqlite-core";
/**
* The function signature every sense `src/index.ts` must export as a named
* `compute` export.
*
* - `db` — the sense's own Drizzle DB instance (read-write).
* - `peers` — read-only map of peer sense name → Drizzle DB.
* - `options` — injected options; `options.blobs` is available inside the
* sense worker; `options.signal` carries the AbortSignal.
*
* Return `null` to stay silent, or `{ signal, workflow }` to emit a Signal
* (and optionally trigger a Workflow).
* Pure: receives only an AbortSignal. No DB, no peers.
* Return `null` to stay silent, or a value `T` to emit a Signal.
* The runtime handles persistence via `db.insert(table).values(result)`.
*/
export type SenseComputeFn<T = unknown, TDb = unknown> = (
db: TDb,
peers: SensePeerMap<TDb>,
options: SenseComputeOptions,
) => Promise<ComputeResult<T>>;
export type SenseComputeFn<T = unknown> = (signal: AbortSignal) => Promise<T | null>;
/**
* The full shape a sense module (`src/index.ts`) must export.
* `compute` provides the data; `table` tells the runtime where to persist it.
*/
export type SenseModule<T = unknown> = {
compute: SenseComputeFn<T>;
table: SQLiteTable;
};
@@ -7,10 +7,9 @@ import { drizzle } from "drizzle-orm/node-sqlite";
import { integer, real, sqliteTable } from "drizzle-orm/sqlite-core";
import { describe, expect, it } from "vitest";
import { createBlobStore } from "@uncaged/nerve-store";
import { parseParentMessage } from "../ipc.js";
import { executeCompute, openPeerDb, openSenseDb, runMigrations } from "../sense-runtime.js";
import type { ComputeFn, DrizzleDB, PeerMap, SenseRuntime } from "../sense-runtime.js";
import { executeCompute, openSenseDb, runMigrations } from "../sense-runtime.js";
import type { DrizzleDB, SenseRuntime } from "../sense-runtime.js";
// ---------------------------------------------------------------------------
// Helpers
@@ -151,76 +150,50 @@ describe("openSenseDb", () => {
});
});
// ---------------------------------------------------------------------------
// openPeerDb
// ---------------------------------------------------------------------------
describe("openPeerDb", () => {
it("opens an existing db in read-only mode", () => {
// Create a writable db first
const dbPath = makeTempDbPath();
const sqlite = new DatabaseSync(dbPath);
sqlite.exec(INIT_SQL);
sqlite.prepare("INSERT INTO samples (ts, value) VALUES (1, 42.0)").run();
sqlite.close();
const result = openPeerDb(dbPath);
expect(result.ok).toBe(true);
if (!result.ok) return;
// Should be able to read
const peerDb = result.value;
const rows = peerDb.select().from(samples).all();
expect(rows).toHaveLength(1);
expect(rows[0].value).toBe(42.0);
});
it("returns err when db path does not exist", () => {
const result = openPeerDb("/nonexistent/path/to/peer.db");
expect(result.ok).toBe(false);
});
});
// ---------------------------------------------------------------------------
// executeCompute
// ---------------------------------------------------------------------------
describe("executeCompute", () => {
function makeRuntime(computeFn: ComputeFn): {
runtime: SenseRuntime;
sqlite: DatabaseSync;
} {
const sqlite = new DatabaseSync(":memory:");
sqlite.exec(INIT_SQL);
const db = drizzle({ client: sqlite }) as DrizzleDB;
function makeRuntime(
computeFn: SenseRuntime["compute"],
sqlite?: DatabaseSync,
): { runtime: SenseRuntime; sqlite: DatabaseSync } {
const db_sqlite = sqlite ?? new DatabaseSync(":memory:");
if (!sqlite) db_sqlite.exec(INIT_SQL);
const db = drizzle({ client: db_sqlite }) as DrizzleDB;
return {
runtime: { name: "test-sense", db, compute: computeFn, persistSignal: () => {} },
sqlite,
runtime: {
name: "test-sense",
db,
compute: computeFn,
table: samples,
persistSignal: () => {},
},
sqlite: db_sqlite,
};
}
const emptyPeers: PeerMap = {};
it("returns non-null and inserts into table when compute returns data", async () => {
const { runtime, sqlite } = makeRuntime(async () => ({
ts: 1000,
value: 0.5,
}));
it("returns the compute result when compute returns a non-null value", async () => {
const { runtime, sqlite } = makeRuntime(async (db) => {
await db.insert(samples).values({ ts: Date.now(), value: 0.5 });
return { signal: 0.5, workflow: null };
});
const result = await executeCompute(runtime, emptyPeers);
const result = await executeCompute(runtime);
expect(result.ok).toBe(true);
if (!result.ok) return;
expect(result.value).toEqual({ signal: 0.5, workflow: null });
expect(result.value).toEqual({ ts: 1000, value: 0.5 });
const rows = sqlite.prepare("SELECT * FROM samples").all();
expect(rows).toHaveLength(1);
sqlite.close();
});
it("returns null (no signal) when compute returns null", async () => {
it("returns null and does not insert when compute returns null", async () => {
const { runtime, sqlite } = makeRuntime(async () => null);
const result = await executeCompute(runtime, emptyPeers);
const result = await executeCompute(runtime);
expect(result.ok).toBe(true);
if (!result.ok) return;
expect(result.value).toBeNull();
@@ -235,60 +208,14 @@ describe("executeCompute", () => {
throw new Error("something went wrong");
});
const result = await executeCompute(runtime, emptyPeers);
const result = await executeCompute(runtime);
expect(result.ok).toBe(false);
if (result.ok) return;
expect(result.error.message).toContain("something went wrong");
sqlite.close();
});
it("compute can read from peers", async () => {
// Set up a peer db with data
const peerSqlite = new DatabaseSync(":memory:");
peerSqlite.exec(INIT_SQL);
peerSqlite.prepare("INSERT INTO samples (ts, value) VALUES (100, 3.14)").run();
const peerDb = drizzle({ client: peerSqlite }) as DrizzleDB;
const peers: PeerMap = { "other-sense": peerDb };
const { runtime, sqlite } = makeRuntime(async (_db, p) => {
const rows = await p["other-sense"].select().from(samples).all();
return rows.length > 0 ? { signal: rows[0].value, workflow: null } : null;
});
const result = await executeCompute(runtime, peers);
expect(result.ok).toBe(true);
if (!result.ok) return;
expect(result.value).toEqual({ signal: 3.14, workflow: null });
peerSqlite.close();
sqlite.close();
});
it("write to own db does not affect peer db (isolation)", async () => {
const peerSqlite = new DatabaseSync(":memory:");
peerSqlite.exec(INIT_SQL);
const peerDb = drizzle({ client: peerSqlite }) as DrizzleDB;
const peers: PeerMap = { "peer-sense": peerDb };
const { runtime, sqlite } = makeRuntime(async (db) => {
await db.insert(samples).values({ ts: 999, value: 9.9 });
return { signal: 9.9, workflow: null };
});
await executeCompute(runtime, peers);
const peerRows = peerSqlite.prepare("SELECT * FROM samples").all();
expect(peerRows).toHaveLength(0);
const ownRows = sqlite.prepare("SELECT * FROM samples").all();
expect(ownRows).toHaveLength(1);
peerSqlite.close();
sqlite.close();
});
it("inserts correctly into the sense db directory path", async () => {
it("inserts correctly into the sense db from openSenseDb", async () => {
const dbPath = makeTempDbPath();
const migrationsDir = makeTempMigrationsDir(INIT_SQL);
const dbResult = openSenseDb(dbPath, migrationsDir);
@@ -301,14 +228,12 @@ describe("executeCompute", () => {
const runtime: SenseRuntime = {
name: "cpu-usage",
db,
compute: async (d) => {
await d.insert(samples).values({ ts: 1000, value: 1.23 });
return { signal: 1.23, workflow: null };
},
compute: async () => ({ ts: 1000, value: 1.23 }),
table: samples,
persistSignal: () => {},
};
const result = await executeCompute(runtime, {});
const result = await executeCompute(runtime);
expect(result.ok).toBe(true);
const rows = dbSqlite.prepare("SELECT * FROM samples").all() as Array<{
@@ -323,17 +248,17 @@ describe("executeCompute", () => {
it("returns err when compute exceeds timeoutMs", async () => {
const { runtime, sqlite } = makeRuntime(
(_db, _peers, options) =>
(signal) =>
new Promise<null>((resolve, reject) => {
const t = setTimeout(() => resolve(null), 5_000);
options?.signal.addEventListener("abort", () => {
signal.addEventListener("abort", () => {
clearTimeout(t);
reject(new Error("aborted"));
});
}),
);
const result = await executeCompute(runtime, emptyPeers, 50);
const result = await executeCompute(runtime, 50);
expect(result.ok).toBe(false);
if (result.ok) return;
expect(result.error.message).toMatch(/timed out/i);
@@ -341,39 +266,25 @@ describe("executeCompute", () => {
});
it("completes within timeout when compute is fast", async () => {
const { runtime, sqlite } = makeRuntime(async () => ({ signal: 42, workflow: null }));
const result = await executeCompute(runtime, emptyPeers, 5_000);
const { runtime, sqlite } = makeRuntime(async () => ({ ts: 1, value: 42 }));
const result = await executeCompute(runtime, 5_000);
expect(result.ok).toBe(true);
if (!result.ok) return;
expect(result.value).toEqual({ signal: 42, workflow: null });
expect(result.value).toEqual({ ts: 1, value: 42 });
sqlite.close();
});
it("passes AbortSignal to compute fn", async () => {
let capturedSignal: AbortSignal | undefined;
const { runtime, sqlite } = makeRuntime(async (_db, _peers, options) => {
capturedSignal = options?.signal;
const { runtime, sqlite } = makeRuntime(async (signal) => {
capturedSignal = signal;
return null;
});
await executeCompute(runtime, emptyPeers, 1_000);
await executeCompute(runtime, 1_000);
expect(capturedSignal).toBeInstanceOf(AbortSignal);
sqlite.close();
});
it("passes BlobStore as options.blobs when blobStore argument is provided", async () => {
const blobsRoot = mkdtempSync(join(tmpdir(), "nerve-blobs-"));
const blobStore = createBlobStore(blobsRoot);
let seen: ReturnType<typeof createBlobStore> | undefined;
const { runtime, sqlite } = makeRuntime(async (_db, _peers, options) => {
seen = options?.blobs;
return null;
});
await executeCompute(runtime, emptyPeers, undefined, blobStore);
expect(seen).toBe(blobStore);
sqlite.close();
});
});
// ---------------------------------------------------------------------------
@@ -429,19 +340,14 @@ describe("runMigrations journal", () => {
const first = runMigrations(sqlite, dir);
expect(first.ok).toBe(true);
// Insert a row so we can verify second run doesn't fail on CREATE TABLE
sqlite.exec("INSERT INTO samples (ts, value) VALUES (1, 1.0)");
// Run again — migration must NOT re-run (would fail without IF NOT EXISTS but
// the journal prevents it even for non-idempotent SQL)
const nonIdempotentSql = "CREATE TABLE samples2 (id INTEGER PRIMARY KEY)";
writeFileSync(join(dir, "0002_samples2.sql"), nonIdempotentSql);
// First time: creates samples2
const second = runMigrations(sqlite, dir);
expect(second.ok).toBe(true);
// Second time: 0002 already in journal, must not re-run
const third = runMigrations(sqlite, dir);
expect(third.ok).toBe(true);
+34 -60
View File
@@ -4,43 +4,20 @@ import { DatabaseSync } from "node:sqlite";
import { drizzle } from "drizzle-orm/node-sqlite";
import type { NodeSQLiteDatabase } from "drizzle-orm/node-sqlite";
import type { SQLiteTable } from "drizzle-orm/sqlite-core";
import type { ComputeResult, Result } from "@uncaged/nerve-core";
import type { Result, SenseComputeFn } from "@uncaged/nerve-core";
import { DEFAULT_SENSE_SIGNAL_RETENTION, err, isPlainRecord, ok } from "@uncaged/nerve-core";
import type { BlobStore } from "@uncaged/nerve-store";
/** A Drizzle DB instance (schema-generic) */
export type DrizzleDB = NodeSQLiteDatabase<Record<string, never>>;
/** Read-only map of peer sense name → their Drizzle DB */
export type PeerMap = Readonly<Record<string, DrizzleDB>>;
/** Options passed to a compute function */
export type ComputeOptions = {
signal: AbortSignal;
/** CAS under `data/blobs/`; injected by the sense worker when available. */
blobs?: BlobStore;
};
/**
* The shape every sense's index.ts must export.
* Engine injects `db` (read-write), `peers` (read-only), and `options`
* (`signal`, and `blobs` when running in the sense worker — RFC-001 §8 CAS).
* Returns a structured result when a signal should be emitted (and optionally a workflow),
* or null for silence.
*/
export type ComputeFn<T = unknown> = (
db: DrizzleDB,
peers: PeerMap,
options?: ComputeOptions,
) => Promise<ComputeResult<T>>;
/** All state held for one sense inside a worker */
export type SenseRuntime = {
name: string;
db: DrizzleDB;
compute: ComputeFn;
compute: SenseComputeFn;
table: SQLiteTable;
persistSignal: (payload: unknown) => void;
};
@@ -126,13 +103,13 @@ export function runMigrations(sqlite: DatabaseSync, migrationsDir: string): Resu
return ok(undefined);
}
/** Run `_signals` row prune after this many inserts (amortize DELETE cost). */
const SIGNAL_INSERTS_PER_PRUNE = 100;
/**
* Open (or create) the SQLite file at `dbPath`, run all migrations in
* `migrationsDir`, and wrap with Drizzle ORM.
*/
/** Run `_signals` row prune after this many inserts (amortize DELETE cost). */
const SIGNAL_INSERTS_PER_PRUNE = 100;
export function openSenseDb(
dbPath: string,
migrationsDir: string,
@@ -184,27 +161,12 @@ export function openSenseDb(
}
/**
* Open a peer sense DB in read-only mode (no migrations).
* Dynamically import the compute function and table from a sense's index.ts/js.
* The module must export a named `compute` function and a named `table` (SQLiteTable).
*/
export function openPeerDb(dbPath: string): Result<DrizzleDB> {
let sqlite: DatabaseSync;
try {
sqlite = new DatabaseSync(dbPath, { readOnly: true });
} catch (e) {
const msg = e instanceof Error ? e.message : String(e);
return err(new Error(`Failed to open peer database "${dbPath}" (readonly): ${msg}`));
}
// Same schema-agnostic Drizzle wrapper as openSenseDb.
return ok(drizzle({ client: sqlite }) as DrizzleDB);
}
/**
* Dynamically import the compute function from a sense's index.ts/js.
* The module must export a named `compute` function.
*/
export async function loadComputeFn(senseIndexPath: string): Promise<Result<ComputeFn>> {
export async function loadSenseModule(
senseIndexPath: string,
): Promise<Result<{ compute: SenseComputeFn; table: SQLiteTable }>> {
let mod: unknown;
try {
@@ -221,26 +183,32 @@ export async function loadComputeFn(senseIndexPath: string): Promise<Result<Comp
);
}
return ok(mod.compute as ComputeFn);
if (!("table" in mod) || mod.table === null || typeof mod.table !== "object") {
return err(
new Error(
`Sense module "${senseIndexPath}" must export a named "table" (drizzle SQLiteTable)`,
),
);
}
return ok({
compute: mod.compute as SenseComputeFn,
table: mod.table as SQLiteTable,
});
}
/**
* Execute a sense's compute function with an optional soft timeout.
* If timeoutMs is provided and compute takes longer, the AbortSignal is
* triggered and an error Result is returned.
* When `blobStore` is set, it is exposed as `options.blobs` (see RFC-001 §8).
* When compute returns non-null, the result is persisted to the sense's table
* via `db.insert(table).values(result)`.
*/
export async function executeCompute(
runtime: SenseRuntime,
peers: PeerMap,
timeoutMs?: number,
blobStore?: BlobStore,
): Promise<Result<ComputeResult<unknown>>> {
): Promise<Result<unknown | null>> {
const controller = new AbortController();
const options: ComputeOptions =
blobStore !== undefined
? { signal: controller.signal, blobs: blobStore }
: { signal: controller.signal };
let timer: ReturnType<typeof setTimeout> | undefined;
const timeoutPromise =
@@ -254,10 +222,16 @@ export async function executeCompute(
: null;
try {
const computePromise = runtime.compute(runtime.db, peers, options);
const computePromise = runtime.compute(controller.signal);
const result = timeoutPromise
? await Promise.race([computePromise, timeoutPromise])
: await computePromise;
if (result !== null) {
// Cast required: DrizzleDB is schema-agnostic; the sense module guarantees shape compatibility.
await runtime.db.insert(runtime.table).values(result as Record<string, unknown>);
}
return ok(result);
} catch (e) {
const msg = e instanceof Error ? e.message : String(e);
+18 -68
View File
@@ -3,14 +3,13 @@
*
* Entry point for `nerve worker sense --group <name>`.
* Receives the group name via CLI args, reads nerve.yaml, initialises one
* SenseRuntime per sense in the group, builds peer read-only connections,
* then signals ready and enters the IPC event loop.
* SenseRuntime per sense in the group, then signals ready and enters the
* IPC event loop.
*
* Layout assumptions (nerve user config at `~/.uncaged-nerve/`):
* senses/<name>/index.js ← compiled compute
* senses/<name>/migrations/ ← SQL migration files
* data/senses/<name>.db ← SQLite data file
* data/blobs/<aa>/<hashrest> ← CAS (sha256), via options.blobs in compute
* nerve.yaml ← config
*/
@@ -22,11 +21,10 @@ import { join, resolve } from "node:path";
import { parseNerveConfig, routeSenseComputeOutput } from "@uncaged/nerve-core";
import type { NerveConfig } from "@uncaged/nerve-core";
import { createBlobStore } from "@uncaged/nerve-store";
import type { WorkerToParentMessage } from "./ipc.js";
import { parseParentMessage } from "./ipc.js";
import { executeCompute, loadComputeFn, openPeerDb, openSenseDb } from "./sense-runtime.js";
import type { DrizzleDB, PeerMap, SenseRuntime } from "./sense-runtime.js";
import { executeCompute, loadSenseModule, openSenseDb } from "./sense-runtime.js";
import type { SenseRuntime } from "./sense-runtime.js";
import { ignoreSessionBroadcastSignals } from "./worker-fork-support.js";
// ---------------------------------------------------------------------------
@@ -52,7 +50,7 @@ function sendError(sense: string, error: string): void {
}
// ---------------------------------------------------------------------------
// Initialisation helpers (each extracted to keep bootstrap complexity low)
// Initialisation helpers
// ---------------------------------------------------------------------------
function readConfig(nerveRoot: string): NerveConfig {
@@ -78,7 +76,7 @@ async function initSense(
nerveRoot: string,
senseName: string,
retention: number,
): Promise<{ db: DrizzleDB; runtime: SenseRuntime }> {
): Promise<SenseRuntime> {
const dbPath = join(nerveRoot, "data", "senses", `${senseName}.db`);
const migrationsDir = join(nerveRoot, "senses", senseName, "migrations");
const senseIndexPath = resolve(join(nerveRoot, "senses", senseName, "index.js"));
@@ -88,55 +86,20 @@ async function initSense(
throw new Error(`Failed to init DB for "${senseName}": ${dbResult.error.message}`);
}
const computeResult = await loadComputeFn(senseIndexPath);
if (!computeResult.ok) {
throw new Error(`Failed to load compute for "${senseName}": ${computeResult.error.message}`);
const moduleResult = await loadSenseModule(senseIndexPath);
if (!moduleResult.ok) {
throw new Error(`Failed to load module for "${senseName}": ${moduleResult.error.message}`);
}
const { db } = dbResult.value;
return {
db,
runtime: {
name: senseName,
db,
compute: computeResult.value,
persistSignal: dbResult.value.persistSignal,
},
name: senseName,
db: dbResult.value.db,
compute: moduleResult.value.compute,
table: moduleResult.value.table,
persistSignal: dbResult.value.persistSignal,
};
}
function buildPeers(
nerveRoot: string,
allSenseNames: string[],
ownDbs: Map<string, DrizzleDB>,
groupSenseNames: Set<string>,
): PeerMap {
const entries: [string, DrizzleDB][] = [];
for (const peerName of allSenseNames) {
// Exclude senses that belong to this worker's own group — they are not peers
if (groupSenseNames.has(peerName)) continue;
const own = ownDbs.get(peerName);
if (own !== undefined) {
entries.push([peerName, own]);
continue;
}
const peerDbPath = join(nerveRoot, "data", "senses", `${peerName}.db`);
const peerResult = openPeerDb(peerDbPath);
if (!peerResult.ok) {
process.stderr.write(
`[sense-worker] Warning: could not open peer DB for "${peerName}": ${peerResult.error.message}\n`,
);
continue;
}
entries.push([peerName, peerResult.value]);
}
return Object.fromEntries(entries);
}
// ---------------------------------------------------------------------------
// Grace period: hard kill after soft timeout
//
@@ -173,13 +136,11 @@ function clearGracePeriodTimer(senseName: string): void {
async function runCompute(
senseName: string,
runtime: SenseRuntime,
peers: PeerMap,
timeoutMs: number,
gracePeriodMs: number | null,
blobStore: ReturnType<typeof createBlobStore>,
): Promise<void> {
try {
const result = await executeCompute(runtime, peers, timeoutMs, blobStore);
const result = await executeCompute(runtime, timeoutMs);
if (!result.ok) {
sendError(senseName, result.error.message);
if (gracePeriodMs !== null && result.error.message.includes("timed out")) {
@@ -210,11 +171,9 @@ async function runCompute(
function handleMessage(
raw: unknown,
runtimes: Map<string, SenseRuntime>,
peers: PeerMap,
group: string,
senseConfigs: Map<string, { timeout: number | null; gracePeriod: number | null }>,
inFlight: Map<string, Promise<void>>,
blobStore: ReturnType<typeof createBlobStore>,
): void {
const parseResult = parseParentMessage(raw);
if (!parseResult.ok) {
@@ -245,14 +204,13 @@ function handleMessage(
return;
}
// Look up timeout/gracePeriod per-sense at compute time (RFC §5.3: these are per-sense)
const sc = senseConfigs.get(msg.sense);
const timeoutMs = sc?.timeout ?? DEFAULT_TIMEOUT_MS;
const gracePeriodMs = sc?.gracePeriod ?? null;
const previous = inFlight.get(msg.sense) ?? Promise.resolve();
const next = previous
.then(() => runCompute(msg.sense, runtime, peers, timeoutMs, gracePeriodMs, blobStore))
.then(() => runCompute(msg.sense, runtime, timeoutMs, gracePeriodMs))
.catch((e: unknown) => {
const errMsg = e instanceof Error ? e.message : String(e);
sendError(msg.sense, errMsg);
@@ -279,14 +237,12 @@ async function bootstrap(nerveRoot: string, group: string): Promise<void> {
}
const runtimes = new Map<string, SenseRuntime>();
const ownDbs = new Map<string, DrizzleDB>();
const failedSenses: string[] = [];
for (const senseName of groupSenses) {
try {
const retention = config.senses[senseName].retention;
const { db, runtime } = await initSense(nerveRoot, senseName, retention);
ownDbs.set(senseName, db);
const runtime = await initSense(nerveRoot, senseName, retention);
runtimes.set(senseName, runtime);
} catch (e: unknown) {
const eMsg = e instanceof Error ? e.message : String(e);
@@ -297,16 +253,11 @@ async function bootstrap(nerveRoot: string, group: string): Promise<void> {
}
}
// If ALL senses failed, exit with error so kernel respawns
if (runtimes.size === 0) {
process.stderr.write(`[sense-worker] All senses in group "${group}" failed to load, exiting\n`);
process.exit(1);
}
const groupSenseNames = new Set(groupSenses);
const peers = buildPeers(nerveRoot, Object.keys(config.senses), ownDbs, groupSenseNames);
// Build per-sense timeout/gracePeriod map (RFC §5.3: these are per-sense, not per-group)
const senseConfigs = new Map<string, { timeout: number | null; gracePeriod: number | null }>();
for (const senseName of groupSenses) {
const sc = config.senses[senseName];
@@ -317,12 +268,11 @@ async function bootstrap(nerveRoot: string, group: string): Promise<void> {
}
const inFlight = new Map<string, Promise<void>>();
const blobStore = createBlobStore(join(nerveRoot, "data", "blobs"));
sendReady();
process.on("message", (raw: unknown) => {
handleMessage(raw, runtimes, peers, group, senseConfigs, inFlight, blobStore);
handleMessage(raw, runtimes, group, senseConfigs, inFlight);
});
}
+4
View File
@@ -86,6 +86,9 @@ importers:
packages/core:
dependencies:
drizzle-orm:
specifier: 1.0.0-beta.23-c10d10c
version: 1.0.0-beta.23-c10d10c(@cloudflare/workers-types@4.20260425.1)(@types/better-sqlite3@7.6.13)(@types/mssql@9.1.11(@azure/core-client@1.10.1))(better-sqlite3@11.10.0)(mssql@11.0.1(@azure/core-client@1.10.1))(sql.js@1.14.1)(zod@4.3.6)
yaml:
specifier: ^2.8.3
version: 2.8.3
@@ -2033,6 +2036,7 @@ packages:
uuid@8.3.2:
resolution: {integrity: sha512-+NYs2QeMWy+GWFOEm9xnn6HCDp0l7QBD7ml8zLUmJ+93Q5NF0NocErnwkTkXVFNiX3/fpC6afS8Dhb/gz7R7eg==}
deprecated: uuid@10 and below is no longer supported. For ESM codebases, update to uuid@latest. For CommonJS codebases, use uuid@11 (but be aware this version will likely be deprecated in 2028).
hasBin: true
vite@8.0.9: