refactor(daemon): stateful sense engine — JSON state, remove Signal Bus
Phase 2 of RFC #308: Stateful Sense refactor. - SenseRuntime uses JSON file persistence instead of SQLite/Drizzle - Sense compute now receives state and returns { state, workflow } - IPC: replaced SignalMessage with ComputeResultMessage - Removed Signal Bus entirely (on[] now uses reverse-index in scheduler) - sense-scheduler.onSenseCompleted() triggers dependent senses - kernel no longer constructs Signal objects or calls routeSenseComputeOutput - Removed drizzle-orm dependency from daemon package Refs #308, closes #310
This commit is contained in:
@@ -28,7 +28,6 @@
|
||||
"dependencies": {
|
||||
"@uncaged/nerve-core": "workspace:*",
|
||||
"@uncaged/nerve-store": "workspace:*",
|
||||
"drizzle-orm": "1.0.0-beta.23-c10d10c",
|
||||
"yaml": "^2.8.3"
|
||||
},
|
||||
"devDependencies": {
|
||||
|
||||
@@ -241,7 +241,6 @@ describe("daemon-ipc — list-senses", () => {
|
||||
throttle: 5000,
|
||||
timeout: 3000,
|
||||
triggers: ["every 30s"],
|
||||
lastSignalTimestamp: 1000,
|
||||
},
|
||||
{
|
||||
name: "disk-usage",
|
||||
@@ -249,7 +248,6 @@ describe("daemon-ipc — list-senses", () => {
|
||||
throttle: 30000,
|
||||
timeout: null,
|
||||
triggers: [],
|
||||
lastSignalTimestamp: null,
|
||||
},
|
||||
];
|
||||
const listSenses = vi.fn(() => sensesData);
|
||||
|
||||
@@ -31,6 +31,11 @@ process.on("message", (msg) => {
|
||||
writeFileSync(markerFile, "crashed", "utf8");
|
||||
process.exit(1);
|
||||
}
|
||||
process.send({ type: "signal", sense: msg.sense, payload: 42 });
|
||||
process.send({
|
||||
type: "compute-result",
|
||||
sense: msg.sense,
|
||||
state: 42,
|
||||
workflow: null,
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
@@ -9,7 +9,7 @@
|
||||
*
|
||||
* Behaviour:
|
||||
* - Sends { type: "ready" } on startup
|
||||
* - On { type: "compute", sense } → sends back { type: "signal", sense, payload: 42 }
|
||||
* - On { type: "compute", sense } → sends back compute-result with state + workflow:null
|
||||
* - On { type: "shutdown" } → exits cleanly with code 0
|
||||
*/
|
||||
|
||||
@@ -23,6 +23,11 @@ process.on("message", (msg) => {
|
||||
}
|
||||
|
||||
if (msg.type === "compute" && typeof msg.sense === "string") {
|
||||
process.send({ type: "signal", sense: msg.sense, payload: 42 });
|
||||
process.send({
|
||||
type: "compute-result",
|
||||
sense: msg.sense,
|
||||
state: 42,
|
||||
workflow: null,
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
@@ -18,7 +18,12 @@ process.on("message", (msg) => {
|
||||
if (msg.type === "compute" && typeof msg.sense === "string") {
|
||||
// Intentionally slow — will be killed by grace period
|
||||
setTimeout(() => {
|
||||
process.send({ type: "signal", sense: msg.sense, payload: "late" });
|
||||
process.send({
|
||||
type: "compute-result",
|
||||
sense: msg.sense,
|
||||
state: "late",
|
||||
workflow: null,
|
||||
});
|
||||
}, 10_000);
|
||||
}
|
||||
});
|
||||
|
||||
@@ -11,7 +11,6 @@ import { tmpdir } from "node:os";
|
||||
import { dirname, join } from "node:path";
|
||||
import { fileURLToPath } from "node:url";
|
||||
|
||||
import type { Signal } from "@uncaged/nerve-core";
|
||||
import type { NerveConfig } from "@uncaged/nerve-core";
|
||||
import { afterEach, beforeEach, describe, expect, it } from "vitest";
|
||||
|
||||
@@ -30,7 +29,6 @@ function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
|
||||
throttle: null,
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
on: [],
|
||||
},
|
||||
@@ -82,7 +80,6 @@ describe("kernel integration — real child processes", () => {
|
||||
throttle: null,
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
on: [],
|
||||
},
|
||||
@@ -91,7 +88,6 @@ describe("kernel integration — real child processes", () => {
|
||||
throttle: null,
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
on: [],
|
||||
},
|
||||
@@ -100,7 +96,6 @@ describe("kernel integration — real child processes", () => {
|
||||
throttle: null,
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
on: [],
|
||||
},
|
||||
@@ -116,31 +111,32 @@ describe("kernel integration — real child processes", () => {
|
||||
expect(kernel.senseCount).toBe(3);
|
||||
});
|
||||
|
||||
it("workers start and respond to compute messages with signals", async () => {
|
||||
it("workers start and respond to compute messages with compute-result", async () => {
|
||||
const config = makeConfig();
|
||||
kernel = createKernel(config, nerveRoot, {
|
||||
workerScript: MOCK_WORKER,
|
||||
});
|
||||
|
||||
// Wait for all workers to be ready (event-based, not fixed delay)
|
||||
await kernel.ready;
|
||||
|
||||
// Subscribe to the bus before triggering compute
|
||||
const received: Signal[] = [];
|
||||
const unsub = kernel.bus.subscribe((signal) => {
|
||||
received.push(signal);
|
||||
});
|
||||
|
||||
// Trigger a compute for "cpu-usage" through the kernel's triggerCompute
|
||||
kernel.triggerCompute("cpu-usage");
|
||||
|
||||
// Poll until a signal arrives on the bus (event-driven, no fixed delay)
|
||||
await pollUntil(() => received.length > 0, 10_000);
|
||||
await pollUntil(
|
||||
() =>
|
||||
kernel!.logStore.query({
|
||||
source: "sense",
|
||||
type: "compute-complete",
|
||||
refId: "cpu-usage",
|
||||
}).length > 0,
|
||||
10_000,
|
||||
);
|
||||
|
||||
expect(received).toHaveLength(1);
|
||||
expect(received[0]).toMatchObject({ senseId: "cpu-usage", payload: 42 });
|
||||
|
||||
unsub();
|
||||
const rows = kernel.logStore.query({
|
||||
source: "sense",
|
||||
type: "compute-complete",
|
||||
refId: "cpu-usage",
|
||||
});
|
||||
expect(rows).toHaveLength(1);
|
||||
}, 15_000);
|
||||
|
||||
it("graceful shutdown: stop() resolves after all workers exit", async () => {
|
||||
@@ -151,7 +147,6 @@ describe("kernel integration — real child processes", () => {
|
||||
throttle: null,
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
on: [],
|
||||
},
|
||||
@@ -160,7 +155,6 @@ describe("kernel integration — real child processes", () => {
|
||||
throttle: null,
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
on: [],
|
||||
},
|
||||
@@ -180,31 +174,32 @@ describe("kernel integration — real child processes", () => {
|
||||
await expect(stopPromise).resolves.toBeUndefined();
|
||||
}, 10_000);
|
||||
|
||||
it("compute round-trip: worker receives compute and sends signal back through bus", async () => {
|
||||
it("compute round-trip: worker receives compute and kernel logs compute-complete", async () => {
|
||||
const config = makeConfig();
|
||||
kernel = createKernel(config, nerveRoot, {
|
||||
workerScript: MOCK_WORKER,
|
||||
});
|
||||
|
||||
// Wait for all workers to be ready (event-based, not fixed delay)
|
||||
await kernel.ready;
|
||||
|
||||
const received: Signal[] = [];
|
||||
const unsub = kernel.bus.subscribe((signal) => {
|
||||
received.push(signal);
|
||||
});
|
||||
|
||||
// Trigger compute via the kernel — the kernel sends IPC to the worker,
|
||||
// the mock worker responds with a signal message, and the kernel routes it to the bus.
|
||||
kernel.triggerCompute("cpu-usage");
|
||||
|
||||
// Poll for the signal on the bus (no fixed delay)
|
||||
await pollUntil(() => received.length > 0, 10_000);
|
||||
await pollUntil(
|
||||
() =>
|
||||
kernel!.logStore.query({
|
||||
source: "sense",
|
||||
type: "compute-complete",
|
||||
refId: "cpu-usage",
|
||||
}).length > 0,
|
||||
10_000,
|
||||
);
|
||||
|
||||
expect(received).toHaveLength(1);
|
||||
expect(received[0]).toMatchObject({ senseId: "cpu-usage", payload: 42 });
|
||||
|
||||
unsub();
|
||||
const rows = kernel.logStore.query({
|
||||
source: "sense",
|
||||
type: "compute-complete",
|
||||
refId: "cpu-usage",
|
||||
});
|
||||
expect(rows).toHaveLength(1);
|
||||
}, 15_000);
|
||||
|
||||
it("crash recovery: kernel respawns worker after unexpected exit and new worker is functional", async () => {
|
||||
@@ -234,23 +229,24 @@ describe("kernel integration — real child processes", () => {
|
||||
expect(newPid).not.toBeNull();
|
||||
expect(newPid).not.toBe(originalPid);
|
||||
|
||||
// Wait a bit for the new worker to send its "ready" message and be fully up.
|
||||
// Poll until the new worker responds to a compute message on the bus.
|
||||
const postRespawnSignals: Signal[] = [];
|
||||
const unsub = kernel.bus.subscribe((signal) => {
|
||||
postRespawnSignals.push(signal);
|
||||
});
|
||||
|
||||
// Trigger compute through the kernel to the new worker
|
||||
kernel.triggerCompute("cpu-usage");
|
||||
|
||||
// Poll for the signal — verifies the new worker is fully functional
|
||||
await pollUntil(() => postRespawnSignals.length > 0, 15_000);
|
||||
await pollUntil(
|
||||
() =>
|
||||
kernel!.logStore.query({
|
||||
source: "sense",
|
||||
type: "compute-complete",
|
||||
refId: "cpu-usage",
|
||||
}).length > 0,
|
||||
15_000,
|
||||
);
|
||||
|
||||
expect(postRespawnSignals).toHaveLength(1);
|
||||
expect(postRespawnSignals[0]).toMatchObject({ senseId: "cpu-usage", payload: 42 });
|
||||
|
||||
unsub();
|
||||
const rows = kernel.logStore.query({
|
||||
source: "sense",
|
||||
type: "compute-complete",
|
||||
refId: "cpu-usage",
|
||||
});
|
||||
expect(rows.length).toBeGreaterThanOrEqual(1);
|
||||
|
||||
// Kernel should still stop gracefully after respawn
|
||||
await kernel.stop();
|
||||
|
||||
@@ -86,7 +86,6 @@ function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
|
||||
throttle: null,
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
on: [],
|
||||
},
|
||||
@@ -125,7 +124,6 @@ describe("kernel — getHealth", () => {
|
||||
throttle: null,
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
on: [],
|
||||
},
|
||||
@@ -134,7 +132,6 @@ describe("kernel — getHealth", () => {
|
||||
throttle: null,
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
on: [],
|
||||
},
|
||||
@@ -143,7 +140,6 @@ describe("kernel — getHealth", () => {
|
||||
throttle: null,
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
on: [],
|
||||
},
|
||||
@@ -246,7 +242,6 @@ describe("kernel — reloadConfig", () => {
|
||||
throttle: null,
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
on: [],
|
||||
},
|
||||
@@ -255,7 +250,6 @@ describe("kernel — reloadConfig", () => {
|
||||
throttle: null,
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
on: [],
|
||||
},
|
||||
@@ -283,7 +277,6 @@ describe("kernel — reloadConfig", () => {
|
||||
throttle: null,
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
on: [],
|
||||
},
|
||||
@@ -292,7 +285,6 @@ describe("kernel — reloadConfig", () => {
|
||||
throttle: null,
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
on: [],
|
||||
},
|
||||
@@ -318,7 +310,6 @@ describe("kernel — reloadConfig", () => {
|
||||
throttle: null,
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
on: [],
|
||||
},
|
||||
@@ -352,7 +343,6 @@ describe("kernel — reloadConfig", () => {
|
||||
throttle: null,
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
on: [],
|
||||
},
|
||||
@@ -361,7 +351,6 @@ describe("kernel — reloadConfig", () => {
|
||||
throttle: null,
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
on: [],
|
||||
},
|
||||
|
||||
@@ -101,7 +101,6 @@ function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
|
||||
throttle: null,
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
on: [],
|
||||
},
|
||||
@@ -153,7 +152,6 @@ describe("kernel.triggerSense()", () => {
|
||||
throttle: null,
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
on: [],
|
||||
},
|
||||
@@ -162,7 +160,6 @@ describe("kernel.triggerSense()", () => {
|
||||
throttle: null,
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
on: [],
|
||||
},
|
||||
@@ -199,7 +196,6 @@ describe("kernel.triggerSense()", () => {
|
||||
throttle: null,
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
on: [],
|
||||
},
|
||||
@@ -208,7 +204,6 @@ describe("kernel.triggerSense()", () => {
|
||||
throttle: null,
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
on: [],
|
||||
},
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
/**
|
||||
* Integration tests for Kernel + WorkflowManager integration.
|
||||
*
|
||||
* Verifies that sense signals trigger workflow runs when Sense compute routes
|
||||
* to workflows; that workflow events are logged; that reloadConfig handles workflow changes;
|
||||
* Verifies that sense compute-result IPC triggers workflow runs when `workflow`
|
||||
* is non-null; that workflow events are logged; that reloadConfig handles workflow changes;
|
||||
* and that graceful shutdown stops workflow workers.
|
||||
*
|
||||
* Uses mocked child_process.fork to avoid real subprocesses.
|
||||
@@ -53,7 +53,12 @@ function makeMockChild(pid = 1): MockChild {
|
||||
// Sense IPC: reply to compute so scheduler completes (onComputeComplete).
|
||||
if (m.type === "compute" && typeof m.sense === "string") {
|
||||
setImmediate(() => {
|
||||
child.emit("message", { type: "signal", sense: m.sense, payload: 42 });
|
||||
child.emit("message", {
|
||||
type: "compute-result",
|
||||
sense: m.sense,
|
||||
state: 42,
|
||||
workflow: null,
|
||||
});
|
||||
});
|
||||
}
|
||||
});
|
||||
@@ -117,7 +122,6 @@ function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
|
||||
throttle: null,
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
on: [],
|
||||
},
|
||||
@@ -159,7 +163,6 @@ describe("kernel + workflowManager integration", () => {
|
||||
throttle: null,
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
on: [],
|
||||
},
|
||||
@@ -174,22 +177,17 @@ describe("kernel + workflowManager integration", () => {
|
||||
await flushSenseWorkerForkMicrotasks(kernel);
|
||||
await vi.runAllTimersAsync();
|
||||
|
||||
// Simulate a sense worker sending a signal with workflow launch payload
|
||||
// The kernel's handleWorkerMessage processes "signal" type messages
|
||||
// and uses routeSenseComputeOutput to detect workflow launches
|
||||
const workerPool = mockChildren[0];
|
||||
if (workerPool) {
|
||||
workerPool.emit("message", {
|
||||
type: "signal",
|
||||
type: "compute-result",
|
||||
sense: "cpu-usage",
|
||||
payload: {
|
||||
signal: { reason: "test" },
|
||||
workflow: {
|
||||
name: "my-workflow",
|
||||
maxRounds: 10,
|
||||
prompt: "run this workflow",
|
||||
dryRun: false,
|
||||
},
|
||||
state: { reason: "test" },
|
||||
workflow: {
|
||||
name: "my-workflow",
|
||||
maxRounds: 10,
|
||||
prompt: "run this workflow",
|
||||
dryRun: false,
|
||||
},
|
||||
});
|
||||
}
|
||||
@@ -221,7 +219,6 @@ describe("kernel + workflowManager integration", () => {
|
||||
throttle: null,
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
on: [],
|
||||
},
|
||||
@@ -236,20 +233,17 @@ describe("kernel + workflowManager integration", () => {
|
||||
await flushSenseWorkerForkMicrotasks(kernel);
|
||||
await vi.runAllTimersAsync();
|
||||
|
||||
// Simulate sense worker returning a signal plus workflow launch
|
||||
const workerPool = mockChildren[0];
|
||||
if (workerPool) {
|
||||
workerPool.emit("message", {
|
||||
type: "signal",
|
||||
type: "compute-result",
|
||||
sense: "cpu-usage",
|
||||
payload: {
|
||||
signal: { level: "critical" },
|
||||
workflow: {
|
||||
name: "alert-workflow",
|
||||
maxRounds: 5,
|
||||
prompt: "handle critical alert",
|
||||
dryRun: false,
|
||||
},
|
||||
state: { level: "critical" },
|
||||
workflow: {
|
||||
name: "alert-workflow",
|
||||
maxRounds: 5,
|
||||
prompt: "handle critical alert",
|
||||
dryRun: false,
|
||||
},
|
||||
});
|
||||
}
|
||||
@@ -280,7 +274,7 @@ describe("kernel + workflowManager integration", () => {
|
||||
await stopPromise;
|
||||
});
|
||||
|
||||
it("logs sense signal before workflow-launch when both are present", async () => {
|
||||
it("logs compute-complete before workflow-launch when workflow is present", async () => {
|
||||
const logStore = makeLogStore();
|
||||
const config = makeConfig({
|
||||
workflows: { "order-wf": { concurrency: 1, overflow: "drop" } },
|
||||
@@ -296,16 +290,14 @@ describe("kernel + workflowManager integration", () => {
|
||||
const workerPool = mockChildren[0];
|
||||
if (workerPool) {
|
||||
workerPool.emit("message", {
|
||||
type: "signal",
|
||||
type: "compute-result",
|
||||
sense: "cpu-usage",
|
||||
payload: {
|
||||
signal: { seq: 1 },
|
||||
workflow: {
|
||||
name: "order-wf",
|
||||
maxRounds: 2,
|
||||
prompt: "p",
|
||||
dryRun: true,
|
||||
},
|
||||
state: { seq: 1 },
|
||||
workflow: {
|
||||
name: "order-wf",
|
||||
maxRounds: 2,
|
||||
prompt: "p",
|
||||
dryRun: true,
|
||||
},
|
||||
});
|
||||
}
|
||||
@@ -316,17 +308,17 @@ describe("kernel + workflowManager integration", () => {
|
||||
.map((c) => c[0] as { source: string; type: string; refId: string | null })
|
||||
.filter((e) => e.source === "sense" && e.refId === "cpu-usage");
|
||||
const typeOrder = senseEntries.map((e) => e.type);
|
||||
const sigAt = typeOrder.indexOf("signal");
|
||||
const completeAt = typeOrder.indexOf("compute-complete");
|
||||
const launchAt = typeOrder.indexOf("workflow-launch");
|
||||
expect(sigAt).toBeGreaterThanOrEqual(0);
|
||||
expect(launchAt).toBeGreaterThan(sigAt);
|
||||
expect(completeAt).toBeGreaterThanOrEqual(0);
|
||||
expect(launchAt).toBeGreaterThan(completeAt);
|
||||
|
||||
const stopPromise = kernel.stop();
|
||||
await vi.runAllTimersAsync();
|
||||
await stopPromise;
|
||||
});
|
||||
|
||||
it("does not trigger workflow when signal senseId is not in 'on' list", async () => {
|
||||
it("does not trigger workflow when compute-result has workflow null", async () => {
|
||||
const logStore = makeLogStore();
|
||||
const config = makeConfig({
|
||||
senses: {
|
||||
@@ -335,7 +327,6 @@ describe("kernel + workflowManager integration", () => {
|
||||
throttle: null,
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
on: [],
|
||||
},
|
||||
@@ -344,7 +335,6 @@ describe("kernel + workflowManager integration", () => {
|
||||
throttle: null,
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
on: [],
|
||||
},
|
||||
@@ -359,13 +349,13 @@ describe("kernel + workflowManager integration", () => {
|
||||
await flushSenseWorkerForkMicrotasks(kernel);
|
||||
await vi.runAllTimersAsync();
|
||||
|
||||
// Emit a regular signal (shorthand payload) — should NOT trigger any workflow
|
||||
const workerPool = mockChildren[0];
|
||||
if (workerPool) {
|
||||
workerPool.emit("message", {
|
||||
type: "signal",
|
||||
type: "compute-result",
|
||||
sense: "cpu-usage",
|
||||
payload: 50,
|
||||
state: 50,
|
||||
workflow: null,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -396,7 +386,6 @@ describe("kernel + workflowManager integration", () => {
|
||||
throttle: null,
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
on: [],
|
||||
},
|
||||
@@ -411,20 +400,17 @@ describe("kernel + workflowManager integration", () => {
|
||||
await flushSenseWorkerForkMicrotasks(kernel);
|
||||
await vi.runAllTimersAsync();
|
||||
|
||||
// Simulate sense compute returning a signal plus workflow launch
|
||||
const workerPool = mockChildren[0];
|
||||
if (workerPool) {
|
||||
workerPool.emit("message", {
|
||||
type: "signal",
|
||||
type: "compute-result",
|
||||
sense: "cpu-usage",
|
||||
payload: {
|
||||
signal: { note: "log" },
|
||||
workflow: {
|
||||
name: "log-test-workflow",
|
||||
maxRounds: 10,
|
||||
prompt: "test prompt",
|
||||
dryRun: false,
|
||||
},
|
||||
state: { note: "log" },
|
||||
workflow: {
|
||||
name: "log-test-workflow",
|
||||
maxRounds: 10,
|
||||
prompt: "test prompt",
|
||||
dryRun: false,
|
||||
},
|
||||
});
|
||||
}
|
||||
@@ -452,7 +438,6 @@ describe("kernel + workflowManager integration", () => {
|
||||
throttle: null,
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
on: [],
|
||||
},
|
||||
@@ -476,7 +461,6 @@ describe("kernel + workflowManager integration", () => {
|
||||
throttle: null,
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
on: [],
|
||||
},
|
||||
@@ -488,20 +472,17 @@ describe("kernel + workflowManager integration", () => {
|
||||
};
|
||||
kernel.reloadConfig(newConfig);
|
||||
|
||||
// Simulate sense compute returning a signal plus workflow for the new workflow
|
||||
const workerPool = mockChildren[0];
|
||||
if (workerPool) {
|
||||
workerPool.emit("message", {
|
||||
type: "signal",
|
||||
type: "compute-result",
|
||||
sense: "cpu-usage",
|
||||
payload: {
|
||||
signal: { phase: "reload" },
|
||||
workflow: {
|
||||
name: "new-workflow",
|
||||
maxRounds: 10,
|
||||
prompt: "reload test",
|
||||
dryRun: false,
|
||||
},
|
||||
state: { phase: "reload" },
|
||||
workflow: {
|
||||
name: "new-workflow",
|
||||
maxRounds: 10,
|
||||
prompt: "reload test",
|
||||
dryRun: false,
|
||||
},
|
||||
});
|
||||
}
|
||||
@@ -534,7 +515,6 @@ describe("kernel + workflowManager integration", () => {
|
||||
throttle: null,
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
on: [],
|
||||
},
|
||||
@@ -557,7 +537,6 @@ describe("kernel + workflowManager integration", () => {
|
||||
throttle: null,
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
on: [],
|
||||
},
|
||||
@@ -574,20 +553,17 @@ describe("kernel + workflowManager integration", () => {
|
||||
(c.send as ReturnType<typeof vi.fn>).mockClear();
|
||||
}
|
||||
|
||||
// Simulate sense compute trying to launch the removed workflow — it should not start
|
||||
const workerPool = mockChildren[0];
|
||||
if (workerPool) {
|
||||
workerPool.emit("message", {
|
||||
type: "signal",
|
||||
type: "compute-result",
|
||||
sense: "cpu-usage",
|
||||
payload: {
|
||||
signal: { stale: true },
|
||||
workflow: {
|
||||
name: "old-workflow",
|
||||
maxRounds: 10,
|
||||
prompt: "should not work",
|
||||
dryRun: false,
|
||||
},
|
||||
state: { stale: true },
|
||||
workflow: {
|
||||
name: "old-workflow",
|
||||
maxRounds: 10,
|
||||
prompt: "should not work",
|
||||
dryRun: false,
|
||||
},
|
||||
});
|
||||
}
|
||||
@@ -621,7 +597,6 @@ describe("kernel + workflowManager integration", () => {
|
||||
throttle: null,
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
on: [],
|
||||
},
|
||||
@@ -636,20 +611,17 @@ describe("kernel + workflowManager integration", () => {
|
||||
await flushSenseWorkerForkMicrotasks(kernel);
|
||||
await vi.runAllTimersAsync();
|
||||
|
||||
// Trigger a workflow via sense compute return value
|
||||
const workerPool = mockChildren[0];
|
||||
if (workerPool) {
|
||||
workerPool.emit("message", {
|
||||
type: "signal",
|
||||
type: "compute-result",
|
||||
sense: "cpu-usage",
|
||||
payload: {
|
||||
signal: { shutdownCase: true },
|
||||
workflow: {
|
||||
name: "shutdown-test",
|
||||
maxRounds: 10,
|
||||
prompt: "test",
|
||||
dryRun: false,
|
||||
},
|
||||
state: { shutdownCase: true },
|
||||
workflow: {
|
||||
name: "shutdown-test",
|
||||
maxRounds: 10,
|
||||
prompt: "test",
|
||||
dryRun: false,
|
||||
},
|
||||
});
|
||||
}
|
||||
@@ -689,7 +661,6 @@ describe("kernel + workflowManager integration", () => {
|
||||
throttle: null,
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
on: [],
|
||||
},
|
||||
|
||||
@@ -37,7 +37,12 @@ function makeMockChild(pid = 1): MockChild {
|
||||
}
|
||||
if (m.type === "compute" && typeof m.sense === "string") {
|
||||
setImmediate(() => {
|
||||
child.emit("message", { type: "signal", sense: m.sense, payload: 42 });
|
||||
child.emit("message", {
|
||||
type: "compute-result",
|
||||
sense: m.sense,
|
||||
state: 42,
|
||||
workflow: null,
|
||||
});
|
||||
});
|
||||
}
|
||||
});
|
||||
@@ -81,7 +86,6 @@ function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
|
||||
throttle: null,
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
on: [],
|
||||
},
|
||||
@@ -112,7 +116,7 @@ describe("kernel — message routing", () => {
|
||||
rmSync(nerveRoot, { recursive: true, force: true });
|
||||
});
|
||||
|
||||
it("routes signal message to bus without throwing", async () => {
|
||||
it("routes compute-result message without throwing", async () => {
|
||||
const config = makeConfig({
|
||||
senses: {
|
||||
"cpu-usage": {
|
||||
@@ -120,7 +124,6 @@ describe("kernel — message routing", () => {
|
||||
throttle: null,
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
on: [],
|
||||
},
|
||||
@@ -133,14 +136,19 @@ describe("kernel — message routing", () => {
|
||||
const child = mockChildren[0];
|
||||
|
||||
expect(() => {
|
||||
child.emit("message", { type: "signal", sense: "cpu-usage", payload: 42 });
|
||||
child.emit("message", {
|
||||
type: "compute-result",
|
||||
sense: "cpu-usage",
|
||||
state: 42,
|
||||
workflow: null,
|
||||
});
|
||||
}).not.toThrow();
|
||||
|
||||
await kernel.stop();
|
||||
await vi.runAllTimersAsync();
|
||||
});
|
||||
|
||||
it("persists emitted signals as sense/signal log entries", async () => {
|
||||
it("persists compute-complete log entries for sense IPC", async () => {
|
||||
const tmpDir = mkdtempSync(join(tmpdir(), "nerve-kernel-sig-"));
|
||||
const logStore = createLogStore(join(tmpDir, "logs.db"));
|
||||
try {
|
||||
@@ -151,7 +159,6 @@ describe("kernel — message routing", () => {
|
||||
throttle: null,
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
on: [],
|
||||
},
|
||||
@@ -160,10 +167,19 @@ describe("kernel — message routing", () => {
|
||||
const kernel = createKernel(config, tmpDir, { logStore });
|
||||
await vi.runAllTimersAsync();
|
||||
const child = mockChildren[0];
|
||||
child.emit("message", { type: "signal", sense: "cpu-usage", payload: 123 });
|
||||
const rows = logStore.query({ source: "sense", type: "signal", refId: "cpu-usage" });
|
||||
child.emit("message", {
|
||||
type: "compute-result",
|
||||
sense: "cpu-usage",
|
||||
state: 123,
|
||||
workflow: null,
|
||||
});
|
||||
const rows = logStore.query({
|
||||
source: "sense",
|
||||
type: "compute-complete",
|
||||
refId: "cpu-usage",
|
||||
});
|
||||
expect(rows).toHaveLength(1);
|
||||
expect(rows[0].payload).toBe(JSON.stringify(123));
|
||||
expect(rows[0].payload).toBeNull();
|
||||
await kernel.stop();
|
||||
await vi.runAllTimersAsync();
|
||||
} finally {
|
||||
@@ -180,7 +196,6 @@ describe("kernel — message routing", () => {
|
||||
throttle: null,
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
on: [],
|
||||
},
|
||||
@@ -209,7 +224,6 @@ describe("kernel — message routing", () => {
|
||||
throttle: null,
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
on: [],
|
||||
},
|
||||
@@ -237,7 +251,6 @@ describe("kernel — message routing", () => {
|
||||
throttle: null,
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
on: [],
|
||||
},
|
||||
@@ -278,7 +291,6 @@ describe("kernel — groupForSense mapping", () => {
|
||||
throttle: null,
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
on: [],
|
||||
},
|
||||
@@ -287,7 +299,6 @@ describe("kernel — groupForSense mapping", () => {
|
||||
throttle: null,
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
on: [],
|
||||
},
|
||||
@@ -296,7 +307,6 @@ describe("kernel — groupForSense mapping", () => {
|
||||
throttle: null,
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
on: [],
|
||||
},
|
||||
@@ -322,7 +332,6 @@ describe("kernel — groupForSense mapping", () => {
|
||||
throttle: null,
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: 500,
|
||||
on: [],
|
||||
},
|
||||
|
||||
@@ -3,11 +3,10 @@ import { tmpdir } from "node:os";
|
||||
import { join } from "node:path";
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
|
||||
import type { NerveConfig, Signal } from "@uncaged/nerve-core";
|
||||
import type { NerveConfig } from "@uncaged/nerve-core";
|
||||
import { createLogStore } from "@uncaged/nerve-store";
|
||||
import type { LogStore } from "@uncaged/nerve-store";
|
||||
import { createSenseScheduler } from "../sense-scheduler.js";
|
||||
import { createSignalBus } from "../signal-bus.js";
|
||||
|
||||
describe("LogStore + SenseScheduler integration", () => {
|
||||
let tmpDir: string;
|
||||
@@ -31,7 +30,6 @@ describe("LogStore + SenseScheduler integration", () => {
|
||||
throttle: null,
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
on: ["cpu-usage"],
|
||||
},
|
||||
@@ -41,14 +39,12 @@ describe("LogStore + SenseScheduler integration", () => {
|
||||
extract: null,
|
||||
api: { port: null, token: null, host: "127.0.0.1" },
|
||||
};
|
||||
const bus = createSignalBus();
|
||||
const triggered: string[] = [];
|
||||
const scheduler = createSenseScheduler(config, bus, (name) => triggered.push(name), {
|
||||
const scheduler = createSenseScheduler(config, (name) => triggered.push(name), {
|
||||
logStore,
|
||||
});
|
||||
|
||||
const signal: Signal = { id: 1, senseId: "cpu-usage", payload: 42, timestamp: Date.now() };
|
||||
bus.emit(signal);
|
||||
scheduler.onSenseCompleted("cpu-usage");
|
||||
|
||||
const logs = logStore.query({ source: "sense_scheduler", type: "run_start" });
|
||||
expect(logs).toHaveLength(1);
|
||||
@@ -68,7 +64,6 @@ describe("LogStore + SenseScheduler integration", () => {
|
||||
throttle: null,
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: 1000,
|
||||
on: [],
|
||||
},
|
||||
@@ -78,11 +73,9 @@ describe("LogStore + SenseScheduler integration", () => {
|
||||
extract: null,
|
||||
api: { port: null, token: null, host: "127.0.0.1" },
|
||||
};
|
||||
const bus = createSignalBus();
|
||||
const ref: { scheduler: ReturnType<typeof createSenseScheduler> | null } = { scheduler: null };
|
||||
const scheduler = createSenseScheduler(
|
||||
config,
|
||||
bus,
|
||||
(name) => {
|
||||
ref.scheduler?.onComputeComplete(name);
|
||||
},
|
||||
@@ -108,7 +101,6 @@ describe("LogStore + SenseScheduler integration", () => {
|
||||
throttle: null,
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
on: ["cpu-usage"],
|
||||
},
|
||||
@@ -118,11 +110,9 @@ describe("LogStore + SenseScheduler integration", () => {
|
||||
extract: null,
|
||||
api: { port: null, token: null, host: "127.0.0.1" },
|
||||
};
|
||||
const bus = createSignalBus();
|
||||
const triggered: string[] = [];
|
||||
const scheduler = createSenseScheduler(
|
||||
config,
|
||||
bus,
|
||||
(name) => {
|
||||
triggered.push(name);
|
||||
scheduler.onComputeComplete(name);
|
||||
@@ -138,8 +128,6 @@ describe("LogStore + SenseScheduler integration", () => {
|
||||
timestamp: Date.now(),
|
||||
});
|
||||
|
||||
// Writing to the log store should NOT trigger any sense compute.
|
||||
// Only bus.emit(signal) triggers scheduled senses.
|
||||
expect(triggered).toHaveLength(0);
|
||||
|
||||
scheduler.stop();
|
||||
|
||||
@@ -7,7 +7,6 @@ import { tmpdir } from "node:os";
|
||||
import { dirname, join } from "node:path";
|
||||
import { fileURLToPath } from "node:url";
|
||||
|
||||
import type { Signal } from "@uncaged/nerve-core";
|
||||
import type { NerveConfig } from "@uncaged/nerve-core";
|
||||
import { afterEach, beforeEach, describe, expect, it } from "vitest";
|
||||
|
||||
@@ -27,7 +26,6 @@ function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
|
||||
throttle: null,
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
on: [],
|
||||
},
|
||||
@@ -97,13 +95,16 @@ describe("phase6 — restartGroup", () => {
|
||||
expect(newPid).not.toBeNull();
|
||||
expect(newPid).not.toBe(oldPid);
|
||||
|
||||
// Verify new worker is functional
|
||||
const received: Signal[] = [];
|
||||
const unsub = kernel.bus.subscribe((s) => received.push(s));
|
||||
kernel.triggerCompute("cpu-usage");
|
||||
await pollUntil(() => received.length > 0, 10_000);
|
||||
expect(received[0]).toMatchObject({ senseId: "cpu-usage", payload: 42 });
|
||||
unsub();
|
||||
await pollUntil(
|
||||
() =>
|
||||
kernel!.logStore.query({
|
||||
source: "sense",
|
||||
type: "compute-complete",
|
||||
refId: "cpu-usage",
|
||||
}).length > 0,
|
||||
10_000,
|
||||
);
|
||||
}, 35_000);
|
||||
|
||||
it("restartGroup on nonexistent group does nothing", async () => {
|
||||
@@ -154,7 +155,6 @@ describe("phase6 — reloadConfig", () => {
|
||||
throttle: null,
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
on: [],
|
||||
},
|
||||
@@ -163,7 +163,6 @@ describe("phase6 — reloadConfig", () => {
|
||||
throttle: null,
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
on: [],
|
||||
},
|
||||
@@ -191,7 +190,6 @@ describe("phase6 — reloadConfig", () => {
|
||||
throttle: null,
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
on: [],
|
||||
},
|
||||
@@ -200,7 +198,6 @@ describe("phase6 — reloadConfig", () => {
|
||||
throttle: null,
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
on: [],
|
||||
},
|
||||
@@ -224,7 +221,6 @@ describe("phase6 — reloadConfig", () => {
|
||||
throttle: null,
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
on: [],
|
||||
},
|
||||
@@ -269,7 +265,6 @@ describe("phase6 — error isolation", () => {
|
||||
throttle: null,
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
on: [],
|
||||
},
|
||||
@@ -278,7 +273,6 @@ describe("phase6 — error isolation", () => {
|
||||
throttle: null,
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
on: [],
|
||||
},
|
||||
@@ -294,19 +288,27 @@ describe("phase6 — error isolation", () => {
|
||||
});
|
||||
await kernel.ready;
|
||||
|
||||
// Both senses go through the same worker (mock-worker responds to all compute with signal)
|
||||
const received: Signal[] = [];
|
||||
const unsub = kernel.bus.subscribe((s) => received.push(s));
|
||||
|
||||
kernel.triggerCompute("good-sense");
|
||||
await pollUntil(() => received.length > 0, 10_000);
|
||||
expect(received[0]).toMatchObject({ senseId: "good-sense" });
|
||||
await pollUntil(
|
||||
() =>
|
||||
kernel!.logStore.query({
|
||||
source: "sense",
|
||||
type: "compute-complete",
|
||||
refId: "good-sense",
|
||||
}).length > 0,
|
||||
10_000,
|
||||
);
|
||||
|
||||
kernel.triggerCompute("bad-sense");
|
||||
await pollUntil(() => received.length > 1, 10_000);
|
||||
expect(received[1]).toMatchObject({ senseId: "bad-sense" });
|
||||
|
||||
unsub();
|
||||
await pollUntil(
|
||||
() =>
|
||||
kernel!.logStore.query({
|
||||
source: "sense",
|
||||
type: "compute-complete",
|
||||
refId: "bad-sense",
|
||||
}).length > 0,
|
||||
10_000,
|
||||
);
|
||||
}, 10_000);
|
||||
|
||||
it("error worker sends error messages, kernel still running", async () => {
|
||||
@@ -366,7 +368,6 @@ describe("phase6 — getHealth", () => {
|
||||
throttle: null,
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
on: [],
|
||||
},
|
||||
@@ -375,7 +376,6 @@ describe("phase6 — getHealth", () => {
|
||||
throttle: null,
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
on: [],
|
||||
},
|
||||
@@ -384,7 +384,6 @@ describe("phase6 — getHealth", () => {
|
||||
throttle: null,
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
on: [],
|
||||
},
|
||||
@@ -420,7 +419,6 @@ describe("phase6 — getHealth", () => {
|
||||
throttle: null,
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
on: [],
|
||||
},
|
||||
@@ -429,7 +427,6 @@ describe("phase6 — getHealth", () => {
|
||||
throttle: null,
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
on: [],
|
||||
},
|
||||
@@ -489,13 +486,16 @@ describe("phase6 — auto-respawn on worker crash", () => {
|
||||
expect(newPid).not.toBeNull();
|
||||
expect(newPid).not.toBe(originalPid);
|
||||
|
||||
// Verify new worker responds
|
||||
const received: Signal[] = [];
|
||||
const unsub = kernel.bus.subscribe((s) => received.push(s));
|
||||
kernel.triggerCompute("cpu-usage");
|
||||
await pollUntil(() => received.length > 0, 10_000);
|
||||
expect(received[0]).toMatchObject({ senseId: "cpu-usage", payload: 42 });
|
||||
unsub();
|
||||
await pollUntil(
|
||||
() =>
|
||||
kernel!.logStore.query({
|
||||
source: "sense",
|
||||
type: "compute-complete",
|
||||
refId: "cpu-usage",
|
||||
}).length > 0,
|
||||
10_000,
|
||||
);
|
||||
|
||||
await kernel.stop();
|
||||
kernel = null;
|
||||
|
||||
@@ -1,281 +1,118 @@
|
||||
import { mkdirSync, mkdtempSync, writeFileSync } from "node:fs";
|
||||
import { mkdtempSync, readFileSync, rmSync, writeFileSync } from "node:fs";
|
||||
import { tmpdir } from "node:os";
|
||||
import { join } from "node:path";
|
||||
|
||||
import { DatabaseSync } from "node:sqlite";
|
||||
import { drizzle } from "drizzle-orm/node-sqlite";
|
||||
import { integer, real, sqliteTable } from "drizzle-orm/sqlite-core";
|
||||
import type { SenseComputeFn } from "@uncaged/nerve-core";
|
||||
import { describe, expect, it } from "vitest";
|
||||
|
||||
import { parseParentMessage } from "../ipc.js";
|
||||
import { executeCompute, openSenseDb, runMigrations } from "../sense-runtime.js";
|
||||
import type { DrizzleDB, SenseRuntime } from "../sense-runtime.js";
|
||||
import { executeCompute, readState, writeState } from "../sense-runtime.js";
|
||||
import type { SenseRuntime } from "../sense-runtime.js";
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Helpers
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
const INIT_SQL = `
|
||||
CREATE TABLE IF NOT EXISTS samples (
|
||||
ts INTEGER PRIMARY KEY,
|
||||
value REAL NOT NULL
|
||||
);
|
||||
`;
|
||||
|
||||
function makeTempMigrationsDir(sql: string): string {
|
||||
const dir = mkdtempSync(join(tmpdir(), "nerve-test-"));
|
||||
writeFileSync(join(dir, "0001_init.sql"), sql);
|
||||
return dir;
|
||||
function makeTempStatePath(): string {
|
||||
const dir = mkdtempSync(join(tmpdir(), "nerve-state-"));
|
||||
return join(dir, "sense.json");
|
||||
}
|
||||
|
||||
function makeTempMigrationsDirEmpty(): string {
|
||||
return mkdtempSync(join(tmpdir(), "nerve-test-empty-"));
|
||||
}
|
||||
|
||||
function makeTempDbPath(): string {
|
||||
const dir = mkdtempSync(join(tmpdir(), "nerve-db-"));
|
||||
return join(dir, "test.db");
|
||||
}
|
||||
|
||||
const samples = sqliteTable("samples", {
|
||||
ts: integer("ts").primaryKey(),
|
||||
value: real("value").notNull(),
|
||||
});
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// runMigrations
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
describe("runMigrations", () => {
|
||||
it("creates table via SQL migration file", () => {
|
||||
const sqlite = new DatabaseSync(":memory:");
|
||||
const migrationsDir = makeTempMigrationsDir(INIT_SQL);
|
||||
const result = runMigrations(sqlite, migrationsDir);
|
||||
|
||||
expect(result.ok).toBe(true);
|
||||
|
||||
const row = sqlite
|
||||
.prepare("SELECT name FROM sqlite_master WHERE type='table' AND name='samples'")
|
||||
.get();
|
||||
expect(row).toBeDefined();
|
||||
|
||||
sqlite.close();
|
||||
describe("readState / writeState", () => {
|
||||
it("writeState creates parent dirs and persists JSON", () => {
|
||||
const base = mkdtempSync(join(tmpdir(), "nerve-write-"));
|
||||
const path = join(base, "nested", "a.json");
|
||||
writeState(path, { n: 1 });
|
||||
const raw = readFileSync(path, "utf8");
|
||||
expect(JSON.parse(raw)).toEqual({ n: 1 });
|
||||
rmSync(base, { recursive: true, force: true });
|
||||
});
|
||||
|
||||
it("runs multiple migrations in lexicographic order", () => {
|
||||
const sqlite = new DatabaseSync(":memory:");
|
||||
const dir = mkdtempSync(join(tmpdir(), "nerve-multi-"));
|
||||
|
||||
writeFileSync(join(dir, "0001_init.sql"), INIT_SQL);
|
||||
writeFileSync(join(dir, "0002_add_col.sql"), "ALTER TABLE samples ADD COLUMN label TEXT;");
|
||||
|
||||
const result = runMigrations(sqlite, dir);
|
||||
expect(result.ok).toBe(true);
|
||||
|
||||
const info = sqlite.prepare("PRAGMA table_info(samples)").all() as Array<{ name: string }>;
|
||||
const cols = info.map((r) => r.name);
|
||||
expect(cols).toContain("label");
|
||||
|
||||
sqlite.close();
|
||||
it("readState returns initialState when file is missing", () => {
|
||||
const path = join(mkdtempSync(join(tmpdir(), "nerve-missing-")), "none.json");
|
||||
expect(readState(path, { x: 0 })).toEqual({ x: 0 });
|
||||
});
|
||||
|
||||
it("returns ok when migrations directory is empty", () => {
|
||||
const sqlite = new DatabaseSync(":memory:");
|
||||
const dir = makeTempMigrationsDirEmpty();
|
||||
const result = runMigrations(sqlite, dir);
|
||||
expect(result.ok).toBe(true);
|
||||
sqlite.close();
|
||||
it("readState returns parsed JSON when file exists", () => {
|
||||
const dir = mkdtempSync(join(tmpdir(), "nerve-read-"));
|
||||
const path = join(dir, "s.json");
|
||||
writeFileSync(path, JSON.stringify({ count: 3 }), "utf8");
|
||||
expect(readState(path, { count: 0 })).toEqual({ count: 3 });
|
||||
rmSync(dir, { recursive: true, force: true });
|
||||
});
|
||||
|
||||
it("returns err when migrations directory does not exist", () => {
|
||||
const sqlite = new DatabaseSync(":memory:");
|
||||
const result = runMigrations(sqlite, "/nonexistent/path/migrations");
|
||||
expect(result.ok).toBe(false);
|
||||
sqlite.close();
|
||||
});
|
||||
|
||||
it("returns err when a migration SQL is invalid", () => {
|
||||
const sqlite = new DatabaseSync(":memory:");
|
||||
const dir = mkdtempSync(join(tmpdir(), "nerve-bad-sql-"));
|
||||
writeFileSync(join(dir, "0001_bad.sql"), "NOT VALID SQL !!!;");
|
||||
const result = runMigrations(sqlite, dir);
|
||||
expect(result.ok).toBe(false);
|
||||
sqlite.close();
|
||||
it("readState returns initialState on invalid JSON", () => {
|
||||
const dir = mkdtempSync(join(tmpdir(), "nerve-badjson-"));
|
||||
const path = join(dir, "bad.json");
|
||||
writeFileSync(path, "not json {{{", "utf8");
|
||||
expect(readState(path, { fallback: true })).toEqual({ fallback: true });
|
||||
rmSync(dir, { recursive: true, force: true });
|
||||
});
|
||||
});
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// openSenseDb
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
describe("openSenseDb", () => {
|
||||
it("creates the db file and runs migrations", () => {
|
||||
const dbPath = makeTempDbPath();
|
||||
const migrationsDir = makeTempMigrationsDir(INIT_SQL);
|
||||
|
||||
const result = openSenseDb(dbPath, migrationsDir);
|
||||
expect(result.ok).toBe(true);
|
||||
|
||||
if (!result.ok) return;
|
||||
const { sqlite } = result.value;
|
||||
const row = sqlite
|
||||
.prepare("SELECT name FROM sqlite_master WHERE type='table' AND name='samples'")
|
||||
.get();
|
||||
expect(row).toBeDefined();
|
||||
sqlite.close();
|
||||
});
|
||||
|
||||
it("returns err when migrations dir is missing", () => {
|
||||
const dbPath = makeTempDbPath();
|
||||
const result = openSenseDb(dbPath, "/nonexistent/migrations");
|
||||
expect(result.ok).toBe(false);
|
||||
});
|
||||
|
||||
it("prunes _signals to retention after every 100 inserts", () => {
|
||||
const dbPath = makeTempDbPath();
|
||||
const migrationsDir = makeTempMigrationsDir(INIT_SQL);
|
||||
const result = openSenseDb(dbPath, migrationsDir, 5);
|
||||
expect(result.ok).toBe(true);
|
||||
if (!result.ok) return;
|
||||
|
||||
const { sqlite, persistSignal } = result.value;
|
||||
for (let i = 0; i < 100; i++) {
|
||||
persistSignal({ n: i });
|
||||
}
|
||||
|
||||
const count = sqlite.prepare("SELECT COUNT(*) AS c FROM _signals").get() as { c: number };
|
||||
expect(count.c).toBe(5);
|
||||
|
||||
sqlite.close();
|
||||
});
|
||||
});
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// executeCompute
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
describe("executeCompute", () => {
|
||||
function makeRuntime(
|
||||
computeFn: SenseRuntime["compute"],
|
||||
sqlite?: DatabaseSync,
|
||||
): { runtime: SenseRuntime; sqlite: DatabaseSync } {
|
||||
const db_sqlite = sqlite ?? new DatabaseSync(":memory:");
|
||||
if (!sqlite) db_sqlite.exec(INIT_SQL);
|
||||
const db = drizzle({ client: db_sqlite }) as DrizzleDB;
|
||||
compute: SenseComputeFn<{ n: number }>,
|
||||
state: { n: number },
|
||||
statePath?: string,
|
||||
): SenseRuntime {
|
||||
return {
|
||||
runtime: {
|
||||
name: "test-sense",
|
||||
db,
|
||||
compute: computeFn,
|
||||
table: samples,
|
||||
persistSignal: () => {},
|
||||
},
|
||||
sqlite: db_sqlite,
|
||||
name: "test-sense",
|
||||
compute: compute as SenseComputeFn,
|
||||
state,
|
||||
statePath: statePath ?? makeTempStatePath(),
|
||||
};
|
||||
}
|
||||
|
||||
it("returns non-null and inserts into table when compute returns data", async () => {
|
||||
const { runtime, sqlite } = makeRuntime(async () => ({
|
||||
signal: { ts: 1000, value: 0.5 },
|
||||
workflow: null,
|
||||
}));
|
||||
it("passes state into compute and persists returned state", async () => {
|
||||
const path = makeTempStatePath();
|
||||
const runtime = makeRuntime(
|
||||
async (s) => ({ state: { n: s.n + 1 }, workflow: null }),
|
||||
{ n: 0 },
|
||||
path,
|
||||
);
|
||||
|
||||
const result = await executeCompute(runtime);
|
||||
expect(result.ok).toBe(true);
|
||||
if (!result.ok) return;
|
||||
expect(result.value).toEqual({ signal: { ts: 1000, value: 0.5 }, workflow: null });
|
||||
|
||||
const rows = sqlite.prepare("SELECT * FROM samples").all();
|
||||
expect(rows).toHaveLength(1);
|
||||
sqlite.close();
|
||||
});
|
||||
|
||||
it("returns null and does not insert when compute returns null", async () => {
|
||||
const { runtime, sqlite } = makeRuntime(async () => null);
|
||||
|
||||
const result = await executeCompute(runtime);
|
||||
expect(result.ok).toBe(true);
|
||||
if (!result.ok) return;
|
||||
expect(result.value).toBeNull();
|
||||
|
||||
const rows = sqlite.prepare("SELECT * FROM samples").all();
|
||||
expect(rows).toHaveLength(0);
|
||||
sqlite.close();
|
||||
expect(result.value).toEqual({ state: { n: 1 }, workflow: null });
|
||||
expect(runtime.state).toEqual({ n: 1 });
|
||||
expect(JSON.parse(readFileSync(path, "utf8"))).toEqual({ n: 1 });
|
||||
});
|
||||
|
||||
it("returns err when compute throws", async () => {
|
||||
const { runtime, sqlite } = makeRuntime(async () => {
|
||||
throw new Error("something went wrong");
|
||||
});
|
||||
const runtime = makeRuntime(
|
||||
async () => {
|
||||
throw new Error("boom");
|
||||
},
|
||||
{ n: 0 },
|
||||
);
|
||||
|
||||
const result = await executeCompute(runtime);
|
||||
expect(result.ok).toBe(false);
|
||||
if (result.ok) return;
|
||||
expect(result.error.message).toContain("something went wrong");
|
||||
sqlite.close();
|
||||
});
|
||||
|
||||
it("inserts correctly into the sense db from openSenseDb", async () => {
|
||||
const dbPath = makeTempDbPath();
|
||||
const migrationsDir = makeTempMigrationsDir(INIT_SQL);
|
||||
const dbResult = openSenseDb(dbPath, migrationsDir);
|
||||
expect(dbResult.ok).toBe(true);
|
||||
if (!dbResult.ok) return;
|
||||
|
||||
mkdirSync(join(dbPath, "..", "migrations"), { recursive: true });
|
||||
|
||||
const { sqlite: dbSqlite, db } = dbResult.value;
|
||||
const runtime: SenseRuntime = {
|
||||
name: "cpu-usage",
|
||||
db,
|
||||
compute: async () => ({ signal: { ts: 1000, value: 1.23 }, workflow: null }),
|
||||
table: samples,
|
||||
persistSignal: () => {},
|
||||
};
|
||||
|
||||
const result = await executeCompute(runtime);
|
||||
expect(result.ok).toBe(true);
|
||||
|
||||
const rows = dbSqlite.prepare("SELECT * FROM samples").all() as Array<{
|
||||
ts: number;
|
||||
value: number;
|
||||
}>;
|
||||
expect(rows).toHaveLength(1);
|
||||
expect(rows[0].ts).toBe(1000);
|
||||
expect(rows[0].value).toBe(1.23);
|
||||
dbSqlite.close();
|
||||
expect(result.error.message).toContain("boom");
|
||||
});
|
||||
|
||||
it("returns err when compute exceeds timeoutMs", async () => {
|
||||
const { runtime, sqlite } = makeRuntime(
|
||||
() => new Promise<null>((resolve) => setTimeout(() => resolve(null), 5_000)),
|
||||
const runtime = makeRuntime(
|
||||
async (s) =>
|
||||
new Promise((resolve) => setTimeout(() => resolve({ state: s, workflow: null }), 5_000)),
|
||||
{ n: 0 },
|
||||
);
|
||||
|
||||
const result = await executeCompute(runtime, 50);
|
||||
expect(result.ok).toBe(false);
|
||||
if (result.ok) return;
|
||||
expect(result.error.message).toMatch(/timed out/i);
|
||||
sqlite.close();
|
||||
});
|
||||
|
||||
it("completes within timeout when compute is fast", async () => {
|
||||
const { runtime, sqlite } = makeRuntime(async () => ({
|
||||
signal: { ts: 1, value: 42 },
|
||||
workflow: null,
|
||||
}));
|
||||
const runtime = makeRuntime(async (s) => ({ state: { n: s.n }, workflow: null }), { n: 42 });
|
||||
const result = await executeCompute(runtime, 5_000);
|
||||
expect(result.ok).toBe(true);
|
||||
if (!result.ok) return;
|
||||
expect(result.value).toEqual({ signal: { ts: 1, value: 42 }, workflow: null });
|
||||
sqlite.close();
|
||||
expect(result.value.state).toEqual({ n: 42 });
|
||||
});
|
||||
});
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// parseParentMessage (IPC validation)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
describe("parseParentMessage", () => {
|
||||
describe("parseParentMessage (IPC validation)", () => {
|
||||
it("accepts a valid compute message", () => {
|
||||
const result = parseParentMessage({ type: "compute", sense: "cpu" });
|
||||
expect(result.ok).toBe(true);
|
||||
@@ -310,47 +147,3 @@ describe("parseParentMessage", () => {
|
||||
expect(result.error.message).toMatch(/unknown/i);
|
||||
});
|
||||
});
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// runMigrations – journal (idempotency)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
describe("runMigrations journal", () => {
|
||||
it("does not re-run an already-applied migration", () => {
|
||||
const sqlite = new DatabaseSync(":memory:");
|
||||
const dir = mkdtempSync(join(tmpdir(), "nerve-journal-"));
|
||||
writeFileSync(join(dir, "0001_init.sql"), INIT_SQL);
|
||||
|
||||
const first = runMigrations(sqlite, dir);
|
||||
expect(first.ok).toBe(true);
|
||||
|
||||
sqlite.exec("INSERT INTO samples (ts, value) VALUES (1, 1.0)");
|
||||
|
||||
const nonIdempotentSql = "CREATE TABLE samples2 (id INTEGER PRIMARY KEY)";
|
||||
writeFileSync(join(dir, "0002_samples2.sql"), nonIdempotentSql);
|
||||
|
||||
const second = runMigrations(sqlite, dir);
|
||||
expect(second.ok).toBe(true);
|
||||
|
||||
const third = runMigrations(sqlite, dir);
|
||||
expect(third.ok).toBe(true);
|
||||
|
||||
sqlite.close();
|
||||
});
|
||||
|
||||
it("tracks migrations in _migrations table", () => {
|
||||
const sqlite = new DatabaseSync(":memory:");
|
||||
const dir = mkdtempSync(join(tmpdir(), "nerve-journal2-"));
|
||||
writeFileSync(join(dir, "0001_init.sql"), INIT_SQL);
|
||||
|
||||
runMigrations(sqlite, dir);
|
||||
|
||||
const rows = sqlite.prepare("SELECT name FROM _migrations ORDER BY name").all() as Array<{
|
||||
name: string;
|
||||
}>;
|
||||
expect(rows).toHaveLength(1);
|
||||
expect(rows[0].name).toBe("0001_init.sql");
|
||||
|
||||
sqlite.close();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,8 +1,7 @@
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
|
||||
import type { NerveConfig, Signal } from "@uncaged/nerve-core";
|
||||
import type { NerveConfig } from "@uncaged/nerve-core";
|
||||
import { createSenseScheduler } from "../sense-scheduler.js";
|
||||
import { createSignalBus } from "../signal-bus.js";
|
||||
|
||||
function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
|
||||
return {
|
||||
@@ -12,7 +11,6 @@ function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
|
||||
throttle: null,
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
on: [],
|
||||
},
|
||||
@@ -25,10 +23,6 @@ function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
|
||||
};
|
||||
}
|
||||
|
||||
function makeSignal(senseId: string, payload: unknown = 1): Signal {
|
||||
return { id: 1, senseId, payload, timestamp: Date.now() };
|
||||
}
|
||||
|
||||
describe("SenseScheduler — throttle + pending deferred trigger", () => {
|
||||
beforeEach(() => {
|
||||
vi.useFakeTimers();
|
||||
@@ -47,27 +41,21 @@ describe("SenseScheduler — throttle + pending deferred trigger", () => {
|
||||
throttle: 2000,
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
on: ["cpu-usage"],
|
||||
},
|
||||
},
|
||||
});
|
||||
const bus = createSignalBus();
|
||||
const scheduler = createSenseScheduler(config, bus, (name) => triggered.push(name));
|
||||
const scheduler = createSenseScheduler(config, (name) => triggered.push(name));
|
||||
|
||||
// First trigger fires immediately (outside throttle window)
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
scheduler.onSenseCompleted("cpu-usage");
|
||||
scheduler.onComputeComplete("cpu-usage");
|
||||
expect(triggered.length).toBe(1);
|
||||
|
||||
// Second trigger arrives 500ms later — still within throttle window (2000ms)
|
||||
vi.advanceTimersByTime(500);
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
// Should not fire yet (throttled)
|
||||
scheduler.onSenseCompleted("cpu-usage");
|
||||
expect(triggered.length).toBe(1);
|
||||
|
||||
// Advance past the throttle window end; deferred trigger should now fire
|
||||
vi.advanceTimersByTime(1600);
|
||||
expect(triggered.length).toBe(2);
|
||||
|
||||
@@ -83,30 +71,25 @@ describe("SenseScheduler — throttle + pending deferred trigger", () => {
|
||||
throttle: 2000,
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
on: ["cpu-usage"],
|
||||
},
|
||||
},
|
||||
});
|
||||
const bus = createSignalBus();
|
||||
const scheduler = createSenseScheduler(config, bus, (name) => triggered.push(name));
|
||||
const scheduler = createSenseScheduler(config, (name) => triggered.push(name));
|
||||
|
||||
// First trigger fires
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
scheduler.onSenseCompleted("cpu-usage");
|
||||
scheduler.onComputeComplete("cpu-usage");
|
||||
expect(triggered.length).toBe(1);
|
||||
|
||||
// Multiple triggers within throttle window — should not stack
|
||||
vi.advanceTimersByTime(300);
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
scheduler.onSenseCompleted("cpu-usage");
|
||||
vi.advanceTimersByTime(300);
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
scheduler.onSenseCompleted("cpu-usage");
|
||||
vi.advanceTimersByTime(300);
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
scheduler.onSenseCompleted("cpu-usage");
|
||||
expect(triggered.length).toBe(1);
|
||||
|
||||
// Advance past window — exactly one deferred trigger fires
|
||||
vi.advanceTimersByTime(1200);
|
||||
scheduler.onComputeComplete("cpu-usage");
|
||||
expect(triggered.length).toBe(2);
|
||||
@@ -123,28 +106,23 @@ describe("SenseScheduler — throttle + pending deferred trigger", () => {
|
||||
throttle: 2000,
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
on: ["cpu-usage"],
|
||||
},
|
||||
},
|
||||
});
|
||||
const bus = createSignalBus();
|
||||
const scheduler = createSenseScheduler(config, bus, (name) => triggered.push(name));
|
||||
const scheduler = createSenseScheduler(config, (name) => triggered.push(name));
|
||||
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
scheduler.onSenseCompleted("cpu-usage");
|
||||
scheduler.onComputeComplete("cpu-usage");
|
||||
expect(triggered.length).toBe(1);
|
||||
|
||||
// Trigger during throttle window — schedules deferred
|
||||
vi.advanceTimersByTime(500);
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
scheduler.onSenseCompleted("cpu-usage");
|
||||
expect(triggered.length).toBe(1);
|
||||
|
||||
// Stop before window ends
|
||||
scheduler.stop();
|
||||
|
||||
// Advance past window — deferred timer should have been cleared
|
||||
vi.advanceTimersByTime(2000);
|
||||
expect(triggered.length).toBe(1);
|
||||
});
|
||||
|
||||
@@ -1,12 +1,7 @@
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
|
||||
import type { NerveConfig, Signal } from "@uncaged/nerve-core";
|
||||
import type { NerveConfig } from "@uncaged/nerve-core";
|
||||
import { createSenseScheduler } from "../sense-scheduler.js";
|
||||
import { createSignalBus } from "../signal-bus.js";
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Helpers
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
|
||||
return {
|
||||
@@ -16,7 +11,6 @@ function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
|
||||
throttle: null,
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
on: [],
|
||||
},
|
||||
@@ -25,7 +19,6 @@ function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
|
||||
throttle: null,
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
on: [],
|
||||
},
|
||||
@@ -34,7 +27,6 @@ function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
|
||||
throttle: null,
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
on: [],
|
||||
},
|
||||
@@ -47,14 +39,6 @@ function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
|
||||
};
|
||||
}
|
||||
|
||||
function makeSignal(senseId: string, payload: unknown = 1): Signal {
|
||||
return { id: 1, senseId, payload, timestamp: Date.now() };
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Tests
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
describe("SenseScheduler — interval schedule", () => {
|
||||
beforeEach(() => {
|
||||
vi.useFakeTimers();
|
||||
@@ -72,14 +56,11 @@ describe("SenseScheduler — interval schedule", () => {
|
||||
"cpu-usage": { ...base.senses["cpu-usage"], interval: 1000, on: [] },
|
||||
},
|
||||
});
|
||||
const bus = createSignalBus();
|
||||
// Use a ref so the triggerFn can call back into the scheduler
|
||||
const ref: { scheduler: ReturnType<typeof createSenseScheduler> | null } = {
|
||||
scheduler: null,
|
||||
};
|
||||
const scheduler = createSenseScheduler(config, bus, (name) => {
|
||||
const scheduler = createSenseScheduler(config, (name) => {
|
||||
triggered.push(name);
|
||||
// Immediately complete compute so the scheduler is not blocked by in-flight state
|
||||
ref.scheduler?.onComputeComplete(name);
|
||||
});
|
||||
ref.scheduler = scheduler;
|
||||
@@ -101,11 +82,10 @@ describe("SenseScheduler — interval schedule", () => {
|
||||
"cpu-usage": { ...base.senses["cpu-usage"], interval: 500, on: [] },
|
||||
},
|
||||
});
|
||||
const bus = createSignalBus();
|
||||
const ref: { scheduler: ReturnType<typeof createSenseScheduler> | null } = {
|
||||
scheduler: null,
|
||||
};
|
||||
const scheduler = createSenseScheduler(config, bus, (name) => {
|
||||
const scheduler = createSenseScheduler(config, (name) => {
|
||||
triggered.push(name);
|
||||
ref.scheduler?.onComputeComplete(name);
|
||||
});
|
||||
@@ -128,10 +108,8 @@ describe("SenseScheduler — interval schedule", () => {
|
||||
"cpu-usage": { ...base.senses["cpu-usage"], interval: 1000, on: [] },
|
||||
},
|
||||
});
|
||||
const bus = createSignalBus();
|
||||
const scheduler = createSenseScheduler(config, bus, (name) => triggered.push(name));
|
||||
const scheduler = createSenseScheduler(config, (name) => triggered.push(name));
|
||||
|
||||
// Only advance 500ms — should be 0 triggers (not catching up)
|
||||
vi.advanceTimersByTime(500);
|
||||
expect(triggered.length).toBe(0);
|
||||
|
||||
@@ -140,7 +118,7 @@ describe("SenseScheduler — interval schedule", () => {
|
||||
});
|
||||
|
||||
describe("SenseScheduler — event (on) schedule", () => {
|
||||
it("triggers target sense when watched sense emits a signal", () => {
|
||||
it("triggers target sense when watched sense completes", () => {
|
||||
const triggered: string[] = [];
|
||||
const base = makeConfig();
|
||||
const config = makeConfig({
|
||||
@@ -153,12 +131,11 @@ describe("SenseScheduler — event (on) schedule", () => {
|
||||
},
|
||||
},
|
||||
});
|
||||
const bus = createSignalBus();
|
||||
const scheduler = createSenseScheduler(config, bus, (name) => triggered.push(name));
|
||||
const scheduler = createSenseScheduler(config, (name) => triggered.push(name));
|
||||
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
scheduler.onSenseCompleted("cpu-usage");
|
||||
scheduler.onComputeComplete("system-health");
|
||||
bus.emit(makeSignal("disk-usage"));
|
||||
scheduler.onSenseCompleted("disk-usage");
|
||||
scheduler.onComputeComplete("system-health");
|
||||
|
||||
expect(triggered.length).toBe(2);
|
||||
@@ -167,7 +144,7 @@ describe("SenseScheduler — event (on) schedule", () => {
|
||||
scheduler.stop();
|
||||
});
|
||||
|
||||
it("does not trigger for signals from non-watched senses", () => {
|
||||
it("does not trigger for completions of non-watched senses", () => {
|
||||
const triggered: string[] = [];
|
||||
const base = makeConfig();
|
||||
const config = makeConfig({
|
||||
@@ -176,10 +153,9 @@ describe("SenseScheduler — event (on) schedule", () => {
|
||||
"system-health": { ...base.senses["system-health"], interval: null, on: ["cpu-usage"] },
|
||||
},
|
||||
});
|
||||
const bus = createSignalBus();
|
||||
const scheduler = createSenseScheduler(config, bus, (name) => triggered.push(name));
|
||||
const scheduler = createSenseScheduler(config, (name) => triggered.push(name));
|
||||
|
||||
bus.emit(makeSignal("disk-usage"));
|
||||
scheduler.onSenseCompleted("disk-usage");
|
||||
|
||||
expect(triggered.length).toBe(0);
|
||||
|
||||
@@ -195,11 +171,10 @@ describe("SenseScheduler — event (on) schedule", () => {
|
||||
"system-health": { ...base.senses["system-health"], interval: null, on: ["cpu-usage"] },
|
||||
},
|
||||
});
|
||||
const bus = createSignalBus();
|
||||
const scheduler = createSenseScheduler(config, bus, (name) => triggered.push(name));
|
||||
const scheduler = createSenseScheduler(config, (name) => triggered.push(name));
|
||||
|
||||
scheduler.stop();
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
scheduler.onSenseCompleted("cpu-usage");
|
||||
|
||||
expect(triggered.length).toBe(0);
|
||||
});
|
||||
@@ -222,20 +197,17 @@ describe("SenseScheduler — throttle", () => {
|
||||
throttle: 2000,
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
on: ["cpu-usage"],
|
||||
},
|
||||
},
|
||||
});
|
||||
const bus = createSignalBus();
|
||||
const scheduler = createSenseScheduler(config, bus, (name) => triggered.push(name));
|
||||
const scheduler = createSenseScheduler(config, (name) => triggered.push(name));
|
||||
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
scheduler.onSenseCompleted("cpu-usage");
|
||||
scheduler.onComputeComplete("cpu-usage");
|
||||
|
||||
// Immediately trigger again — within throttle window
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
scheduler.onSenseCompleted("cpu-usage");
|
||||
|
||||
expect(triggered.length).toBe(1);
|
||||
|
||||
@@ -251,20 +223,18 @@ describe("SenseScheduler — throttle", () => {
|
||||
throttle: 1000,
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
on: ["cpu-usage"],
|
||||
},
|
||||
},
|
||||
});
|
||||
const bus = createSignalBus();
|
||||
const scheduler = createSenseScheduler(config, bus, (name) => triggered.push(name));
|
||||
const scheduler = createSenseScheduler(config, (name) => triggered.push(name));
|
||||
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
scheduler.onSenseCompleted("cpu-usage");
|
||||
scheduler.onComputeComplete("cpu-usage");
|
||||
|
||||
vi.advanceTimersByTime(1500);
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
scheduler.onSenseCompleted("cpu-usage");
|
||||
scheduler.onComputeComplete("cpu-usage");
|
||||
|
||||
expect(triggered.length).toBe(2);
|
||||
@@ -283,24 +253,19 @@ describe("SenseScheduler — merge/coalesce", () => {
|
||||
"system-health": { ...base.senses["system-health"], interval: null, on: ["cpu-usage"] },
|
||||
},
|
||||
});
|
||||
const bus = createSignalBus();
|
||||
const scheduler = createSenseScheduler(config, bus, (name) => triggered.push(name));
|
||||
const scheduler = createSenseScheduler(config, (name) => triggered.push(name));
|
||||
|
||||
// First trigger starts compute
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
scheduler.onSenseCompleted("cpu-usage");
|
||||
expect(triggered.length).toBe(1);
|
||||
|
||||
// Three more arrive while first is in-flight — all should coalesce to one pending
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
scheduler.onSenseCompleted("cpu-usage");
|
||||
scheduler.onSenseCompleted("cpu-usage");
|
||||
scheduler.onSenseCompleted("cpu-usage");
|
||||
expect(triggered.length).toBe(1);
|
||||
|
||||
// Complete first compute → pending drains as exactly one more run
|
||||
scheduler.onComputeComplete("system-health");
|
||||
expect(triggered.length).toBe(2);
|
||||
|
||||
// Complete second compute → no more pending
|
||||
scheduler.onComputeComplete("system-health");
|
||||
expect(triggered.length).toBe(2);
|
||||
|
||||
@@ -316,13 +281,11 @@ describe("SenseScheduler — merge/coalesce", () => {
|
||||
"system-health": { ...base.senses["system-health"], interval: null, on: ["cpu-usage"] },
|
||||
},
|
||||
});
|
||||
const bus = createSignalBus();
|
||||
const scheduler = createSenseScheduler(config, bus, (name) => triggered.push(name));
|
||||
const scheduler = createSenseScheduler(config, (name) => triggered.push(name));
|
||||
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
scheduler.onSenseCompleted("cpu-usage");
|
||||
expect(triggered.length).toBe(1);
|
||||
|
||||
// Complete with no pending
|
||||
scheduler.onComputeComplete("system-health");
|
||||
expect(triggered.length).toBe(1);
|
||||
|
||||
@@ -351,14 +314,11 @@ describe("SenseScheduler — interval + on combined", () => {
|
||||
},
|
||||
},
|
||||
});
|
||||
const bus = createSignalBus();
|
||||
const scheduler = createSenseScheduler(config, bus, (name) => triggered.push(name));
|
||||
const scheduler = createSenseScheduler(config, (name) => triggered.push(name));
|
||||
|
||||
// Event trigger
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
scheduler.onSenseCompleted("cpu-usage");
|
||||
scheduler.onComputeComplete("system-health");
|
||||
|
||||
// Interval trigger
|
||||
vi.advanceTimersByTime(1000);
|
||||
scheduler.onComputeComplete("system-health");
|
||||
|
||||
|
||||
@@ -1,99 +0,0 @@
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
|
||||
import type { Signal } from "@uncaged/nerve-core";
|
||||
import { createSignalBus } from "../signal-bus.js";
|
||||
|
||||
function makeSignal(senseId: string, payload: unknown = 1): Signal {
|
||||
return { id: 1, senseId, payload, timestamp: Date.now() };
|
||||
}
|
||||
|
||||
describe("createSignalBus", () => {
|
||||
it("delivers emitted signal to a subscriber", () => {
|
||||
const bus = createSignalBus();
|
||||
const received: Signal[] = [];
|
||||
bus.subscribe((s) => received.push(s));
|
||||
|
||||
const sig = makeSignal("cpu-usage", 42);
|
||||
bus.emit(sig);
|
||||
|
||||
expect(received).toHaveLength(1);
|
||||
expect(received[0]).toBe(sig);
|
||||
});
|
||||
|
||||
it("delivers to multiple subscribers", () => {
|
||||
const bus = createSignalBus();
|
||||
const a: Signal[] = [];
|
||||
const b: Signal[] = [];
|
||||
bus.subscribe((s) => a.push(s));
|
||||
bus.subscribe((s) => b.push(s));
|
||||
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
|
||||
expect(a).toHaveLength(1);
|
||||
expect(b).toHaveLength(1);
|
||||
});
|
||||
|
||||
it("unsubscribe stops delivery", () => {
|
||||
const bus = createSignalBus();
|
||||
const received: Signal[] = [];
|
||||
const unsub = bus.subscribe((s) => received.push(s));
|
||||
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
unsub();
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
|
||||
expect(received).toHaveLength(1);
|
||||
});
|
||||
|
||||
it("remaining subscribers still receive after one unsubscribes", () => {
|
||||
const bus = createSignalBus();
|
||||
const a: Signal[] = [];
|
||||
const b: Signal[] = [];
|
||||
const unsubA = bus.subscribe((s) => a.push(s));
|
||||
bus.subscribe((s) => b.push(s));
|
||||
|
||||
unsubA();
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
|
||||
expect(a).toHaveLength(0);
|
||||
expect(b).toHaveLength(1);
|
||||
});
|
||||
|
||||
it("emit with no subscribers does nothing", () => {
|
||||
const bus = createSignalBus();
|
||||
expect(() => bus.emit(makeSignal("cpu-usage"))).not.toThrow();
|
||||
});
|
||||
|
||||
it("dispatch is synchronous", () => {
|
||||
const bus = createSignalBus();
|
||||
const order: string[] = [];
|
||||
bus.subscribe(() => order.push("handler"));
|
||||
order.push("before");
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
order.push("after");
|
||||
expect(order).toEqual(["before", "handler", "after"]);
|
||||
});
|
||||
|
||||
it("handler exceptions don't prevent other handlers from running", () => {
|
||||
const bus = createSignalBus();
|
||||
const received: Signal[] = [];
|
||||
bus.subscribe(() => {
|
||||
throw new Error("boom");
|
||||
});
|
||||
bus.subscribe((s) => received.push(s));
|
||||
|
||||
expect(() => bus.emit(makeSignal("cpu-usage"))).toThrow("boom");
|
||||
expect(received).toHaveLength(1);
|
||||
});
|
||||
|
||||
it("same handler can be subscribed once and fires once per emit", () => {
|
||||
const bus = createSignalBus();
|
||||
const handler = vi.fn();
|
||||
bus.subscribe(handler);
|
||||
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
|
||||
expect(handler).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
});
|
||||
@@ -80,8 +80,18 @@ describe("createSenseWorkerPool", () => {
|
||||
await startWorkerWithReady(pool, "g1");
|
||||
expect(mockChildren).toHaveLength(1);
|
||||
const child = mockChildren[0];
|
||||
child.emit("message", { type: "signal", sense: "s", payload: 1 });
|
||||
expect(onWorkerMessage).toHaveBeenCalledWith({ type: "signal", sense: "s", payload: 1 });
|
||||
child.emit("message", {
|
||||
type: "compute-result",
|
||||
sense: "s",
|
||||
state: 1,
|
||||
workflow: null,
|
||||
});
|
||||
expect(onWorkerMessage).toHaveBeenCalledWith({
|
||||
type: "compute-result",
|
||||
sense: "s",
|
||||
state: 1,
|
||||
workflow: null,
|
||||
});
|
||||
});
|
||||
|
||||
it("sendCompute delivers to the worker for that group", async () => {
|
||||
|
||||
@@ -191,7 +191,6 @@
|
||||
<th>Name</th>
|
||||
<th>Type</th>
|
||||
<th>Triggers</th>
|
||||
<th>Last signal</th>
|
||||
<th></th>
|
||||
</tr>
|
||||
</thead>
|
||||
@@ -343,7 +342,7 @@
|
||||
if (!list.length) {
|
||||
var tr = document.createElement('tr');
|
||||
var td = document.createElement('td');
|
||||
td.colSpan = 5;
|
||||
td.colSpan = 4;
|
||||
td.style.color = 'var(--muted)';
|
||||
td.textContent = 'No senses registered.';
|
||||
tr.appendChild(td);
|
||||
@@ -353,12 +352,10 @@
|
||||
list.forEach(function (s) {
|
||||
var tr = document.createElement('tr');
|
||||
var triggers = (s.triggers && s.triggers.length) ? s.triggers.join('; ') : '—';
|
||||
var last = s.lastSignalTimestamp != null ? fmtTime(s.lastSignalTimestamp) : '—';
|
||||
tr.innerHTML =
|
||||
'<td>' + escapeHtml(s.name) + '</td>' +
|
||||
'<td>' + escapeHtml(String(s.group || '—')) + '</td>' +
|
||||
'<td>' + escapeHtml(triggers) + '</td>' +
|
||||
'<td>' + escapeHtml(last) + '</td>' +
|
||||
'<td></td>';
|
||||
var tdBtn = tr.querySelector('td:last-child');
|
||||
var b = document.createElement('button');
|
||||
|
||||
@@ -4,7 +4,7 @@ export type {
|
||||
HealthRequestMessage,
|
||||
HealthResponseMessage,
|
||||
ParentToWorkerMessage,
|
||||
SignalMessage,
|
||||
ComputeResultMessage,
|
||||
ErrorMessage,
|
||||
ReadyMessage,
|
||||
WorkerToParentMessage,
|
||||
@@ -15,16 +15,7 @@ export type {
|
||||
ThreadWorkflowMessageMessage,
|
||||
} from "./ipc.js";
|
||||
|
||||
export type { SignalBus, SignalHandler, Unsubscribe } from "./signal-bus.js";
|
||||
|
||||
export type { DrizzleDB, SenseRuntime } from "./sense-runtime.js";
|
||||
|
||||
export {
|
||||
runMigrations,
|
||||
openSenseDb,
|
||||
loadSenseModule,
|
||||
executeCompute,
|
||||
} from "./sense-runtime.js";
|
||||
export { loadSenseModule, executeCompute, readState, writeState } from "./sense-runtime.js";
|
||||
|
||||
export { createKernel } from "./kernel.js";
|
||||
export type { Kernel, KernelOptions, KernelHealth } from "./kernel.js";
|
||||
|
||||
+31
-14
@@ -4,7 +4,7 @@
|
||||
*/
|
||||
|
||||
import type { Result, WorkflowTrigger } from "@uncaged/nerve-core";
|
||||
import { err, isPlainRecord, ok } from "@uncaged/nerve-core";
|
||||
import { err, isPlainRecord, ok, parseWorkflowTrigger } from "@uncaged/nerve-core";
|
||||
|
||||
/** Parent → Worker: trigger one compute cycle for a sense */
|
||||
export type ComputeMessage = {
|
||||
@@ -65,11 +65,12 @@ export type ParentToWorkerMessage =
|
||||
| ResumeThreadMessage
|
||||
| KillThreadMessage;
|
||||
|
||||
/** Worker → Parent: compute produced a signal */
|
||||
export type SignalMessage = {
|
||||
type: "signal";
|
||||
/** Worker → Parent: sense compute finished (state persisted in worker; workflow optional). */
|
||||
export type ComputeResultMessage = {
|
||||
type: "compute-result";
|
||||
sense: string;
|
||||
payload: unknown;
|
||||
state: unknown;
|
||||
workflow: WorkflowTrigger | null;
|
||||
};
|
||||
|
||||
/** Worker → Parent: sense compute result includes a workflow to start */
|
||||
@@ -140,7 +141,7 @@ export type ThreadWorkflowMessageMessage = {
|
||||
|
||||
/** Union of all messages a worker sends to the parent */
|
||||
export type WorkerToParentMessage =
|
||||
| SignalMessage
|
||||
| ComputeResultMessage
|
||||
| ErrorMessage
|
||||
| ReadyMessage
|
||||
| HealthResponseMessage
|
||||
@@ -247,17 +248,33 @@ export function parseParentMessage(raw: unknown): Result<ParentToWorkerMessage>
|
||||
}
|
||||
}
|
||||
|
||||
function parseSignalMsg(obj: Record<string, unknown>): Result<WorkerToParentMessage> {
|
||||
function parseComputeResultMsg(obj: Record<string, unknown>): Result<WorkerToParentMessage> {
|
||||
if (typeof obj.sense !== "string") {
|
||||
return err(new Error("Worker 'signal' message missing string 'sense' field"));
|
||||
return err(new Error("Worker 'compute-result' message missing string 'sense' field"));
|
||||
}
|
||||
if (!("payload" in obj)) {
|
||||
return err(new Error("Worker 'signal' message missing 'payload' field"));
|
||||
if (!("state" in obj)) {
|
||||
return err(new Error("Worker 'compute-result' message missing 'state' field"));
|
||||
}
|
||||
if (!("workflow" in obj)) {
|
||||
return err(new Error("Worker 'compute-result' message missing 'workflow' field"));
|
||||
}
|
||||
const wfRaw = obj.workflow;
|
||||
if (wfRaw !== null && !isPlainRecord(wfRaw)) {
|
||||
return err(new Error("Worker 'compute-result' workflow must be an object or null"));
|
||||
}
|
||||
let workflow: WorkflowTrigger | null;
|
||||
if (wfRaw === null) {
|
||||
workflow = null;
|
||||
} else {
|
||||
const parsed = parseWorkflowTrigger(wfRaw);
|
||||
if (!parsed.ok) return err(parsed.error);
|
||||
workflow = parsed.value;
|
||||
}
|
||||
return ok({
|
||||
type: "signal",
|
||||
type: "compute-result",
|
||||
sense: obj.sense,
|
||||
payload: obj.payload,
|
||||
state: obj.state,
|
||||
workflow,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -341,7 +358,7 @@ function parseWorkflowErrorMsg(obj: Record<string, unknown>): Result<WorkerToPar
|
||||
}
|
||||
|
||||
const WORKER_MSG_TYPES = new Set([
|
||||
"signal",
|
||||
"compute-result",
|
||||
"error",
|
||||
"ready",
|
||||
"health-response",
|
||||
@@ -428,7 +445,7 @@ export function parseWorkerMessage(raw: unknown): Result<WorkerToParentMessage>
|
||||
if (!WORKER_MSG_TYPES.has(obj.type)) {
|
||||
return err(new Error(`Unknown worker IPC message type: "${obj.type}"`));
|
||||
}
|
||||
if (obj.type === "signal") return parseSignalMsg(obj);
|
||||
if (obj.type === "compute-result") return parseComputeResultMsg(obj);
|
||||
if (obj.type === "error") return parseErrorMsg(obj);
|
||||
if (obj.type === "health-response") return parseHealthResponseMsg(obj);
|
||||
if (obj.type === "thread-event") return parseThreadEventMsg(obj);
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/**
|
||||
* Kernel — ties sense workers, signal bus, sense scheduler, workflow manager,
|
||||
* Kernel — ties sense workers, sense scheduler, workflow manager,
|
||||
* optional file watcher, and daemon IPC.
|
||||
*/
|
||||
|
||||
@@ -12,10 +12,9 @@ import {
|
||||
type HealthInfo,
|
||||
type NerveConfig,
|
||||
type SenseInfo,
|
||||
type Signal,
|
||||
type WorkflowTrigger,
|
||||
senseTriggerLabels,
|
||||
} from "@uncaged/nerve-core";
|
||||
import { routeSenseComputeOutput } from "@uncaged/nerve-core";
|
||||
|
||||
import { createLogStore } from "@uncaged/nerve-store";
|
||||
import type { LogStore } from "@uncaged/nerve-store";
|
||||
@@ -36,8 +35,6 @@ import {
|
||||
} from "./kernel-sense-groups.js";
|
||||
import { createSenseScheduler } from "./sense-scheduler.js";
|
||||
import type { SenseScheduler } from "./sense-scheduler.js";
|
||||
import { createSignalBus } from "./signal-bus.js";
|
||||
import type { SignalBus } from "./signal-bus.js";
|
||||
import { createSenseWorkerPool, resolveWorkerScript } from "./worker-pool.js";
|
||||
import { createWorkflowManager } from "./workflow-manager.js";
|
||||
import type { WorkflowManager } from "./workflow-manager.js";
|
||||
@@ -55,7 +52,6 @@ export type Kernel = {
|
||||
stop: () => Promise<void>;
|
||||
groups: Set<string>;
|
||||
senseCount: number;
|
||||
bus: SignalBus;
|
||||
logStore: LogStore;
|
||||
workflowManager: WorkflowManager;
|
||||
ready: Promise<void>;
|
||||
@@ -110,7 +106,6 @@ export function createKernel(
|
||||
nerveRoot: string,
|
||||
options: KernelOptions = defaultKernelOptions(),
|
||||
): Kernel {
|
||||
const bus: SignalBus = createSignalBus();
|
||||
const workerScript = options.workerScript ?? resolveWorkerScript();
|
||||
const startTime = Date.now();
|
||||
const startedAtIso = new Date(startTime).toISOString();
|
||||
@@ -127,12 +122,6 @@ export function createKernel(
|
||||
|
||||
let config = initialConfig;
|
||||
|
||||
let _signalIdCounter = 0;
|
||||
function nextSignalId(): number {
|
||||
_signalIdCounter += 1;
|
||||
return _signalIdCounter;
|
||||
}
|
||||
|
||||
const workflowManager = createWorkflowManager(nerveRoot, config, logStore);
|
||||
|
||||
const groups = new Set<string>();
|
||||
@@ -156,38 +145,14 @@ export function createKernel(
|
||||
}
|
||||
}
|
||||
|
||||
function handleSenseWorkerSignal(senseName: string, payload: unknown): void {
|
||||
const routeResult = routeSenseComputeOutput(payload);
|
||||
if (!routeResult.ok) {
|
||||
process.stderr.write(
|
||||
`[kernel] sense "${senseName}" invalid compute payload: ${routeResult.error.message}\n`,
|
||||
);
|
||||
logStore.append({
|
||||
source: "sense",
|
||||
type: "error",
|
||||
refId: senseName,
|
||||
payload: JSON.stringify({ error: routeResult.error.message }),
|
||||
timestamp: Date.now(),
|
||||
});
|
||||
scheduler.onComputeComplete(senseName);
|
||||
return;
|
||||
}
|
||||
const { signal: signalPayload, workflow } = routeResult.value;
|
||||
|
||||
const signal: Signal = {
|
||||
id: nextSignalId(),
|
||||
senseId: senseName,
|
||||
payload: signalPayload,
|
||||
timestamp: Date.now(),
|
||||
};
|
||||
function handleComputeResult(senseName: string, workflow: WorkflowTrigger | null): void {
|
||||
logStore.append({
|
||||
source: "sense",
|
||||
type: "signal",
|
||||
type: "compute-complete",
|
||||
refId: senseName,
|
||||
payload: JSON.stringify(signalPayload),
|
||||
timestamp: signal.timestamp,
|
||||
payload: null,
|
||||
timestamp: Date.now(),
|
||||
});
|
||||
bus.emit(signal);
|
||||
|
||||
if (workflow !== null) {
|
||||
workflowManager.startWorkflow(workflow.name, {
|
||||
@@ -204,6 +169,7 @@ export function createKernel(
|
||||
});
|
||||
}
|
||||
scheduler.onComputeComplete(senseName);
|
||||
scheduler.onSenseCompleted(senseName);
|
||||
}
|
||||
|
||||
function handleWorkerMessage(raw: unknown): void {
|
||||
@@ -235,8 +201,8 @@ export function createKernel(
|
||||
return;
|
||||
}
|
||||
|
||||
if (msg.type === "signal") {
|
||||
handleSenseWorkerSignal(msg.sense, msg.payload);
|
||||
if (msg.type === "compute-result") {
|
||||
handleComputeResult(msg.sense, msg.workflow);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -270,7 +236,7 @@ export function createKernel(
|
||||
senseWorkerPool.sendCompute(group, senseName);
|
||||
}
|
||||
|
||||
scheduler = createSenseScheduler(config, bus, triggerFn, {
|
||||
scheduler = createSenseScheduler(config, triggerFn, {
|
||||
logStore,
|
||||
});
|
||||
|
||||
@@ -306,7 +272,7 @@ export function createKernel(
|
||||
const oldWorkflows = config.workflows;
|
||||
config = newConfig;
|
||||
scheduler.stop();
|
||||
scheduler = createSenseScheduler(config, bus, triggerFn, {
|
||||
scheduler = createSenseScheduler(config, triggerFn, {
|
||||
logStore,
|
||||
});
|
||||
workflowManager.updateConfig(newConfig);
|
||||
@@ -388,22 +354,13 @@ export function createKernel(
|
||||
workflowManager,
|
||||
triggerSense,
|
||||
listSenses(): SenseInfo[] {
|
||||
return Object.entries(config.senses).map(([name, senseConfig]) => {
|
||||
const entries = logStore.query({
|
||||
source: "sense",
|
||||
type: "signal",
|
||||
refId: name,
|
||||
});
|
||||
const lastEntry = entries.length > 0 ? entries[entries.length - 1] : null;
|
||||
return {
|
||||
name,
|
||||
group: senseConfig.group,
|
||||
throttle: senseConfig.throttle,
|
||||
timeout: senseConfig.timeout,
|
||||
triggers: senseTriggerLabels(name, config.senses),
|
||||
lastSignalTimestamp: lastEntry !== null ? lastEntry.timestamp : null,
|
||||
};
|
||||
});
|
||||
return Object.entries(config.senses).map(([name, senseConfig]) => ({
|
||||
name,
|
||||
group: senseConfig.group,
|
||||
throttle: senseConfig.throttle,
|
||||
timeout: senseConfig.timeout,
|
||||
triggers: senseTriggerLabels(name, config.senses),
|
||||
}));
|
||||
},
|
||||
getHealthInfo: getDaemonHealth,
|
||||
getDefaultMaxRounds: () => config.maxRounds,
|
||||
@@ -468,7 +425,6 @@ export function createKernel(
|
||||
stop,
|
||||
groups,
|
||||
senseCount,
|
||||
bus,
|
||||
logStore,
|
||||
workflowManager,
|
||||
ready,
|
||||
|
||||
@@ -1,172 +1,38 @@
|
||||
import { mkdirSync, readFileSync, readdirSync } from "node:fs";
|
||||
import { dirname, join } from "node:path";
|
||||
import { DatabaseSync } from "node:sqlite";
|
||||
import { mkdirSync, readFileSync, writeFileSync } from "node:fs";
|
||||
import { dirname } from "node:path";
|
||||
|
||||
import { drizzle } from "drizzle-orm/node-sqlite";
|
||||
import type { NodeSQLiteDatabase } from "drizzle-orm/node-sqlite";
|
||||
import type { SQLiteTable } from "drizzle-orm/sqlite-core";
|
||||
|
||||
import type { ComputeResult, Result, SenseComputeFn } from "@uncaged/nerve-core";
|
||||
import { DEFAULT_SENSE_SIGNAL_RETENTION, err, isPlainRecord, ok } from "@uncaged/nerve-core";
|
||||
|
||||
/** A Drizzle DB instance (schema-generic) */
|
||||
export type DrizzleDB = NodeSQLiteDatabase<Record<string, never>>;
|
||||
import type { Result, SenseComputeFn, WorkflowTrigger } from "@uncaged/nerve-core";
|
||||
import { err, isPlainRecord, ok } from "@uncaged/nerve-core";
|
||||
|
||||
/** All state held for one sense inside a worker */
|
||||
export type SenseRuntime = {
|
||||
name: string;
|
||||
db: DrizzleDB;
|
||||
compute: SenseComputeFn;
|
||||
table: SQLiteTable;
|
||||
persistSignal: (payload: unknown) => void;
|
||||
state: unknown;
|
||||
statePath: string;
|
||||
};
|
||||
|
||||
function ensureMigrationsTable(sqlite: DatabaseSync): Result<void> {
|
||||
export function readState(statePath: string, initialState: unknown): unknown {
|
||||
try {
|
||||
sqlite.exec(
|
||||
`CREATE TABLE IF NOT EXISTS _migrations (
|
||||
name TEXT PRIMARY KEY,
|
||||
applied_at INTEGER NOT NULL
|
||||
)`,
|
||||
);
|
||||
return ok(undefined);
|
||||
} catch (e) {
|
||||
const msg = e instanceof Error ? e.message : String(e);
|
||||
return err(new Error(`Failed to create _migrations table: ${msg}`));
|
||||
const raw = readFileSync(statePath, "utf8");
|
||||
return JSON.parse(raw);
|
||||
} catch {
|
||||
return initialState;
|
||||
}
|
||||
}
|
||||
|
||||
function listMigrationFiles(migrationsDir: string): Result<string[]> {
|
||||
try {
|
||||
const files = readdirSync(migrationsDir)
|
||||
.filter((f) => f.endsWith(".sql"))
|
||||
.sort();
|
||||
return ok(files);
|
||||
} catch (e) {
|
||||
const msg = e instanceof Error ? e.message : String(e);
|
||||
return err(new Error(`Failed to read migrations directory "${migrationsDir}": ${msg}`));
|
||||
}
|
||||
}
|
||||
|
||||
function applyMigrationFile(sqlite: DatabaseSync, file: string, filePath: string): Result<void> {
|
||||
let sql: string;
|
||||
try {
|
||||
sql = readFileSync(filePath, "utf8");
|
||||
} catch (e) {
|
||||
const msg = e instanceof Error ? e.message : String(e);
|
||||
return err(new Error(`Failed to read migration file "${filePath}": ${msg}`));
|
||||
}
|
||||
|
||||
const insertJournal = sqlite.prepare("INSERT INTO _migrations (name, applied_at) VALUES (?, ?)");
|
||||
sqlite.exec("BEGIN IMMEDIATE");
|
||||
try {
|
||||
sqlite.exec(sql);
|
||||
insertJournal.run(file, Date.now());
|
||||
sqlite.exec("COMMIT");
|
||||
return ok(undefined);
|
||||
} catch (e) {
|
||||
try {
|
||||
sqlite.exec("ROLLBACK");
|
||||
} catch {
|
||||
// ignore secondary errors during rollback
|
||||
}
|
||||
const msg = e instanceof Error ? e.message : String(e);
|
||||
return err(new Error(`Migration "${file}" failed: ${msg}`));
|
||||
}
|
||||
export function writeState(statePath: string, state: unknown): void {
|
||||
mkdirSync(dirname(statePath), { recursive: true });
|
||||
writeFileSync(statePath, JSON.stringify(state, null, 2));
|
||||
}
|
||||
|
||||
/**
|
||||
* Run all *.sql migration files in the given directory against a
|
||||
* `node:sqlite` DatabaseSync, in lexicographic order.
|
||||
* Tracks applied migrations in _migrations table to avoid re-running.
|
||||
*/
|
||||
export function runMigrations(sqlite: DatabaseSync, migrationsDir: string): Result<void> {
|
||||
const tableResult = ensureMigrationsTable(sqlite);
|
||||
if (!tableResult.ok) return tableResult;
|
||||
|
||||
const filesResult = listMigrationFiles(migrationsDir);
|
||||
if (!filesResult.ok) return filesResult;
|
||||
|
||||
const migrationRows = sqlite.prepare("SELECT name FROM _migrations").all();
|
||||
const applied = new Set<string>(
|
||||
migrationRows
|
||||
.filter((r): r is { name: string } => isPlainRecord(r) && typeof r.name === "string")
|
||||
.map((r) => r.name),
|
||||
);
|
||||
|
||||
for (const file of filesResult.value) {
|
||||
if (applied.has(file)) continue;
|
||||
const result = applyMigrationFile(sqlite, file, join(migrationsDir, file));
|
||||
if (!result.ok) return result;
|
||||
}
|
||||
|
||||
return ok(undefined);
|
||||
}
|
||||
|
||||
/** Run `_signals` row prune after this many inserts (amortize DELETE cost). */
|
||||
const SIGNAL_INSERTS_PER_PRUNE = 100;
|
||||
|
||||
/**
|
||||
* Open (or create) the SQLite file at `dbPath`, run all migrations in
|
||||
* `migrationsDir`, and wrap with Drizzle ORM.
|
||||
*/
|
||||
export function openSenseDb(
|
||||
dbPath: string,
|
||||
migrationsDir: string,
|
||||
retention: number = DEFAULT_SENSE_SIGNAL_RETENTION,
|
||||
): Result<{ sqlite: DatabaseSync; db: DrizzleDB; persistSignal: (payload: unknown) => void }> {
|
||||
let sqlite: DatabaseSync;
|
||||
|
||||
try {
|
||||
mkdirSync(dirname(dbPath), { recursive: true });
|
||||
sqlite = new DatabaseSync(dbPath);
|
||||
sqlite.exec("PRAGMA journal_mode=WAL");
|
||||
} catch (e) {
|
||||
const msg = e instanceof Error ? e.message : String(e);
|
||||
return err(new Error(`Failed to open database "${dbPath}": ${msg}`));
|
||||
}
|
||||
|
||||
const migResult = runMigrations(sqlite, migrationsDir);
|
||||
if (!migResult.ok) return migResult;
|
||||
|
||||
// Auto-create _signals table for signal persistence (all senses get this)
|
||||
sqlite.exec(
|
||||
`CREATE TABLE IF NOT EXISTS _signals (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
payload TEXT NOT NULL,
|
||||
timestamp INTEGER NOT NULL
|
||||
)`,
|
||||
);
|
||||
|
||||
const insertStmt = sqlite.prepare("INSERT INTO _signals (payload, timestamp) VALUES (?, ?)");
|
||||
const pruneStmt = sqlite.prepare(
|
||||
"DELETE FROM _signals WHERE id <= (SELECT id FROM _signals ORDER BY id DESC LIMIT 1 OFFSET ?)",
|
||||
);
|
||||
|
||||
let insertsSincePrune = 0;
|
||||
|
||||
function persistSignal(payload: unknown): void {
|
||||
const json = JSON.stringify(payload);
|
||||
insertStmt.run(json, Date.now());
|
||||
insertsSincePrune += 1;
|
||||
if (insertsSincePrune >= SIGNAL_INSERTS_PER_PRUNE) {
|
||||
insertsSincePrune = 0;
|
||||
pruneStmt.run(retention);
|
||||
}
|
||||
}
|
||||
|
||||
// Drizzle infers a schema-specific DB type; senses are schema-agnostic at this layer.
|
||||
const db = drizzle({ client: sqlite }) as DrizzleDB;
|
||||
return ok({ sqlite, db, persistSignal });
|
||||
}
|
||||
|
||||
/**
|
||||
* Dynamically import the compute function and table from a sense's index.ts/js.
|
||||
* The module must export a named `compute` function and a named `table` (SQLiteTable).
|
||||
* Dynamically import `compute` and `initialState` from a sense's index.ts/js.
|
||||
* The module must export named `compute` and `initialState`.
|
||||
*/
|
||||
export async function loadSenseModule(
|
||||
senseIndexPath: string,
|
||||
): Promise<Result<{ compute: SenseComputeFn; table: SQLiteTable }>> {
|
||||
): Promise<Result<{ compute: SenseComputeFn; initialState: unknown }>> {
|
||||
let mod: unknown;
|
||||
|
||||
try {
|
||||
@@ -183,33 +49,24 @@ export async function loadSenseModule(
|
||||
);
|
||||
}
|
||||
|
||||
if (!("table" in mod) || mod.table === null || typeof mod.table !== "object") {
|
||||
return err(
|
||||
new Error(
|
||||
`Sense module "${senseIndexPath}" must export a named "table" (drizzle SQLiteTable)`,
|
||||
),
|
||||
);
|
||||
if (!("initialState" in mod)) {
|
||||
return err(new Error(`Sense module "${senseIndexPath}" must export a named "initialState"`));
|
||||
}
|
||||
|
||||
return ok({
|
||||
compute: mod.compute as SenseComputeFn,
|
||||
table: mod.table as SQLiteTable,
|
||||
initialState: mod.initialState,
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute a sense's compute function with an optional soft timeout.
|
||||
* If timeoutMs is provided and compute takes longer, the AbortSignal is
|
||||
* triggered and an error Result is returned.
|
||||
* When compute returns non-null, `result.signal` is persisted to the sense's
|
||||
* table via `db.insert(table).values(result.signal)` and `persistSignal` is
|
||||
* called with `result.signal`. Returns the full `ComputeResult` so callers
|
||||
* can inspect the `workflow` field.
|
||||
* Execute a sense's compute with current runtime state and an optional soft timeout.
|
||||
* On success, persists `result.state` to `runtime.statePath` and updates `runtime.state`.
|
||||
*/
|
||||
export async function executeCompute(
|
||||
runtime: SenseRuntime,
|
||||
timeoutMs?: number,
|
||||
): Promise<Result<ComputeResult<unknown>>> {
|
||||
): Promise<Result<{ state: unknown; workflow: WorkflowTrigger | null }>> {
|
||||
const controller = new AbortController();
|
||||
|
||||
let timer: ReturnType<typeof setTimeout> | undefined;
|
||||
@@ -224,16 +81,13 @@ export async function executeCompute(
|
||||
: null;
|
||||
|
||||
try {
|
||||
const computePromise = runtime.compute();
|
||||
const computePromise = runtime.compute(runtime.state);
|
||||
const result = timeoutPromise
|
||||
? await Promise.race([computePromise, timeoutPromise])
|
||||
: await computePromise;
|
||||
|
||||
if (result !== null) {
|
||||
// Cast required: DrizzleDB is schema-agnostic; the sense module guarantees shape compatibility.
|
||||
await runtime.db.insert(runtime.table).values(result.signal as Record<string, unknown>);
|
||||
runtime.persistSignal(result.signal);
|
||||
}
|
||||
runtime.state = result.state;
|
||||
writeState(runtime.statePath, result.state);
|
||||
|
||||
return ok(result);
|
||||
} catch (e) {
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
*
|
||||
* Supports:
|
||||
* - interval: periodic setInterval-based triggering
|
||||
* - on[]: event-driven triggering via the signal bus
|
||||
* - on[]: event-driven triggering when watched senses complete a compute cycle
|
||||
* - throttle: skip triggers that arrive too soon after the last compute
|
||||
* - merge/coalesce: if compute is in-flight, record one pending trigger;
|
||||
* run it once after the current compute completes (no unbounded queue)
|
||||
@@ -11,7 +11,6 @@
|
||||
|
||||
import type { NerveConfig } from "@uncaged/nerve-core";
|
||||
import type { LogStore } from "@uncaged/nerve-store";
|
||||
import type { SignalBus, Unsubscribe } from "./signal-bus.js";
|
||||
|
||||
/** Sends a compute message to the worker responsible for the given sense. */
|
||||
export type TriggerFn = (senseName: string) => void;
|
||||
@@ -28,6 +27,8 @@ type SenseState = {
|
||||
export type SenseScheduler = {
|
||||
/** Notify scheduler that a compute cycle finished. Drains the pending flag. */
|
||||
onComputeComplete: (senseName: string) => void;
|
||||
/** Notify scheduler that a sense completed so `on[]` subscribers may run. */
|
||||
onSenseCompleted: (senseName: string) => void;
|
||||
stop: () => void;
|
||||
};
|
||||
|
||||
@@ -43,21 +44,29 @@ export type SenseSchedulerOptions = {
|
||||
* Create and start a sense scheduler.
|
||||
*
|
||||
* @param config Full NerveConfig (senses for schedule + throttle).
|
||||
* @param bus SignalBus to subscribe for event-driven triggers.
|
||||
* @param triggerFn Called with the sense name when a compute should be dispatched.
|
||||
* @param opts Optional: logStore for structured logging.
|
||||
* @returns SenseScheduler with stop() and onComputeComplete() methods.
|
||||
* @returns SenseScheduler with stop(), onComputeComplete(), and onSenseCompleted().
|
||||
*/
|
||||
export function createSenseScheduler(
|
||||
config: NerveConfig,
|
||||
bus: SignalBus,
|
||||
triggerFn: TriggerFn,
|
||||
opts?: SenseSchedulerOptions,
|
||||
): SenseScheduler {
|
||||
const logStore = opts?.logStore;
|
||||
const intervals: ReturnType<typeof setInterval>[] = [];
|
||||
const unsubscribers: Unsubscribe[] = [];
|
||||
const states = new Map<string, SenseState>();
|
||||
let stopped = false;
|
||||
|
||||
/** sense name → senses that list it in `on[]` */
|
||||
const onSubscribers = new Map<string, string[]>();
|
||||
for (const [senseName, sense] of Object.entries(config.senses)) {
|
||||
for (const watched of sense.on) {
|
||||
const list = onSubscribers.get(watched) ?? [];
|
||||
list.push(senseName);
|
||||
onSubscribers.set(watched, list);
|
||||
}
|
||||
}
|
||||
|
||||
function getState(senseName: string): SenseState {
|
||||
let state = states.get(senseName);
|
||||
@@ -89,6 +98,7 @@ export function createSenseScheduler(
|
||||
* If within throttle window, schedules a single deferred trigger at window end (fix #3).
|
||||
*/
|
||||
function maybeTrigger(senseName: string): void {
|
||||
if (stopped) return;
|
||||
const senseConfig = config.senses[senseName];
|
||||
const throttleMs = senseConfig?.throttle ?? null;
|
||||
const state = getState(senseName);
|
||||
@@ -118,10 +128,11 @@ export function createSenseScheduler(
|
||||
}
|
||||
|
||||
/**
|
||||
* Called by the kernel when a compute cycle completes (signal or error received).
|
||||
* Called by the kernel when a compute cycle completes (compute-result or error received).
|
||||
* Drains the pending flag if set.
|
||||
*/
|
||||
function onComputeComplete(senseName: string): void {
|
||||
if (stopped) return;
|
||||
const state = states.get(senseName);
|
||||
if (state === undefined) return;
|
||||
|
||||
@@ -152,8 +163,17 @@ export function createSenseScheduler(
|
||||
dispatchCompute(senseName);
|
||||
}
|
||||
|
||||
function onSenseCompleted(senseName: string): void {
|
||||
if (stopped) return;
|
||||
const subscribers = onSubscribers.get(senseName);
|
||||
if (subscribers === undefined) return;
|
||||
for (const sub of subscribers) {
|
||||
maybeTrigger(sub);
|
||||
}
|
||||
}
|
||||
|
||||
for (const [senseName, sense] of Object.entries(config.senses)) {
|
||||
const { interval, on } = sense;
|
||||
const { interval } = sense;
|
||||
|
||||
if (interval !== null) {
|
||||
const id = setInterval(() => {
|
||||
@@ -161,25 +181,13 @@ export function createSenseScheduler(
|
||||
}, interval);
|
||||
intervals.push(id);
|
||||
}
|
||||
|
||||
if (on.length > 0) {
|
||||
const watchedSenses = new Set(on);
|
||||
const unsub = bus.subscribe((signal) => {
|
||||
if (watchedSenses.has(signal.senseId)) {
|
||||
maybeTrigger(senseName);
|
||||
}
|
||||
});
|
||||
unsubscribers.push(unsub);
|
||||
}
|
||||
}
|
||||
|
||||
function stop(): void {
|
||||
stopped = true;
|
||||
for (const id of intervals) {
|
||||
clearInterval(id);
|
||||
}
|
||||
for (const unsub of unsubscribers) {
|
||||
unsub();
|
||||
}
|
||||
for (const state of states.values()) {
|
||||
if (state.deferredTimer !== null) {
|
||||
clearTimeout(state.deferredTimer);
|
||||
@@ -187,8 +195,7 @@ export function createSenseScheduler(
|
||||
}
|
||||
}
|
||||
intervals.length = 0;
|
||||
unsubscribers.length = 0;
|
||||
}
|
||||
|
||||
return { onComputeComplete, stop };
|
||||
return { onComputeComplete, onSenseCompleted, stop };
|
||||
}
|
||||
|
||||
@@ -8,8 +8,7 @@
|
||||
*
|
||||
* Layout assumptions (nerve user config at `~/.uncaged-nerve/`):
|
||||
* dist/senses/<name>/index.js ← bundled compute (esbuild)
|
||||
* senses/<name>/migrations/ ← SQL migration files
|
||||
* data/senses/<name>.db ← SQLite data file
|
||||
* data/senses/<name>.json ← persisted sense state
|
||||
* nerve.yaml ← config
|
||||
*/
|
||||
|
||||
@@ -19,11 +18,11 @@ import { readFileSync } from "node:fs";
|
||||
import { join, resolve } from "node:path";
|
||||
|
||||
import { parseNerveConfig } from "@uncaged/nerve-core";
|
||||
import type { NerveConfig } from "@uncaged/nerve-core";
|
||||
import type { NerveConfig, WorkflowTrigger } from "@uncaged/nerve-core";
|
||||
|
||||
import type { WorkerToParentMessage } from "./ipc.js";
|
||||
import { parseParentMessage } from "./ipc.js";
|
||||
import { executeCompute, loadSenseModule, openSenseDb } from "./sense-runtime.js";
|
||||
import { executeCompute, loadSenseModule, readState } from "./sense-runtime.js";
|
||||
import type { SenseRuntime } from "./sense-runtime.js";
|
||||
import { ignoreSessionBroadcastSignals } from "./worker-signals.js";
|
||||
|
||||
@@ -41,8 +40,11 @@ function sendReady(): void {
|
||||
send({ type: "ready" });
|
||||
}
|
||||
|
||||
function sendSignal(sense: string, payload: unknown): void {
|
||||
send({ type: "signal", sense, payload });
|
||||
function sendComputeResult(
|
||||
sense: string,
|
||||
value: { state: unknown; workflow: WorkflowTrigger | null },
|
||||
): void {
|
||||
send({ type: "compute-result", sense, state: value.state, workflow: value.workflow });
|
||||
}
|
||||
|
||||
function sendError(sense: string, error: string): void {
|
||||
@@ -72,31 +74,23 @@ function readConfig(nerveRoot: string): NerveConfig {
|
||||
return configResult.value;
|
||||
}
|
||||
|
||||
async function initSense(
|
||||
nerveRoot: string,
|
||||
senseName: string,
|
||||
retention: number,
|
||||
): Promise<SenseRuntime> {
|
||||
const dbPath = join(nerveRoot, "data", "senses", `${senseName}.db`);
|
||||
const migrationsDir = join(nerveRoot, "senses", senseName, "migrations");
|
||||
async function initSense(nerveRoot: string, senseName: string): Promise<SenseRuntime> {
|
||||
const statePath = join(nerveRoot, "data", "senses", `${senseName}.json`);
|
||||
const senseIndexPath = resolve(join(nerveRoot, "dist", "senses", senseName, "index.js"));
|
||||
|
||||
const dbResult = openSenseDb(dbPath, migrationsDir, retention);
|
||||
if (!dbResult.ok) {
|
||||
throw new Error(`Failed to init DB for "${senseName}": ${dbResult.error.message}`);
|
||||
}
|
||||
|
||||
const moduleResult = await loadSenseModule(senseIndexPath);
|
||||
if (!moduleResult.ok) {
|
||||
throw new Error(`Failed to load module for "${senseName}": ${moduleResult.error.message}`);
|
||||
}
|
||||
|
||||
const { compute, initialState } = moduleResult.value;
|
||||
const state = readState(statePath, initialState);
|
||||
|
||||
return {
|
||||
name: senseName,
|
||||
db: dbResult.value.db,
|
||||
compute: moduleResult.value.compute,
|
||||
table: moduleResult.value.table,
|
||||
persistSignal: dbResult.value.persistSignal,
|
||||
compute,
|
||||
state,
|
||||
statePath,
|
||||
};
|
||||
}
|
||||
|
||||
@@ -149,10 +143,7 @@ async function runCompute(
|
||||
return;
|
||||
}
|
||||
clearGracePeriodTimer(senseName);
|
||||
if (result.value != null) {
|
||||
// Single IPC message: kernel uses routeSenseComputeOutput(payload) for signal + optional workflow.
|
||||
sendSignal(senseName, result.value);
|
||||
}
|
||||
sendComputeResult(senseName, result.value);
|
||||
} catch (e: unknown) {
|
||||
const errMsg = e instanceof Error ? e.message : String(e);
|
||||
sendError(senseName, errMsg);
|
||||
@@ -236,8 +227,7 @@ async function bootstrap(nerveRoot: string, group: string): Promise<void> {
|
||||
|
||||
for (const senseName of groupSenses) {
|
||||
try {
|
||||
const retention = config.senses[senseName].retention;
|
||||
const runtime = await initSense(nerveRoot, senseName, retention);
|
||||
const runtime = await initSense(nerveRoot, senseName);
|
||||
runtimes.set(senseName, runtime);
|
||||
} catch (e: unknown) {
|
||||
const eMsg = e instanceof Error ? e.message : String(e);
|
||||
|
||||
@@ -1,51 +0,0 @@
|
||||
/**
|
||||
* In-memory signal bus for routing signals between sense workers and sense-scheduler subscribers.
|
||||
* Synchronous dispatch — no persistence, no async queuing.
|
||||
*
|
||||
* If a handler throws, the error is logged and remaining handlers still run.
|
||||
* The first error is re-thrown after all handlers complete so callers can observe it.
|
||||
*/
|
||||
|
||||
import type { Signal } from "@uncaged/nerve-core";
|
||||
|
||||
export type SignalHandler = (signal: Signal) => void;
|
||||
export type Unsubscribe = () => void;
|
||||
|
||||
export type SignalBus = {
|
||||
emit: (signal: Signal) => void;
|
||||
subscribe: (handler: SignalHandler) => Unsubscribe;
|
||||
};
|
||||
|
||||
export function createSignalBus(): SignalBus {
|
||||
const handlers = new Set<SignalHandler>();
|
||||
|
||||
function emit(signal: Signal): void {
|
||||
let firstError: unknown = null;
|
||||
let hasError = false;
|
||||
|
||||
for (const handler of handlers) {
|
||||
try {
|
||||
handler(signal);
|
||||
} catch (e) {
|
||||
console.error("[signal-bus] handler error:", e);
|
||||
if (!hasError) {
|
||||
firstError = e;
|
||||
hasError = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (hasError) {
|
||||
throw firstError;
|
||||
}
|
||||
}
|
||||
|
||||
function subscribe(handler: SignalHandler): Unsubscribe {
|
||||
handlers.add(handler);
|
||||
return () => {
|
||||
handlers.delete(handler);
|
||||
};
|
||||
}
|
||||
|
||||
return { emit, subscribe };
|
||||
}
|
||||
Generated
-3
@@ -111,9 +111,6 @@ importers:
|
||||
'@uncaged/nerve-store':
|
||||
specifier: workspace:*
|
||||
version: link:../store
|
||||
drizzle-orm:
|
||||
specifier: 1.0.0-beta.23-c10d10c
|
||||
version: 1.0.0-beta.23-c10d10c(@cloudflare/workers-types@4.20260425.1)(@types/better-sqlite3@7.6.13)(@types/mssql@9.1.11(@azure/core-client@1.10.1))(better-sqlite3@11.10.0)(mssql@11.0.1(@azure/core-client@1.10.1))(sql.js@1.14.1)(zod@4.3.6)
|
||||
yaml:
|
||||
specifier: ^2.8.3
|
||||
version: 2.8.3
|
||||
|
||||
Reference in New Issue
Block a user