diff --git a/packages/daemon/package.json b/packages/daemon/package.json index bce6d47..e205d1d 100644 --- a/packages/daemon/package.json +++ b/packages/daemon/package.json @@ -28,7 +28,6 @@ "dependencies": { "@uncaged/nerve-core": "workspace:*", "@uncaged/nerve-store": "workspace:*", - "drizzle-orm": "1.0.0-beta.23-c10d10c", "yaml": "^2.8.3" }, "devDependencies": { diff --git a/packages/daemon/src/__tests__/daemon-ipc.test.ts b/packages/daemon/src/__tests__/daemon-ipc.test.ts index 5335ee9..c99cf99 100644 --- a/packages/daemon/src/__tests__/daemon-ipc.test.ts +++ b/packages/daemon/src/__tests__/daemon-ipc.test.ts @@ -241,7 +241,6 @@ describe("daemon-ipc — list-senses", () => { throttle: 5000, timeout: 3000, triggers: ["every 30s"], - lastSignalTimestamp: 1000, }, { name: "disk-usage", @@ -249,7 +248,6 @@ describe("daemon-ipc — list-senses", () => { throttle: 30000, timeout: null, triggers: [], - lastSignalTimestamp: null, }, ]; const listSenses = vi.fn(() => sensesData); diff --git a/packages/daemon/src/__tests__/fixtures/crash-once-worker.mjs b/packages/daemon/src/__tests__/fixtures/crash-once-worker.mjs index 73911fe..4324163 100644 --- a/packages/daemon/src/__tests__/fixtures/crash-once-worker.mjs +++ b/packages/daemon/src/__tests__/fixtures/crash-once-worker.mjs @@ -31,6 +31,11 @@ process.on("message", (msg) => { writeFileSync(markerFile, "crashed", "utf8"); process.exit(1); } - process.send({ type: "signal", sense: msg.sense, payload: 42 }); + process.send({ + type: "compute-result", + sense: msg.sense, + state: 42, + workflow: null, + }); } }); diff --git a/packages/daemon/src/__tests__/fixtures/mock-worker.mjs b/packages/daemon/src/__tests__/fixtures/mock-worker.mjs index 27265a1..da7b3e3 100644 --- a/packages/daemon/src/__tests__/fixtures/mock-worker.mjs +++ b/packages/daemon/src/__tests__/fixtures/mock-worker.mjs @@ -9,7 +9,7 @@ * * Behaviour: * - Sends { type: "ready" } on startup - * - On { type: "compute", sense } → sends back { type: "signal", sense, payload: 42 } + * - On { type: "compute", sense } → sends back compute-result with state + workflow:null * - On { type: "shutdown" } → exits cleanly with code 0 */ @@ -23,6 +23,11 @@ process.on("message", (msg) => { } if (msg.type === "compute" && typeof msg.sense === "string") { - process.send({ type: "signal", sense: msg.sense, payload: 42 }); + process.send({ + type: "compute-result", + sense: msg.sense, + state: 42, + workflow: null, + }); } }); diff --git a/packages/daemon/src/__tests__/fixtures/slow-worker.mjs b/packages/daemon/src/__tests__/fixtures/slow-worker.mjs index 8dad2c2..e974bd4 100644 --- a/packages/daemon/src/__tests__/fixtures/slow-worker.mjs +++ b/packages/daemon/src/__tests__/fixtures/slow-worker.mjs @@ -18,7 +18,12 @@ process.on("message", (msg) => { if (msg.type === "compute" && typeof msg.sense === "string") { // Intentionally slow — will be killed by grace period setTimeout(() => { - process.send({ type: "signal", sense: msg.sense, payload: "late" }); + process.send({ + type: "compute-result", + sense: msg.sense, + state: "late", + workflow: null, + }); }, 10_000); } }); diff --git a/packages/daemon/src/__tests__/kernel-integration.test.ts b/packages/daemon/src/__tests__/kernel-integration.test.ts index 10906e4..ff65455 100644 --- a/packages/daemon/src/__tests__/kernel-integration.test.ts +++ b/packages/daemon/src/__tests__/kernel-integration.test.ts @@ -11,7 +11,6 @@ import { tmpdir } from "node:os"; import { dirname, join } from "node:path"; import { fileURLToPath } from "node:url"; -import type { Signal } from "@uncaged/nerve-core"; import type { NerveConfig } from "@uncaged/nerve-core"; import { afterEach, beforeEach, describe, expect, it } from "vitest"; @@ -30,7 +29,6 @@ function makeConfig(overrides: Partial = {}): NerveConfig { throttle: null, timeout: null, gracePeriod: null, - retention: 10_000, interval: null, on: [], }, @@ -82,7 +80,6 @@ describe("kernel integration — real child processes", () => { throttle: null, timeout: null, gracePeriod: null, - retention: 10_000, interval: null, on: [], }, @@ -91,7 +88,6 @@ describe("kernel integration — real child processes", () => { throttle: null, timeout: null, gracePeriod: null, - retention: 10_000, interval: null, on: [], }, @@ -100,7 +96,6 @@ describe("kernel integration — real child processes", () => { throttle: null, timeout: null, gracePeriod: null, - retention: 10_000, interval: null, on: [], }, @@ -116,31 +111,32 @@ describe("kernel integration — real child processes", () => { expect(kernel.senseCount).toBe(3); }); - it("workers start and respond to compute messages with signals", async () => { + it("workers start and respond to compute messages with compute-result", async () => { const config = makeConfig(); kernel = createKernel(config, nerveRoot, { workerScript: MOCK_WORKER, }); - // Wait for all workers to be ready (event-based, not fixed delay) await kernel.ready; - // Subscribe to the bus before triggering compute - const received: Signal[] = []; - const unsub = kernel.bus.subscribe((signal) => { - received.push(signal); - }); - - // Trigger a compute for "cpu-usage" through the kernel's triggerCompute kernel.triggerCompute("cpu-usage"); - // Poll until a signal arrives on the bus (event-driven, no fixed delay) - await pollUntil(() => received.length > 0, 10_000); + await pollUntil( + () => + kernel!.logStore.query({ + source: "sense", + type: "compute-complete", + refId: "cpu-usage", + }).length > 0, + 10_000, + ); - expect(received).toHaveLength(1); - expect(received[0]).toMatchObject({ senseId: "cpu-usage", payload: 42 }); - - unsub(); + const rows = kernel.logStore.query({ + source: "sense", + type: "compute-complete", + refId: "cpu-usage", + }); + expect(rows).toHaveLength(1); }, 15_000); it("graceful shutdown: stop() resolves after all workers exit", async () => { @@ -151,7 +147,6 @@ describe("kernel integration — real child processes", () => { throttle: null, timeout: null, gracePeriod: null, - retention: 10_000, interval: null, on: [], }, @@ -160,7 +155,6 @@ describe("kernel integration — real child processes", () => { throttle: null, timeout: null, gracePeriod: null, - retention: 10_000, interval: null, on: [], }, @@ -180,31 +174,32 @@ describe("kernel integration — real child processes", () => { await expect(stopPromise).resolves.toBeUndefined(); }, 10_000); - it("compute round-trip: worker receives compute and sends signal back through bus", async () => { + it("compute round-trip: worker receives compute and kernel logs compute-complete", async () => { const config = makeConfig(); kernel = createKernel(config, nerveRoot, { workerScript: MOCK_WORKER, }); - // Wait for all workers to be ready (event-based, not fixed delay) await kernel.ready; - const received: Signal[] = []; - const unsub = kernel.bus.subscribe((signal) => { - received.push(signal); - }); - - // Trigger compute via the kernel — the kernel sends IPC to the worker, - // the mock worker responds with a signal message, and the kernel routes it to the bus. kernel.triggerCompute("cpu-usage"); - // Poll for the signal on the bus (no fixed delay) - await pollUntil(() => received.length > 0, 10_000); + await pollUntil( + () => + kernel!.logStore.query({ + source: "sense", + type: "compute-complete", + refId: "cpu-usage", + }).length > 0, + 10_000, + ); - expect(received).toHaveLength(1); - expect(received[0]).toMatchObject({ senseId: "cpu-usage", payload: 42 }); - - unsub(); + const rows = kernel.logStore.query({ + source: "sense", + type: "compute-complete", + refId: "cpu-usage", + }); + expect(rows).toHaveLength(1); }, 15_000); it("crash recovery: kernel respawns worker after unexpected exit and new worker is functional", async () => { @@ -234,23 +229,24 @@ describe("kernel integration — real child processes", () => { expect(newPid).not.toBeNull(); expect(newPid).not.toBe(originalPid); - // Wait a bit for the new worker to send its "ready" message and be fully up. - // Poll until the new worker responds to a compute message on the bus. - const postRespawnSignals: Signal[] = []; - const unsub = kernel.bus.subscribe((signal) => { - postRespawnSignals.push(signal); - }); - - // Trigger compute through the kernel to the new worker kernel.triggerCompute("cpu-usage"); - // Poll for the signal — verifies the new worker is fully functional - await pollUntil(() => postRespawnSignals.length > 0, 15_000); + await pollUntil( + () => + kernel!.logStore.query({ + source: "sense", + type: "compute-complete", + refId: "cpu-usage", + }).length > 0, + 15_000, + ); - expect(postRespawnSignals).toHaveLength(1); - expect(postRespawnSignals[0]).toMatchObject({ senseId: "cpu-usage", payload: 42 }); - - unsub(); + const rows = kernel.logStore.query({ + source: "sense", + type: "compute-complete", + refId: "cpu-usage", + }); + expect(rows.length).toBeGreaterThanOrEqual(1); // Kernel should still stop gracefully after respawn await kernel.stop(); diff --git a/packages/daemon/src/__tests__/kernel-phase6.test.ts b/packages/daemon/src/__tests__/kernel-phase6.test.ts index cdc66ff..4958a18 100644 --- a/packages/daemon/src/__tests__/kernel-phase6.test.ts +++ b/packages/daemon/src/__tests__/kernel-phase6.test.ts @@ -86,7 +86,6 @@ function makeConfig(overrides: Partial = {}): NerveConfig { throttle: null, timeout: null, gracePeriod: null, - retention: 10_000, interval: null, on: [], }, @@ -125,7 +124,6 @@ describe("kernel — getHealth", () => { throttle: null, timeout: null, gracePeriod: null, - retention: 10_000, interval: null, on: [], }, @@ -134,7 +132,6 @@ describe("kernel — getHealth", () => { throttle: null, timeout: null, gracePeriod: null, - retention: 10_000, interval: null, on: [], }, @@ -143,7 +140,6 @@ describe("kernel — getHealth", () => { throttle: null, timeout: null, gracePeriod: null, - retention: 10_000, interval: null, on: [], }, @@ -246,7 +242,6 @@ describe("kernel — reloadConfig", () => { throttle: null, timeout: null, gracePeriod: null, - retention: 10_000, interval: null, on: [], }, @@ -255,7 +250,6 @@ describe("kernel — reloadConfig", () => { throttle: null, timeout: null, gracePeriod: null, - retention: 10_000, interval: null, on: [], }, @@ -283,7 +277,6 @@ describe("kernel — reloadConfig", () => { throttle: null, timeout: null, gracePeriod: null, - retention: 10_000, interval: null, on: [], }, @@ -292,7 +285,6 @@ describe("kernel — reloadConfig", () => { throttle: null, timeout: null, gracePeriod: null, - retention: 10_000, interval: null, on: [], }, @@ -318,7 +310,6 @@ describe("kernel — reloadConfig", () => { throttle: null, timeout: null, gracePeriod: null, - retention: 10_000, interval: null, on: [], }, @@ -352,7 +343,6 @@ describe("kernel — reloadConfig", () => { throttle: null, timeout: null, gracePeriod: null, - retention: 10_000, interval: null, on: [], }, @@ -361,7 +351,6 @@ describe("kernel — reloadConfig", () => { throttle: null, timeout: null, gracePeriod: null, - retention: 10_000, interval: null, on: [], }, diff --git a/packages/daemon/src/__tests__/kernel-trigger-sense.test.ts b/packages/daemon/src/__tests__/kernel-trigger-sense.test.ts index 9688a7a..e742875 100644 --- a/packages/daemon/src/__tests__/kernel-trigger-sense.test.ts +++ b/packages/daemon/src/__tests__/kernel-trigger-sense.test.ts @@ -101,7 +101,6 @@ function makeConfig(overrides: Partial = {}): NerveConfig { throttle: null, timeout: null, gracePeriod: null, - retention: 10_000, interval: null, on: [], }, @@ -153,7 +152,6 @@ describe("kernel.triggerSense()", () => { throttle: null, timeout: null, gracePeriod: null, - retention: 10_000, interval: null, on: [], }, @@ -162,7 +160,6 @@ describe("kernel.triggerSense()", () => { throttle: null, timeout: null, gracePeriod: null, - retention: 10_000, interval: null, on: [], }, @@ -199,7 +196,6 @@ describe("kernel.triggerSense()", () => { throttle: null, timeout: null, gracePeriod: null, - retention: 10_000, interval: null, on: [], }, @@ -208,7 +204,6 @@ describe("kernel.triggerSense()", () => { throttle: null, timeout: null, gracePeriod: null, - retention: 10_000, interval: null, on: [], }, diff --git a/packages/daemon/src/__tests__/kernel-workflow-integration.test.ts b/packages/daemon/src/__tests__/kernel-workflow-integration.test.ts index d9cc1bb..dd48647 100644 --- a/packages/daemon/src/__tests__/kernel-workflow-integration.test.ts +++ b/packages/daemon/src/__tests__/kernel-workflow-integration.test.ts @@ -1,8 +1,8 @@ /** * Integration tests for Kernel + WorkflowManager integration. * - * Verifies that sense signals trigger workflow runs when Sense compute routes - * to workflows; that workflow events are logged; that reloadConfig handles workflow changes; + * Verifies that sense compute-result IPC triggers workflow runs when `workflow` + * is non-null; that workflow events are logged; that reloadConfig handles workflow changes; * and that graceful shutdown stops workflow workers. * * Uses mocked child_process.fork to avoid real subprocesses. @@ -53,7 +53,12 @@ function makeMockChild(pid = 1): MockChild { // Sense IPC: reply to compute so scheduler completes (onComputeComplete). if (m.type === "compute" && typeof m.sense === "string") { setImmediate(() => { - child.emit("message", { type: "signal", sense: m.sense, payload: 42 }); + child.emit("message", { + type: "compute-result", + sense: m.sense, + state: 42, + workflow: null, + }); }); } }); @@ -117,7 +122,6 @@ function makeConfig(overrides: Partial = {}): NerveConfig { throttle: null, timeout: null, gracePeriod: null, - retention: 10_000, interval: null, on: [], }, @@ -159,7 +163,6 @@ describe("kernel + workflowManager integration", () => { throttle: null, timeout: null, gracePeriod: null, - retention: 10_000, interval: null, on: [], }, @@ -174,22 +177,17 @@ describe("kernel + workflowManager integration", () => { await flushSenseWorkerForkMicrotasks(kernel); await vi.runAllTimersAsync(); - // Simulate a sense worker sending a signal with workflow launch payload - // The kernel's handleWorkerMessage processes "signal" type messages - // and uses routeSenseComputeOutput to detect workflow launches const workerPool = mockChildren[0]; if (workerPool) { workerPool.emit("message", { - type: "signal", + type: "compute-result", sense: "cpu-usage", - payload: { - signal: { reason: "test" }, - workflow: { - name: "my-workflow", - maxRounds: 10, - prompt: "run this workflow", - dryRun: false, - }, + state: { reason: "test" }, + workflow: { + name: "my-workflow", + maxRounds: 10, + prompt: "run this workflow", + dryRun: false, }, }); } @@ -221,7 +219,6 @@ describe("kernel + workflowManager integration", () => { throttle: null, timeout: null, gracePeriod: null, - retention: 10_000, interval: null, on: [], }, @@ -236,20 +233,17 @@ describe("kernel + workflowManager integration", () => { await flushSenseWorkerForkMicrotasks(kernel); await vi.runAllTimersAsync(); - // Simulate sense worker returning a signal plus workflow launch const workerPool = mockChildren[0]; if (workerPool) { workerPool.emit("message", { - type: "signal", + type: "compute-result", sense: "cpu-usage", - payload: { - signal: { level: "critical" }, - workflow: { - name: "alert-workflow", - maxRounds: 5, - prompt: "handle critical alert", - dryRun: false, - }, + state: { level: "critical" }, + workflow: { + name: "alert-workflow", + maxRounds: 5, + prompt: "handle critical alert", + dryRun: false, }, }); } @@ -280,7 +274,7 @@ describe("kernel + workflowManager integration", () => { await stopPromise; }); - it("logs sense signal before workflow-launch when both are present", async () => { + it("logs compute-complete before workflow-launch when workflow is present", async () => { const logStore = makeLogStore(); const config = makeConfig({ workflows: { "order-wf": { concurrency: 1, overflow: "drop" } }, @@ -296,16 +290,14 @@ describe("kernel + workflowManager integration", () => { const workerPool = mockChildren[0]; if (workerPool) { workerPool.emit("message", { - type: "signal", + type: "compute-result", sense: "cpu-usage", - payload: { - signal: { seq: 1 }, - workflow: { - name: "order-wf", - maxRounds: 2, - prompt: "p", - dryRun: true, - }, + state: { seq: 1 }, + workflow: { + name: "order-wf", + maxRounds: 2, + prompt: "p", + dryRun: true, }, }); } @@ -316,17 +308,17 @@ describe("kernel + workflowManager integration", () => { .map((c) => c[0] as { source: string; type: string; refId: string | null }) .filter((e) => e.source === "sense" && e.refId === "cpu-usage"); const typeOrder = senseEntries.map((e) => e.type); - const sigAt = typeOrder.indexOf("signal"); + const completeAt = typeOrder.indexOf("compute-complete"); const launchAt = typeOrder.indexOf("workflow-launch"); - expect(sigAt).toBeGreaterThanOrEqual(0); - expect(launchAt).toBeGreaterThan(sigAt); + expect(completeAt).toBeGreaterThanOrEqual(0); + expect(launchAt).toBeGreaterThan(completeAt); const stopPromise = kernel.stop(); await vi.runAllTimersAsync(); await stopPromise; }); - it("does not trigger workflow when signal senseId is not in 'on' list", async () => { + it("does not trigger workflow when compute-result has workflow null", async () => { const logStore = makeLogStore(); const config = makeConfig({ senses: { @@ -335,7 +327,6 @@ describe("kernel + workflowManager integration", () => { throttle: null, timeout: null, gracePeriod: null, - retention: 10_000, interval: null, on: [], }, @@ -344,7 +335,6 @@ describe("kernel + workflowManager integration", () => { throttle: null, timeout: null, gracePeriod: null, - retention: 10_000, interval: null, on: [], }, @@ -359,13 +349,13 @@ describe("kernel + workflowManager integration", () => { await flushSenseWorkerForkMicrotasks(kernel); await vi.runAllTimersAsync(); - // Emit a regular signal (shorthand payload) — should NOT trigger any workflow const workerPool = mockChildren[0]; if (workerPool) { workerPool.emit("message", { - type: "signal", + type: "compute-result", sense: "cpu-usage", - payload: 50, + state: 50, + workflow: null, }); } @@ -396,7 +386,6 @@ describe("kernel + workflowManager integration", () => { throttle: null, timeout: null, gracePeriod: null, - retention: 10_000, interval: null, on: [], }, @@ -411,20 +400,17 @@ describe("kernel + workflowManager integration", () => { await flushSenseWorkerForkMicrotasks(kernel); await vi.runAllTimersAsync(); - // Simulate sense compute returning a signal plus workflow launch const workerPool = mockChildren[0]; if (workerPool) { workerPool.emit("message", { - type: "signal", + type: "compute-result", sense: "cpu-usage", - payload: { - signal: { note: "log" }, - workflow: { - name: "log-test-workflow", - maxRounds: 10, - prompt: "test prompt", - dryRun: false, - }, + state: { note: "log" }, + workflow: { + name: "log-test-workflow", + maxRounds: 10, + prompt: "test prompt", + dryRun: false, }, }); } @@ -452,7 +438,6 @@ describe("kernel + workflowManager integration", () => { throttle: null, timeout: null, gracePeriod: null, - retention: 10_000, interval: null, on: [], }, @@ -476,7 +461,6 @@ describe("kernel + workflowManager integration", () => { throttle: null, timeout: null, gracePeriod: null, - retention: 10_000, interval: null, on: [], }, @@ -488,20 +472,17 @@ describe("kernel + workflowManager integration", () => { }; kernel.reloadConfig(newConfig); - // Simulate sense compute returning a signal plus workflow for the new workflow const workerPool = mockChildren[0]; if (workerPool) { workerPool.emit("message", { - type: "signal", + type: "compute-result", sense: "cpu-usage", - payload: { - signal: { phase: "reload" }, - workflow: { - name: "new-workflow", - maxRounds: 10, - prompt: "reload test", - dryRun: false, - }, + state: { phase: "reload" }, + workflow: { + name: "new-workflow", + maxRounds: 10, + prompt: "reload test", + dryRun: false, }, }); } @@ -534,7 +515,6 @@ describe("kernel + workflowManager integration", () => { throttle: null, timeout: null, gracePeriod: null, - retention: 10_000, interval: null, on: [], }, @@ -557,7 +537,6 @@ describe("kernel + workflowManager integration", () => { throttle: null, timeout: null, gracePeriod: null, - retention: 10_000, interval: null, on: [], }, @@ -574,20 +553,17 @@ describe("kernel + workflowManager integration", () => { (c.send as ReturnType).mockClear(); } - // Simulate sense compute trying to launch the removed workflow — it should not start const workerPool = mockChildren[0]; if (workerPool) { workerPool.emit("message", { - type: "signal", + type: "compute-result", sense: "cpu-usage", - payload: { - signal: { stale: true }, - workflow: { - name: "old-workflow", - maxRounds: 10, - prompt: "should not work", - dryRun: false, - }, + state: { stale: true }, + workflow: { + name: "old-workflow", + maxRounds: 10, + prompt: "should not work", + dryRun: false, }, }); } @@ -621,7 +597,6 @@ describe("kernel + workflowManager integration", () => { throttle: null, timeout: null, gracePeriod: null, - retention: 10_000, interval: null, on: [], }, @@ -636,20 +611,17 @@ describe("kernel + workflowManager integration", () => { await flushSenseWorkerForkMicrotasks(kernel); await vi.runAllTimersAsync(); - // Trigger a workflow via sense compute return value const workerPool = mockChildren[0]; if (workerPool) { workerPool.emit("message", { - type: "signal", + type: "compute-result", sense: "cpu-usage", - payload: { - signal: { shutdownCase: true }, - workflow: { - name: "shutdown-test", - maxRounds: 10, - prompt: "test", - dryRun: false, - }, + state: { shutdownCase: true }, + workflow: { + name: "shutdown-test", + maxRounds: 10, + prompt: "test", + dryRun: false, }, }); } @@ -689,7 +661,6 @@ describe("kernel + workflowManager integration", () => { throttle: null, timeout: null, gracePeriod: null, - retention: 10_000, interval: null, on: [], }, diff --git a/packages/daemon/src/__tests__/kernel.test.ts b/packages/daemon/src/__tests__/kernel.test.ts index 591f795..1e894f5 100644 --- a/packages/daemon/src/__tests__/kernel.test.ts +++ b/packages/daemon/src/__tests__/kernel.test.ts @@ -37,7 +37,12 @@ function makeMockChild(pid = 1): MockChild { } if (m.type === "compute" && typeof m.sense === "string") { setImmediate(() => { - child.emit("message", { type: "signal", sense: m.sense, payload: 42 }); + child.emit("message", { + type: "compute-result", + sense: m.sense, + state: 42, + workflow: null, + }); }); } }); @@ -81,7 +86,6 @@ function makeConfig(overrides: Partial = {}): NerveConfig { throttle: null, timeout: null, gracePeriod: null, - retention: 10_000, interval: null, on: [], }, @@ -112,7 +116,7 @@ describe("kernel — message routing", () => { rmSync(nerveRoot, { recursive: true, force: true }); }); - it("routes signal message to bus without throwing", async () => { + it("routes compute-result message without throwing", async () => { const config = makeConfig({ senses: { "cpu-usage": { @@ -120,7 +124,6 @@ describe("kernel — message routing", () => { throttle: null, timeout: null, gracePeriod: null, - retention: 10_000, interval: null, on: [], }, @@ -133,14 +136,19 @@ describe("kernel — message routing", () => { const child = mockChildren[0]; expect(() => { - child.emit("message", { type: "signal", sense: "cpu-usage", payload: 42 }); + child.emit("message", { + type: "compute-result", + sense: "cpu-usage", + state: 42, + workflow: null, + }); }).not.toThrow(); await kernel.stop(); await vi.runAllTimersAsync(); }); - it("persists emitted signals as sense/signal log entries", async () => { + it("persists compute-complete log entries for sense IPC", async () => { const tmpDir = mkdtempSync(join(tmpdir(), "nerve-kernel-sig-")); const logStore = createLogStore(join(tmpDir, "logs.db")); try { @@ -151,7 +159,6 @@ describe("kernel — message routing", () => { throttle: null, timeout: null, gracePeriod: null, - retention: 10_000, interval: null, on: [], }, @@ -160,10 +167,19 @@ describe("kernel — message routing", () => { const kernel = createKernel(config, tmpDir, { logStore }); await vi.runAllTimersAsync(); const child = mockChildren[0]; - child.emit("message", { type: "signal", sense: "cpu-usage", payload: 123 }); - const rows = logStore.query({ source: "sense", type: "signal", refId: "cpu-usage" }); + child.emit("message", { + type: "compute-result", + sense: "cpu-usage", + state: 123, + workflow: null, + }); + const rows = logStore.query({ + source: "sense", + type: "compute-complete", + refId: "cpu-usage", + }); expect(rows).toHaveLength(1); - expect(rows[0].payload).toBe(JSON.stringify(123)); + expect(rows[0].payload).toBeNull(); await kernel.stop(); await vi.runAllTimersAsync(); } finally { @@ -180,7 +196,6 @@ describe("kernel — message routing", () => { throttle: null, timeout: null, gracePeriod: null, - retention: 10_000, interval: null, on: [], }, @@ -209,7 +224,6 @@ describe("kernel — message routing", () => { throttle: null, timeout: null, gracePeriod: null, - retention: 10_000, interval: null, on: [], }, @@ -237,7 +251,6 @@ describe("kernel — message routing", () => { throttle: null, timeout: null, gracePeriod: null, - retention: 10_000, interval: null, on: [], }, @@ -278,7 +291,6 @@ describe("kernel — groupForSense mapping", () => { throttle: null, timeout: null, gracePeriod: null, - retention: 10_000, interval: null, on: [], }, @@ -287,7 +299,6 @@ describe("kernel — groupForSense mapping", () => { throttle: null, timeout: null, gracePeriod: null, - retention: 10_000, interval: null, on: [], }, @@ -296,7 +307,6 @@ describe("kernel — groupForSense mapping", () => { throttle: null, timeout: null, gracePeriod: null, - retention: 10_000, interval: null, on: [], }, @@ -322,7 +332,6 @@ describe("kernel — groupForSense mapping", () => { throttle: null, timeout: null, gracePeriod: null, - retention: 10_000, interval: 500, on: [], }, diff --git a/packages/daemon/src/__tests__/log-store-integration.test.ts b/packages/daemon/src/__tests__/log-store-integration.test.ts index 41c067a..4310a30 100644 --- a/packages/daemon/src/__tests__/log-store-integration.test.ts +++ b/packages/daemon/src/__tests__/log-store-integration.test.ts @@ -3,11 +3,10 @@ import { tmpdir } from "node:os"; import { join } from "node:path"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; -import type { NerveConfig, Signal } from "@uncaged/nerve-core"; +import type { NerveConfig } from "@uncaged/nerve-core"; import { createLogStore } from "@uncaged/nerve-store"; import type { LogStore } from "@uncaged/nerve-store"; import { createSenseScheduler } from "../sense-scheduler.js"; -import { createSignalBus } from "../signal-bus.js"; describe("LogStore + SenseScheduler integration", () => { let tmpDir: string; @@ -31,7 +30,6 @@ describe("LogStore + SenseScheduler integration", () => { throttle: null, timeout: null, gracePeriod: null, - retention: 10_000, interval: null, on: ["cpu-usage"], }, @@ -41,14 +39,12 @@ describe("LogStore + SenseScheduler integration", () => { extract: null, api: { port: null, token: null, host: "127.0.0.1" }, }; - const bus = createSignalBus(); const triggered: string[] = []; - const scheduler = createSenseScheduler(config, bus, (name) => triggered.push(name), { + const scheduler = createSenseScheduler(config, (name) => triggered.push(name), { logStore, }); - const signal: Signal = { id: 1, senseId: "cpu-usage", payload: 42, timestamp: Date.now() }; - bus.emit(signal); + scheduler.onSenseCompleted("cpu-usage"); const logs = logStore.query({ source: "sense_scheduler", type: "run_start" }); expect(logs).toHaveLength(1); @@ -68,7 +64,6 @@ describe("LogStore + SenseScheduler integration", () => { throttle: null, timeout: null, gracePeriod: null, - retention: 10_000, interval: 1000, on: [], }, @@ -78,11 +73,9 @@ describe("LogStore + SenseScheduler integration", () => { extract: null, api: { port: null, token: null, host: "127.0.0.1" }, }; - const bus = createSignalBus(); const ref: { scheduler: ReturnType | null } = { scheduler: null }; const scheduler = createSenseScheduler( config, - bus, (name) => { ref.scheduler?.onComputeComplete(name); }, @@ -108,7 +101,6 @@ describe("LogStore + SenseScheduler integration", () => { throttle: null, timeout: null, gracePeriod: null, - retention: 10_000, interval: null, on: ["cpu-usage"], }, @@ -118,11 +110,9 @@ describe("LogStore + SenseScheduler integration", () => { extract: null, api: { port: null, token: null, host: "127.0.0.1" }, }; - const bus = createSignalBus(); const triggered: string[] = []; const scheduler = createSenseScheduler( config, - bus, (name) => { triggered.push(name); scheduler.onComputeComplete(name); @@ -138,8 +128,6 @@ describe("LogStore + SenseScheduler integration", () => { timestamp: Date.now(), }); - // Writing to the log store should NOT trigger any sense compute. - // Only bus.emit(signal) triggers scheduled senses. expect(triggered).toHaveLength(0); scheduler.stop(); diff --git a/packages/daemon/src/__tests__/phase6-integration.test.ts b/packages/daemon/src/__tests__/phase6-integration.test.ts index 5dd40aa..2cf03f8 100644 --- a/packages/daemon/src/__tests__/phase6-integration.test.ts +++ b/packages/daemon/src/__tests__/phase6-integration.test.ts @@ -7,7 +7,6 @@ import { tmpdir } from "node:os"; import { dirname, join } from "node:path"; import { fileURLToPath } from "node:url"; -import type { Signal } from "@uncaged/nerve-core"; import type { NerveConfig } from "@uncaged/nerve-core"; import { afterEach, beforeEach, describe, expect, it } from "vitest"; @@ -27,7 +26,6 @@ function makeConfig(overrides: Partial = {}): NerveConfig { throttle: null, timeout: null, gracePeriod: null, - retention: 10_000, interval: null, on: [], }, @@ -97,13 +95,16 @@ describe("phase6 — restartGroup", () => { expect(newPid).not.toBeNull(); expect(newPid).not.toBe(oldPid); - // Verify new worker is functional - const received: Signal[] = []; - const unsub = kernel.bus.subscribe((s) => received.push(s)); kernel.triggerCompute("cpu-usage"); - await pollUntil(() => received.length > 0, 10_000); - expect(received[0]).toMatchObject({ senseId: "cpu-usage", payload: 42 }); - unsub(); + await pollUntil( + () => + kernel!.logStore.query({ + source: "sense", + type: "compute-complete", + refId: "cpu-usage", + }).length > 0, + 10_000, + ); }, 35_000); it("restartGroup on nonexistent group does nothing", async () => { @@ -154,7 +155,6 @@ describe("phase6 — reloadConfig", () => { throttle: null, timeout: null, gracePeriod: null, - retention: 10_000, interval: null, on: [], }, @@ -163,7 +163,6 @@ describe("phase6 — reloadConfig", () => { throttle: null, timeout: null, gracePeriod: null, - retention: 10_000, interval: null, on: [], }, @@ -191,7 +190,6 @@ describe("phase6 — reloadConfig", () => { throttle: null, timeout: null, gracePeriod: null, - retention: 10_000, interval: null, on: [], }, @@ -200,7 +198,6 @@ describe("phase6 — reloadConfig", () => { throttle: null, timeout: null, gracePeriod: null, - retention: 10_000, interval: null, on: [], }, @@ -224,7 +221,6 @@ describe("phase6 — reloadConfig", () => { throttle: null, timeout: null, gracePeriod: null, - retention: 10_000, interval: null, on: [], }, @@ -269,7 +265,6 @@ describe("phase6 — error isolation", () => { throttle: null, timeout: null, gracePeriod: null, - retention: 10_000, interval: null, on: [], }, @@ -278,7 +273,6 @@ describe("phase6 — error isolation", () => { throttle: null, timeout: null, gracePeriod: null, - retention: 10_000, interval: null, on: [], }, @@ -294,19 +288,27 @@ describe("phase6 — error isolation", () => { }); await kernel.ready; - // Both senses go through the same worker (mock-worker responds to all compute with signal) - const received: Signal[] = []; - const unsub = kernel.bus.subscribe((s) => received.push(s)); - kernel.triggerCompute("good-sense"); - await pollUntil(() => received.length > 0, 10_000); - expect(received[0]).toMatchObject({ senseId: "good-sense" }); + await pollUntil( + () => + kernel!.logStore.query({ + source: "sense", + type: "compute-complete", + refId: "good-sense", + }).length > 0, + 10_000, + ); kernel.triggerCompute("bad-sense"); - await pollUntil(() => received.length > 1, 10_000); - expect(received[1]).toMatchObject({ senseId: "bad-sense" }); - - unsub(); + await pollUntil( + () => + kernel!.logStore.query({ + source: "sense", + type: "compute-complete", + refId: "bad-sense", + }).length > 0, + 10_000, + ); }, 10_000); it("error worker sends error messages, kernel still running", async () => { @@ -366,7 +368,6 @@ describe("phase6 — getHealth", () => { throttle: null, timeout: null, gracePeriod: null, - retention: 10_000, interval: null, on: [], }, @@ -375,7 +376,6 @@ describe("phase6 — getHealth", () => { throttle: null, timeout: null, gracePeriod: null, - retention: 10_000, interval: null, on: [], }, @@ -384,7 +384,6 @@ describe("phase6 — getHealth", () => { throttle: null, timeout: null, gracePeriod: null, - retention: 10_000, interval: null, on: [], }, @@ -420,7 +419,6 @@ describe("phase6 — getHealth", () => { throttle: null, timeout: null, gracePeriod: null, - retention: 10_000, interval: null, on: [], }, @@ -429,7 +427,6 @@ describe("phase6 — getHealth", () => { throttle: null, timeout: null, gracePeriod: null, - retention: 10_000, interval: null, on: [], }, @@ -489,13 +486,16 @@ describe("phase6 — auto-respawn on worker crash", () => { expect(newPid).not.toBeNull(); expect(newPid).not.toBe(originalPid); - // Verify new worker responds - const received: Signal[] = []; - const unsub = kernel.bus.subscribe((s) => received.push(s)); kernel.triggerCompute("cpu-usage"); - await pollUntil(() => received.length > 0, 10_000); - expect(received[0]).toMatchObject({ senseId: "cpu-usage", payload: 42 }); - unsub(); + await pollUntil( + () => + kernel!.logStore.query({ + source: "sense", + type: "compute-complete", + refId: "cpu-usage", + }).length > 0, + 10_000, + ); await kernel.stop(); kernel = null; diff --git a/packages/daemon/src/__tests__/sense-runtime.test.ts b/packages/daemon/src/__tests__/sense-runtime.test.ts index 29b40fe..a7a7f22 100644 --- a/packages/daemon/src/__tests__/sense-runtime.test.ts +++ b/packages/daemon/src/__tests__/sense-runtime.test.ts @@ -1,281 +1,118 @@ -import { mkdirSync, mkdtempSync, writeFileSync } from "node:fs"; +import { mkdtempSync, readFileSync, rmSync, writeFileSync } from "node:fs"; import { tmpdir } from "node:os"; import { join } from "node:path"; -import { DatabaseSync } from "node:sqlite"; -import { drizzle } from "drizzle-orm/node-sqlite"; -import { integer, real, sqliteTable } from "drizzle-orm/sqlite-core"; +import type { SenseComputeFn } from "@uncaged/nerve-core"; import { describe, expect, it } from "vitest"; import { parseParentMessage } from "../ipc.js"; -import { executeCompute, openSenseDb, runMigrations } from "../sense-runtime.js"; -import type { DrizzleDB, SenseRuntime } from "../sense-runtime.js"; +import { executeCompute, readState, writeState } from "../sense-runtime.js"; +import type { SenseRuntime } from "../sense-runtime.js"; -// --------------------------------------------------------------------------- -// Helpers -// --------------------------------------------------------------------------- - -const INIT_SQL = ` -CREATE TABLE IF NOT EXISTS samples ( - ts INTEGER PRIMARY KEY, - value REAL NOT NULL -); -`; - -function makeTempMigrationsDir(sql: string): string { - const dir = mkdtempSync(join(tmpdir(), "nerve-test-")); - writeFileSync(join(dir, "0001_init.sql"), sql); - return dir; +function makeTempStatePath(): string { + const dir = mkdtempSync(join(tmpdir(), "nerve-state-")); + return join(dir, "sense.json"); } -function makeTempMigrationsDirEmpty(): string { - return mkdtempSync(join(tmpdir(), "nerve-test-empty-")); -} - -function makeTempDbPath(): string { - const dir = mkdtempSync(join(tmpdir(), "nerve-db-")); - return join(dir, "test.db"); -} - -const samples = sqliteTable("samples", { - ts: integer("ts").primaryKey(), - value: real("value").notNull(), -}); - -// --------------------------------------------------------------------------- -// runMigrations -// --------------------------------------------------------------------------- - -describe("runMigrations", () => { - it("creates table via SQL migration file", () => { - const sqlite = new DatabaseSync(":memory:"); - const migrationsDir = makeTempMigrationsDir(INIT_SQL); - const result = runMigrations(sqlite, migrationsDir); - - expect(result.ok).toBe(true); - - const row = sqlite - .prepare("SELECT name FROM sqlite_master WHERE type='table' AND name='samples'") - .get(); - expect(row).toBeDefined(); - - sqlite.close(); +describe("readState / writeState", () => { + it("writeState creates parent dirs and persists JSON", () => { + const base = mkdtempSync(join(tmpdir(), "nerve-write-")); + const path = join(base, "nested", "a.json"); + writeState(path, { n: 1 }); + const raw = readFileSync(path, "utf8"); + expect(JSON.parse(raw)).toEqual({ n: 1 }); + rmSync(base, { recursive: true, force: true }); }); - it("runs multiple migrations in lexicographic order", () => { - const sqlite = new DatabaseSync(":memory:"); - const dir = mkdtempSync(join(tmpdir(), "nerve-multi-")); - - writeFileSync(join(dir, "0001_init.sql"), INIT_SQL); - writeFileSync(join(dir, "0002_add_col.sql"), "ALTER TABLE samples ADD COLUMN label TEXT;"); - - const result = runMigrations(sqlite, dir); - expect(result.ok).toBe(true); - - const info = sqlite.prepare("PRAGMA table_info(samples)").all() as Array<{ name: string }>; - const cols = info.map((r) => r.name); - expect(cols).toContain("label"); - - sqlite.close(); + it("readState returns initialState when file is missing", () => { + const path = join(mkdtempSync(join(tmpdir(), "nerve-missing-")), "none.json"); + expect(readState(path, { x: 0 })).toEqual({ x: 0 }); }); - it("returns ok when migrations directory is empty", () => { - const sqlite = new DatabaseSync(":memory:"); - const dir = makeTempMigrationsDirEmpty(); - const result = runMigrations(sqlite, dir); - expect(result.ok).toBe(true); - sqlite.close(); + it("readState returns parsed JSON when file exists", () => { + const dir = mkdtempSync(join(tmpdir(), "nerve-read-")); + const path = join(dir, "s.json"); + writeFileSync(path, JSON.stringify({ count: 3 }), "utf8"); + expect(readState(path, { count: 0 })).toEqual({ count: 3 }); + rmSync(dir, { recursive: true, force: true }); }); - it("returns err when migrations directory does not exist", () => { - const sqlite = new DatabaseSync(":memory:"); - const result = runMigrations(sqlite, "/nonexistent/path/migrations"); - expect(result.ok).toBe(false); - sqlite.close(); - }); - - it("returns err when a migration SQL is invalid", () => { - const sqlite = new DatabaseSync(":memory:"); - const dir = mkdtempSync(join(tmpdir(), "nerve-bad-sql-")); - writeFileSync(join(dir, "0001_bad.sql"), "NOT VALID SQL !!!;"); - const result = runMigrations(sqlite, dir); - expect(result.ok).toBe(false); - sqlite.close(); + it("readState returns initialState on invalid JSON", () => { + const dir = mkdtempSync(join(tmpdir(), "nerve-badjson-")); + const path = join(dir, "bad.json"); + writeFileSync(path, "not json {{{", "utf8"); + expect(readState(path, { fallback: true })).toEqual({ fallback: true }); + rmSync(dir, { recursive: true, force: true }); }); }); -// --------------------------------------------------------------------------- -// openSenseDb -// --------------------------------------------------------------------------- - -describe("openSenseDb", () => { - it("creates the db file and runs migrations", () => { - const dbPath = makeTempDbPath(); - const migrationsDir = makeTempMigrationsDir(INIT_SQL); - - const result = openSenseDb(dbPath, migrationsDir); - expect(result.ok).toBe(true); - - if (!result.ok) return; - const { sqlite } = result.value; - const row = sqlite - .prepare("SELECT name FROM sqlite_master WHERE type='table' AND name='samples'") - .get(); - expect(row).toBeDefined(); - sqlite.close(); - }); - - it("returns err when migrations dir is missing", () => { - const dbPath = makeTempDbPath(); - const result = openSenseDb(dbPath, "/nonexistent/migrations"); - expect(result.ok).toBe(false); - }); - - it("prunes _signals to retention after every 100 inserts", () => { - const dbPath = makeTempDbPath(); - const migrationsDir = makeTempMigrationsDir(INIT_SQL); - const result = openSenseDb(dbPath, migrationsDir, 5); - expect(result.ok).toBe(true); - if (!result.ok) return; - - const { sqlite, persistSignal } = result.value; - for (let i = 0; i < 100; i++) { - persistSignal({ n: i }); - } - - const count = sqlite.prepare("SELECT COUNT(*) AS c FROM _signals").get() as { c: number }; - expect(count.c).toBe(5); - - sqlite.close(); - }); -}); - -// --------------------------------------------------------------------------- -// executeCompute -// --------------------------------------------------------------------------- - describe("executeCompute", () => { function makeRuntime( - computeFn: SenseRuntime["compute"], - sqlite?: DatabaseSync, - ): { runtime: SenseRuntime; sqlite: DatabaseSync } { - const db_sqlite = sqlite ?? new DatabaseSync(":memory:"); - if (!sqlite) db_sqlite.exec(INIT_SQL); - const db = drizzle({ client: db_sqlite }) as DrizzleDB; + compute: SenseComputeFn<{ n: number }>, + state: { n: number }, + statePath?: string, + ): SenseRuntime { return { - runtime: { - name: "test-sense", - db, - compute: computeFn, - table: samples, - persistSignal: () => {}, - }, - sqlite: db_sqlite, + name: "test-sense", + compute: compute as SenseComputeFn, + state, + statePath: statePath ?? makeTempStatePath(), }; } - it("returns non-null and inserts into table when compute returns data", async () => { - const { runtime, sqlite } = makeRuntime(async () => ({ - signal: { ts: 1000, value: 0.5 }, - workflow: null, - })); + it("passes state into compute and persists returned state", async () => { + const path = makeTempStatePath(); + const runtime = makeRuntime( + async (s) => ({ state: { n: s.n + 1 }, workflow: null }), + { n: 0 }, + path, + ); const result = await executeCompute(runtime); expect(result.ok).toBe(true); if (!result.ok) return; - expect(result.value).toEqual({ signal: { ts: 1000, value: 0.5 }, workflow: null }); - - const rows = sqlite.prepare("SELECT * FROM samples").all(); - expect(rows).toHaveLength(1); - sqlite.close(); - }); - - it("returns null and does not insert when compute returns null", async () => { - const { runtime, sqlite } = makeRuntime(async () => null); - - const result = await executeCompute(runtime); - expect(result.ok).toBe(true); - if (!result.ok) return; - expect(result.value).toBeNull(); - - const rows = sqlite.prepare("SELECT * FROM samples").all(); - expect(rows).toHaveLength(0); - sqlite.close(); + expect(result.value).toEqual({ state: { n: 1 }, workflow: null }); + expect(runtime.state).toEqual({ n: 1 }); + expect(JSON.parse(readFileSync(path, "utf8"))).toEqual({ n: 1 }); }); it("returns err when compute throws", async () => { - const { runtime, sqlite } = makeRuntime(async () => { - throw new Error("something went wrong"); - }); + const runtime = makeRuntime( + async () => { + throw new Error("boom"); + }, + { n: 0 }, + ); const result = await executeCompute(runtime); expect(result.ok).toBe(false); if (result.ok) return; - expect(result.error.message).toContain("something went wrong"); - sqlite.close(); - }); - - it("inserts correctly into the sense db from openSenseDb", async () => { - const dbPath = makeTempDbPath(); - const migrationsDir = makeTempMigrationsDir(INIT_SQL); - const dbResult = openSenseDb(dbPath, migrationsDir); - expect(dbResult.ok).toBe(true); - if (!dbResult.ok) return; - - mkdirSync(join(dbPath, "..", "migrations"), { recursive: true }); - - const { sqlite: dbSqlite, db } = dbResult.value; - const runtime: SenseRuntime = { - name: "cpu-usage", - db, - compute: async () => ({ signal: { ts: 1000, value: 1.23 }, workflow: null }), - table: samples, - persistSignal: () => {}, - }; - - const result = await executeCompute(runtime); - expect(result.ok).toBe(true); - - const rows = dbSqlite.prepare("SELECT * FROM samples").all() as Array<{ - ts: number; - value: number; - }>; - expect(rows).toHaveLength(1); - expect(rows[0].ts).toBe(1000); - expect(rows[0].value).toBe(1.23); - dbSqlite.close(); + expect(result.error.message).toContain("boom"); }); it("returns err when compute exceeds timeoutMs", async () => { - const { runtime, sqlite } = makeRuntime( - () => new Promise((resolve) => setTimeout(() => resolve(null), 5_000)), + const runtime = makeRuntime( + async (s) => + new Promise((resolve) => setTimeout(() => resolve({ state: s, workflow: null }), 5_000)), + { n: 0 }, ); const result = await executeCompute(runtime, 50); expect(result.ok).toBe(false); if (result.ok) return; expect(result.error.message).toMatch(/timed out/i); - sqlite.close(); }); it("completes within timeout when compute is fast", async () => { - const { runtime, sqlite } = makeRuntime(async () => ({ - signal: { ts: 1, value: 42 }, - workflow: null, - })); + const runtime = makeRuntime(async (s) => ({ state: { n: s.n }, workflow: null }), { n: 42 }); const result = await executeCompute(runtime, 5_000); expect(result.ok).toBe(true); if (!result.ok) return; - expect(result.value).toEqual({ signal: { ts: 1, value: 42 }, workflow: null }); - sqlite.close(); + expect(result.value.state).toEqual({ n: 42 }); }); }); -// --------------------------------------------------------------------------- -// parseParentMessage (IPC validation) -// --------------------------------------------------------------------------- - -describe("parseParentMessage", () => { +describe("parseParentMessage (IPC validation)", () => { it("accepts a valid compute message", () => { const result = parseParentMessage({ type: "compute", sense: "cpu" }); expect(result.ok).toBe(true); @@ -310,47 +147,3 @@ describe("parseParentMessage", () => { expect(result.error.message).toMatch(/unknown/i); }); }); - -// --------------------------------------------------------------------------- -// runMigrations – journal (idempotency) -// --------------------------------------------------------------------------- - -describe("runMigrations journal", () => { - it("does not re-run an already-applied migration", () => { - const sqlite = new DatabaseSync(":memory:"); - const dir = mkdtempSync(join(tmpdir(), "nerve-journal-")); - writeFileSync(join(dir, "0001_init.sql"), INIT_SQL); - - const first = runMigrations(sqlite, dir); - expect(first.ok).toBe(true); - - sqlite.exec("INSERT INTO samples (ts, value) VALUES (1, 1.0)"); - - const nonIdempotentSql = "CREATE TABLE samples2 (id INTEGER PRIMARY KEY)"; - writeFileSync(join(dir, "0002_samples2.sql"), nonIdempotentSql); - - const second = runMigrations(sqlite, dir); - expect(second.ok).toBe(true); - - const third = runMigrations(sqlite, dir); - expect(third.ok).toBe(true); - - sqlite.close(); - }); - - it("tracks migrations in _migrations table", () => { - const sqlite = new DatabaseSync(":memory:"); - const dir = mkdtempSync(join(tmpdir(), "nerve-journal2-")); - writeFileSync(join(dir, "0001_init.sql"), INIT_SQL); - - runMigrations(sqlite, dir); - - const rows = sqlite.prepare("SELECT name FROM _migrations ORDER BY name").all() as Array<{ - name: string; - }>; - expect(rows).toHaveLength(1); - expect(rows[0].name).toBe("0001_init.sql"); - - sqlite.close(); - }); -}); diff --git a/packages/daemon/src/__tests__/sense-scheduler-throttle-pending.test.ts b/packages/daemon/src/__tests__/sense-scheduler-throttle-pending.test.ts index 0a02152..35382a8 100644 --- a/packages/daemon/src/__tests__/sense-scheduler-throttle-pending.test.ts +++ b/packages/daemon/src/__tests__/sense-scheduler-throttle-pending.test.ts @@ -1,8 +1,7 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; -import type { NerveConfig, Signal } from "@uncaged/nerve-core"; +import type { NerveConfig } from "@uncaged/nerve-core"; import { createSenseScheduler } from "../sense-scheduler.js"; -import { createSignalBus } from "../signal-bus.js"; function makeConfig(overrides: Partial = {}): NerveConfig { return { @@ -12,7 +11,6 @@ function makeConfig(overrides: Partial = {}): NerveConfig { throttle: null, timeout: null, gracePeriod: null, - retention: 10_000, interval: null, on: [], }, @@ -25,10 +23,6 @@ function makeConfig(overrides: Partial = {}): NerveConfig { }; } -function makeSignal(senseId: string, payload: unknown = 1): Signal { - return { id: 1, senseId, payload, timestamp: Date.now() }; -} - describe("SenseScheduler — throttle + pending deferred trigger", () => { beforeEach(() => { vi.useFakeTimers(); @@ -47,27 +41,21 @@ describe("SenseScheduler — throttle + pending deferred trigger", () => { throttle: 2000, timeout: null, gracePeriod: null, - retention: 10_000, interval: null, on: ["cpu-usage"], }, }, }); - const bus = createSignalBus(); - const scheduler = createSenseScheduler(config, bus, (name) => triggered.push(name)); + const scheduler = createSenseScheduler(config, (name) => triggered.push(name)); - // First trigger fires immediately (outside throttle window) - bus.emit(makeSignal("cpu-usage")); + scheduler.onSenseCompleted("cpu-usage"); scheduler.onComputeComplete("cpu-usage"); expect(triggered.length).toBe(1); - // Second trigger arrives 500ms later — still within throttle window (2000ms) vi.advanceTimersByTime(500); - bus.emit(makeSignal("cpu-usage")); - // Should not fire yet (throttled) + scheduler.onSenseCompleted("cpu-usage"); expect(triggered.length).toBe(1); - // Advance past the throttle window end; deferred trigger should now fire vi.advanceTimersByTime(1600); expect(triggered.length).toBe(2); @@ -83,30 +71,25 @@ describe("SenseScheduler — throttle + pending deferred trigger", () => { throttle: 2000, timeout: null, gracePeriod: null, - retention: 10_000, interval: null, on: ["cpu-usage"], }, }, }); - const bus = createSignalBus(); - const scheduler = createSenseScheduler(config, bus, (name) => triggered.push(name)); + const scheduler = createSenseScheduler(config, (name) => triggered.push(name)); - // First trigger fires - bus.emit(makeSignal("cpu-usage")); + scheduler.onSenseCompleted("cpu-usage"); scheduler.onComputeComplete("cpu-usage"); expect(triggered.length).toBe(1); - // Multiple triggers within throttle window — should not stack vi.advanceTimersByTime(300); - bus.emit(makeSignal("cpu-usage")); + scheduler.onSenseCompleted("cpu-usage"); vi.advanceTimersByTime(300); - bus.emit(makeSignal("cpu-usage")); + scheduler.onSenseCompleted("cpu-usage"); vi.advanceTimersByTime(300); - bus.emit(makeSignal("cpu-usage")); + scheduler.onSenseCompleted("cpu-usage"); expect(triggered.length).toBe(1); - // Advance past window — exactly one deferred trigger fires vi.advanceTimersByTime(1200); scheduler.onComputeComplete("cpu-usage"); expect(triggered.length).toBe(2); @@ -123,28 +106,23 @@ describe("SenseScheduler — throttle + pending deferred trigger", () => { throttle: 2000, timeout: null, gracePeriod: null, - retention: 10_000, interval: null, on: ["cpu-usage"], }, }, }); - const bus = createSignalBus(); - const scheduler = createSenseScheduler(config, bus, (name) => triggered.push(name)); + const scheduler = createSenseScheduler(config, (name) => triggered.push(name)); - bus.emit(makeSignal("cpu-usage")); + scheduler.onSenseCompleted("cpu-usage"); scheduler.onComputeComplete("cpu-usage"); expect(triggered.length).toBe(1); - // Trigger during throttle window — schedules deferred vi.advanceTimersByTime(500); - bus.emit(makeSignal("cpu-usage")); + scheduler.onSenseCompleted("cpu-usage"); expect(triggered.length).toBe(1); - // Stop before window ends scheduler.stop(); - // Advance past window — deferred timer should have been cleared vi.advanceTimersByTime(2000); expect(triggered.length).toBe(1); }); diff --git a/packages/daemon/src/__tests__/sense-scheduler.test.ts b/packages/daemon/src/__tests__/sense-scheduler.test.ts index 9d9cf35..9041638 100644 --- a/packages/daemon/src/__tests__/sense-scheduler.test.ts +++ b/packages/daemon/src/__tests__/sense-scheduler.test.ts @@ -1,12 +1,7 @@ import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; -import type { NerveConfig, Signal } from "@uncaged/nerve-core"; +import type { NerveConfig } from "@uncaged/nerve-core"; import { createSenseScheduler } from "../sense-scheduler.js"; -import { createSignalBus } from "../signal-bus.js"; - -// --------------------------------------------------------------------------- -// Helpers -// --------------------------------------------------------------------------- function makeConfig(overrides: Partial = {}): NerveConfig { return { @@ -16,7 +11,6 @@ function makeConfig(overrides: Partial = {}): NerveConfig { throttle: null, timeout: null, gracePeriod: null, - retention: 10_000, interval: null, on: [], }, @@ -25,7 +19,6 @@ function makeConfig(overrides: Partial = {}): NerveConfig { throttle: null, timeout: null, gracePeriod: null, - retention: 10_000, interval: null, on: [], }, @@ -34,7 +27,6 @@ function makeConfig(overrides: Partial = {}): NerveConfig { throttle: null, timeout: null, gracePeriod: null, - retention: 10_000, interval: null, on: [], }, @@ -47,14 +39,6 @@ function makeConfig(overrides: Partial = {}): NerveConfig { }; } -function makeSignal(senseId: string, payload: unknown = 1): Signal { - return { id: 1, senseId, payload, timestamp: Date.now() }; -} - -// --------------------------------------------------------------------------- -// Tests -// --------------------------------------------------------------------------- - describe("SenseScheduler — interval schedule", () => { beforeEach(() => { vi.useFakeTimers(); @@ -72,14 +56,11 @@ describe("SenseScheduler — interval schedule", () => { "cpu-usage": { ...base.senses["cpu-usage"], interval: 1000, on: [] }, }, }); - const bus = createSignalBus(); - // Use a ref so the triggerFn can call back into the scheduler const ref: { scheduler: ReturnType | null } = { scheduler: null, }; - const scheduler = createSenseScheduler(config, bus, (name) => { + const scheduler = createSenseScheduler(config, (name) => { triggered.push(name); - // Immediately complete compute so the scheduler is not blocked by in-flight state ref.scheduler?.onComputeComplete(name); }); ref.scheduler = scheduler; @@ -101,11 +82,10 @@ describe("SenseScheduler — interval schedule", () => { "cpu-usage": { ...base.senses["cpu-usage"], interval: 500, on: [] }, }, }); - const bus = createSignalBus(); const ref: { scheduler: ReturnType | null } = { scheduler: null, }; - const scheduler = createSenseScheduler(config, bus, (name) => { + const scheduler = createSenseScheduler(config, (name) => { triggered.push(name); ref.scheduler?.onComputeComplete(name); }); @@ -128,10 +108,8 @@ describe("SenseScheduler — interval schedule", () => { "cpu-usage": { ...base.senses["cpu-usage"], interval: 1000, on: [] }, }, }); - const bus = createSignalBus(); - const scheduler = createSenseScheduler(config, bus, (name) => triggered.push(name)); + const scheduler = createSenseScheduler(config, (name) => triggered.push(name)); - // Only advance 500ms — should be 0 triggers (not catching up) vi.advanceTimersByTime(500); expect(triggered.length).toBe(0); @@ -140,7 +118,7 @@ describe("SenseScheduler — interval schedule", () => { }); describe("SenseScheduler — event (on) schedule", () => { - it("triggers target sense when watched sense emits a signal", () => { + it("triggers target sense when watched sense completes", () => { const triggered: string[] = []; const base = makeConfig(); const config = makeConfig({ @@ -153,12 +131,11 @@ describe("SenseScheduler — event (on) schedule", () => { }, }, }); - const bus = createSignalBus(); - const scheduler = createSenseScheduler(config, bus, (name) => triggered.push(name)); + const scheduler = createSenseScheduler(config, (name) => triggered.push(name)); - bus.emit(makeSignal("cpu-usage")); + scheduler.onSenseCompleted("cpu-usage"); scheduler.onComputeComplete("system-health"); - bus.emit(makeSignal("disk-usage")); + scheduler.onSenseCompleted("disk-usage"); scheduler.onComputeComplete("system-health"); expect(triggered.length).toBe(2); @@ -167,7 +144,7 @@ describe("SenseScheduler — event (on) schedule", () => { scheduler.stop(); }); - it("does not trigger for signals from non-watched senses", () => { + it("does not trigger for completions of non-watched senses", () => { const triggered: string[] = []; const base = makeConfig(); const config = makeConfig({ @@ -176,10 +153,9 @@ describe("SenseScheduler — event (on) schedule", () => { "system-health": { ...base.senses["system-health"], interval: null, on: ["cpu-usage"] }, }, }); - const bus = createSignalBus(); - const scheduler = createSenseScheduler(config, bus, (name) => triggered.push(name)); + const scheduler = createSenseScheduler(config, (name) => triggered.push(name)); - bus.emit(makeSignal("disk-usage")); + scheduler.onSenseCompleted("disk-usage"); expect(triggered.length).toBe(0); @@ -195,11 +171,10 @@ describe("SenseScheduler — event (on) schedule", () => { "system-health": { ...base.senses["system-health"], interval: null, on: ["cpu-usage"] }, }, }); - const bus = createSignalBus(); - const scheduler = createSenseScheduler(config, bus, (name) => triggered.push(name)); + const scheduler = createSenseScheduler(config, (name) => triggered.push(name)); scheduler.stop(); - bus.emit(makeSignal("cpu-usage")); + scheduler.onSenseCompleted("cpu-usage"); expect(triggered.length).toBe(0); }); @@ -222,20 +197,17 @@ describe("SenseScheduler — throttle", () => { throttle: 2000, timeout: null, gracePeriod: null, - retention: 10_000, interval: null, on: ["cpu-usage"], }, }, }); - const bus = createSignalBus(); - const scheduler = createSenseScheduler(config, bus, (name) => triggered.push(name)); + const scheduler = createSenseScheduler(config, (name) => triggered.push(name)); - bus.emit(makeSignal("cpu-usage")); + scheduler.onSenseCompleted("cpu-usage"); scheduler.onComputeComplete("cpu-usage"); - // Immediately trigger again — within throttle window - bus.emit(makeSignal("cpu-usage")); + scheduler.onSenseCompleted("cpu-usage"); expect(triggered.length).toBe(1); @@ -251,20 +223,18 @@ describe("SenseScheduler — throttle", () => { throttle: 1000, timeout: null, gracePeriod: null, - retention: 10_000, interval: null, on: ["cpu-usage"], }, }, }); - const bus = createSignalBus(); - const scheduler = createSenseScheduler(config, bus, (name) => triggered.push(name)); + const scheduler = createSenseScheduler(config, (name) => triggered.push(name)); - bus.emit(makeSignal("cpu-usage")); + scheduler.onSenseCompleted("cpu-usage"); scheduler.onComputeComplete("cpu-usage"); vi.advanceTimersByTime(1500); - bus.emit(makeSignal("cpu-usage")); + scheduler.onSenseCompleted("cpu-usage"); scheduler.onComputeComplete("cpu-usage"); expect(triggered.length).toBe(2); @@ -283,24 +253,19 @@ describe("SenseScheduler — merge/coalesce", () => { "system-health": { ...base.senses["system-health"], interval: null, on: ["cpu-usage"] }, }, }); - const bus = createSignalBus(); - const scheduler = createSenseScheduler(config, bus, (name) => triggered.push(name)); + const scheduler = createSenseScheduler(config, (name) => triggered.push(name)); - // First trigger starts compute - bus.emit(makeSignal("cpu-usage")); + scheduler.onSenseCompleted("cpu-usage"); expect(triggered.length).toBe(1); - // Three more arrive while first is in-flight — all should coalesce to one pending - bus.emit(makeSignal("cpu-usage")); - bus.emit(makeSignal("cpu-usage")); - bus.emit(makeSignal("cpu-usage")); + scheduler.onSenseCompleted("cpu-usage"); + scheduler.onSenseCompleted("cpu-usage"); + scheduler.onSenseCompleted("cpu-usage"); expect(triggered.length).toBe(1); - // Complete first compute → pending drains as exactly one more run scheduler.onComputeComplete("system-health"); expect(triggered.length).toBe(2); - // Complete second compute → no more pending scheduler.onComputeComplete("system-health"); expect(triggered.length).toBe(2); @@ -316,13 +281,11 @@ describe("SenseScheduler — merge/coalesce", () => { "system-health": { ...base.senses["system-health"], interval: null, on: ["cpu-usage"] }, }, }); - const bus = createSignalBus(); - const scheduler = createSenseScheduler(config, bus, (name) => triggered.push(name)); + const scheduler = createSenseScheduler(config, (name) => triggered.push(name)); - bus.emit(makeSignal("cpu-usage")); + scheduler.onSenseCompleted("cpu-usage"); expect(triggered.length).toBe(1); - // Complete with no pending scheduler.onComputeComplete("system-health"); expect(triggered.length).toBe(1); @@ -351,14 +314,11 @@ describe("SenseScheduler — interval + on combined", () => { }, }, }); - const bus = createSignalBus(); - const scheduler = createSenseScheduler(config, bus, (name) => triggered.push(name)); + const scheduler = createSenseScheduler(config, (name) => triggered.push(name)); - // Event trigger - bus.emit(makeSignal("cpu-usage")); + scheduler.onSenseCompleted("cpu-usage"); scheduler.onComputeComplete("system-health"); - // Interval trigger vi.advanceTimersByTime(1000); scheduler.onComputeComplete("system-health"); diff --git a/packages/daemon/src/__tests__/signal-bus.test.ts b/packages/daemon/src/__tests__/signal-bus.test.ts deleted file mode 100644 index 149feec..0000000 --- a/packages/daemon/src/__tests__/signal-bus.test.ts +++ /dev/null @@ -1,99 +0,0 @@ -import { describe, expect, it, vi } from "vitest"; - -import type { Signal } from "@uncaged/nerve-core"; -import { createSignalBus } from "../signal-bus.js"; - -function makeSignal(senseId: string, payload: unknown = 1): Signal { - return { id: 1, senseId, payload, timestamp: Date.now() }; -} - -describe("createSignalBus", () => { - it("delivers emitted signal to a subscriber", () => { - const bus = createSignalBus(); - const received: Signal[] = []; - bus.subscribe((s) => received.push(s)); - - const sig = makeSignal("cpu-usage", 42); - bus.emit(sig); - - expect(received).toHaveLength(1); - expect(received[0]).toBe(sig); - }); - - it("delivers to multiple subscribers", () => { - const bus = createSignalBus(); - const a: Signal[] = []; - const b: Signal[] = []; - bus.subscribe((s) => a.push(s)); - bus.subscribe((s) => b.push(s)); - - bus.emit(makeSignal("cpu-usage")); - - expect(a).toHaveLength(1); - expect(b).toHaveLength(1); - }); - - it("unsubscribe stops delivery", () => { - const bus = createSignalBus(); - const received: Signal[] = []; - const unsub = bus.subscribe((s) => received.push(s)); - - bus.emit(makeSignal("cpu-usage")); - unsub(); - bus.emit(makeSignal("cpu-usage")); - - expect(received).toHaveLength(1); - }); - - it("remaining subscribers still receive after one unsubscribes", () => { - const bus = createSignalBus(); - const a: Signal[] = []; - const b: Signal[] = []; - const unsubA = bus.subscribe((s) => a.push(s)); - bus.subscribe((s) => b.push(s)); - - unsubA(); - bus.emit(makeSignal("cpu-usage")); - - expect(a).toHaveLength(0); - expect(b).toHaveLength(1); - }); - - it("emit with no subscribers does nothing", () => { - const bus = createSignalBus(); - expect(() => bus.emit(makeSignal("cpu-usage"))).not.toThrow(); - }); - - it("dispatch is synchronous", () => { - const bus = createSignalBus(); - const order: string[] = []; - bus.subscribe(() => order.push("handler")); - order.push("before"); - bus.emit(makeSignal("cpu-usage")); - order.push("after"); - expect(order).toEqual(["before", "handler", "after"]); - }); - - it("handler exceptions don't prevent other handlers from running", () => { - const bus = createSignalBus(); - const received: Signal[] = []; - bus.subscribe(() => { - throw new Error("boom"); - }); - bus.subscribe((s) => received.push(s)); - - expect(() => bus.emit(makeSignal("cpu-usage"))).toThrow("boom"); - expect(received).toHaveLength(1); - }); - - it("same handler can be subscribed once and fires once per emit", () => { - const bus = createSignalBus(); - const handler = vi.fn(); - bus.subscribe(handler); - - bus.emit(makeSignal("cpu-usage")); - bus.emit(makeSignal("cpu-usage")); - - expect(handler).toHaveBeenCalledTimes(2); - }); -}); diff --git a/packages/daemon/src/__tests__/worker-pool.test.ts b/packages/daemon/src/__tests__/worker-pool.test.ts index 3f1347e..04a50d4 100644 --- a/packages/daemon/src/__tests__/worker-pool.test.ts +++ b/packages/daemon/src/__tests__/worker-pool.test.ts @@ -80,8 +80,18 @@ describe("createSenseWorkerPool", () => { await startWorkerWithReady(pool, "g1"); expect(mockChildren).toHaveLength(1); const child = mockChildren[0]; - child.emit("message", { type: "signal", sense: "s", payload: 1 }); - expect(onWorkerMessage).toHaveBeenCalledWith({ type: "signal", sense: "s", payload: 1 }); + child.emit("message", { + type: "compute-result", + sense: "s", + state: 1, + workflow: null, + }); + expect(onWorkerMessage).toHaveBeenCalledWith({ + type: "compute-result", + sense: "s", + state: 1, + workflow: null, + }); }); it("sendCompute delivers to the worker for that group", async () => { diff --git a/packages/daemon/src/dashboard.html b/packages/daemon/src/dashboard.html index 35ba1ec..6132981 100644 --- a/packages/daemon/src/dashboard.html +++ b/packages/daemon/src/dashboard.html @@ -191,7 +191,6 @@ Name Type Triggers - Last signal @@ -343,7 +342,7 @@ if (!list.length) { var tr = document.createElement('tr'); var td = document.createElement('td'); - td.colSpan = 5; + td.colSpan = 4; td.style.color = 'var(--muted)'; td.textContent = 'No senses registered.'; tr.appendChild(td); @@ -353,12 +352,10 @@ list.forEach(function (s) { var tr = document.createElement('tr'); var triggers = (s.triggers && s.triggers.length) ? s.triggers.join('; ') : '—'; - var last = s.lastSignalTimestamp != null ? fmtTime(s.lastSignalTimestamp) : '—'; tr.innerHTML = '' + escapeHtml(s.name) + '' + '' + escapeHtml(String(s.group || '—')) + '' + '' + escapeHtml(triggers) + '' + - '' + escapeHtml(last) + '' + ''; var tdBtn = tr.querySelector('td:last-child'); var b = document.createElement('button'); diff --git a/packages/daemon/src/index.ts b/packages/daemon/src/index.ts index e418cfe..0d88a47 100644 --- a/packages/daemon/src/index.ts +++ b/packages/daemon/src/index.ts @@ -4,7 +4,7 @@ export type { HealthRequestMessage, HealthResponseMessage, ParentToWorkerMessage, - SignalMessage, + ComputeResultMessage, ErrorMessage, ReadyMessage, WorkerToParentMessage, @@ -15,16 +15,7 @@ export type { ThreadWorkflowMessageMessage, } from "./ipc.js"; -export type { SignalBus, SignalHandler, Unsubscribe } from "./signal-bus.js"; - -export type { DrizzleDB, SenseRuntime } from "./sense-runtime.js"; - -export { - runMigrations, - openSenseDb, - loadSenseModule, - executeCompute, -} from "./sense-runtime.js"; +export { loadSenseModule, executeCompute, readState, writeState } from "./sense-runtime.js"; export { createKernel } from "./kernel.js"; export type { Kernel, KernelOptions, KernelHealth } from "./kernel.js"; diff --git a/packages/daemon/src/ipc.ts b/packages/daemon/src/ipc.ts index bd6d38d..fc53d1e 100644 --- a/packages/daemon/src/ipc.ts +++ b/packages/daemon/src/ipc.ts @@ -4,7 +4,7 @@ */ import type { Result, WorkflowTrigger } from "@uncaged/nerve-core"; -import { err, isPlainRecord, ok } from "@uncaged/nerve-core"; +import { err, isPlainRecord, ok, parseWorkflowTrigger } from "@uncaged/nerve-core"; /** Parent → Worker: trigger one compute cycle for a sense */ export type ComputeMessage = { @@ -65,11 +65,12 @@ export type ParentToWorkerMessage = | ResumeThreadMessage | KillThreadMessage; -/** Worker → Parent: compute produced a signal */ -export type SignalMessage = { - type: "signal"; +/** Worker → Parent: sense compute finished (state persisted in worker; workflow optional). */ +export type ComputeResultMessage = { + type: "compute-result"; sense: string; - payload: unknown; + state: unknown; + workflow: WorkflowTrigger | null; }; /** Worker → Parent: sense compute result includes a workflow to start */ @@ -140,7 +141,7 @@ export type ThreadWorkflowMessageMessage = { /** Union of all messages a worker sends to the parent */ export type WorkerToParentMessage = - | SignalMessage + | ComputeResultMessage | ErrorMessage | ReadyMessage | HealthResponseMessage @@ -247,17 +248,33 @@ export function parseParentMessage(raw: unknown): Result } } -function parseSignalMsg(obj: Record): Result { +function parseComputeResultMsg(obj: Record): Result { if (typeof obj.sense !== "string") { - return err(new Error("Worker 'signal' message missing string 'sense' field")); + return err(new Error("Worker 'compute-result' message missing string 'sense' field")); } - if (!("payload" in obj)) { - return err(new Error("Worker 'signal' message missing 'payload' field")); + if (!("state" in obj)) { + return err(new Error("Worker 'compute-result' message missing 'state' field")); + } + if (!("workflow" in obj)) { + return err(new Error("Worker 'compute-result' message missing 'workflow' field")); + } + const wfRaw = obj.workflow; + if (wfRaw !== null && !isPlainRecord(wfRaw)) { + return err(new Error("Worker 'compute-result' workflow must be an object or null")); + } + let workflow: WorkflowTrigger | null; + if (wfRaw === null) { + workflow = null; + } else { + const parsed = parseWorkflowTrigger(wfRaw); + if (!parsed.ok) return err(parsed.error); + workflow = parsed.value; } return ok({ - type: "signal", + type: "compute-result", sense: obj.sense, - payload: obj.payload, + state: obj.state, + workflow, }); } @@ -341,7 +358,7 @@ function parseWorkflowErrorMsg(obj: Record): Result if (!WORKER_MSG_TYPES.has(obj.type)) { return err(new Error(`Unknown worker IPC message type: "${obj.type}"`)); } - if (obj.type === "signal") return parseSignalMsg(obj); + if (obj.type === "compute-result") return parseComputeResultMsg(obj); if (obj.type === "error") return parseErrorMsg(obj); if (obj.type === "health-response") return parseHealthResponseMsg(obj); if (obj.type === "thread-event") return parseThreadEventMsg(obj); diff --git a/packages/daemon/src/kernel.ts b/packages/daemon/src/kernel.ts index d48f2fd..148bfa0 100644 --- a/packages/daemon/src/kernel.ts +++ b/packages/daemon/src/kernel.ts @@ -1,5 +1,5 @@ /** - * Kernel — ties sense workers, signal bus, sense scheduler, workflow manager, + * Kernel — ties sense workers, sense scheduler, workflow manager, * optional file watcher, and daemon IPC. */ @@ -12,10 +12,9 @@ import { type HealthInfo, type NerveConfig, type SenseInfo, - type Signal, + type WorkflowTrigger, senseTriggerLabels, } from "@uncaged/nerve-core"; -import { routeSenseComputeOutput } from "@uncaged/nerve-core"; import { createLogStore } from "@uncaged/nerve-store"; import type { LogStore } from "@uncaged/nerve-store"; @@ -36,8 +35,6 @@ import { } from "./kernel-sense-groups.js"; import { createSenseScheduler } from "./sense-scheduler.js"; import type { SenseScheduler } from "./sense-scheduler.js"; -import { createSignalBus } from "./signal-bus.js"; -import type { SignalBus } from "./signal-bus.js"; import { createSenseWorkerPool, resolveWorkerScript } from "./worker-pool.js"; import { createWorkflowManager } from "./workflow-manager.js"; import type { WorkflowManager } from "./workflow-manager.js"; @@ -55,7 +52,6 @@ export type Kernel = { stop: () => Promise; groups: Set; senseCount: number; - bus: SignalBus; logStore: LogStore; workflowManager: WorkflowManager; ready: Promise; @@ -110,7 +106,6 @@ export function createKernel( nerveRoot: string, options: KernelOptions = defaultKernelOptions(), ): Kernel { - const bus: SignalBus = createSignalBus(); const workerScript = options.workerScript ?? resolveWorkerScript(); const startTime = Date.now(); const startedAtIso = new Date(startTime).toISOString(); @@ -127,12 +122,6 @@ export function createKernel( let config = initialConfig; - let _signalIdCounter = 0; - function nextSignalId(): number { - _signalIdCounter += 1; - return _signalIdCounter; - } - const workflowManager = createWorkflowManager(nerveRoot, config, logStore); const groups = new Set(); @@ -156,38 +145,14 @@ export function createKernel( } } - function handleSenseWorkerSignal(senseName: string, payload: unknown): void { - const routeResult = routeSenseComputeOutput(payload); - if (!routeResult.ok) { - process.stderr.write( - `[kernel] sense "${senseName}" invalid compute payload: ${routeResult.error.message}\n`, - ); - logStore.append({ - source: "sense", - type: "error", - refId: senseName, - payload: JSON.stringify({ error: routeResult.error.message }), - timestamp: Date.now(), - }); - scheduler.onComputeComplete(senseName); - return; - } - const { signal: signalPayload, workflow } = routeResult.value; - - const signal: Signal = { - id: nextSignalId(), - senseId: senseName, - payload: signalPayload, - timestamp: Date.now(), - }; + function handleComputeResult(senseName: string, workflow: WorkflowTrigger | null): void { logStore.append({ source: "sense", - type: "signal", + type: "compute-complete", refId: senseName, - payload: JSON.stringify(signalPayload), - timestamp: signal.timestamp, + payload: null, + timestamp: Date.now(), }); - bus.emit(signal); if (workflow !== null) { workflowManager.startWorkflow(workflow.name, { @@ -204,6 +169,7 @@ export function createKernel( }); } scheduler.onComputeComplete(senseName); + scheduler.onSenseCompleted(senseName); } function handleWorkerMessage(raw: unknown): void { @@ -235,8 +201,8 @@ export function createKernel( return; } - if (msg.type === "signal") { - handleSenseWorkerSignal(msg.sense, msg.payload); + if (msg.type === "compute-result") { + handleComputeResult(msg.sense, msg.workflow); } } @@ -270,7 +236,7 @@ export function createKernel( senseWorkerPool.sendCompute(group, senseName); } - scheduler = createSenseScheduler(config, bus, triggerFn, { + scheduler = createSenseScheduler(config, triggerFn, { logStore, }); @@ -306,7 +272,7 @@ export function createKernel( const oldWorkflows = config.workflows; config = newConfig; scheduler.stop(); - scheduler = createSenseScheduler(config, bus, triggerFn, { + scheduler = createSenseScheduler(config, triggerFn, { logStore, }); workflowManager.updateConfig(newConfig); @@ -388,22 +354,13 @@ export function createKernel( workflowManager, triggerSense, listSenses(): SenseInfo[] { - return Object.entries(config.senses).map(([name, senseConfig]) => { - const entries = logStore.query({ - source: "sense", - type: "signal", - refId: name, - }); - const lastEntry = entries.length > 0 ? entries[entries.length - 1] : null; - return { - name, - group: senseConfig.group, - throttle: senseConfig.throttle, - timeout: senseConfig.timeout, - triggers: senseTriggerLabels(name, config.senses), - lastSignalTimestamp: lastEntry !== null ? lastEntry.timestamp : null, - }; - }); + return Object.entries(config.senses).map(([name, senseConfig]) => ({ + name, + group: senseConfig.group, + throttle: senseConfig.throttle, + timeout: senseConfig.timeout, + triggers: senseTriggerLabels(name, config.senses), + })); }, getHealthInfo: getDaemonHealth, getDefaultMaxRounds: () => config.maxRounds, @@ -468,7 +425,6 @@ export function createKernel( stop, groups, senseCount, - bus, logStore, workflowManager, ready, diff --git a/packages/daemon/src/sense-runtime.ts b/packages/daemon/src/sense-runtime.ts index 3977b57..6b38303 100644 --- a/packages/daemon/src/sense-runtime.ts +++ b/packages/daemon/src/sense-runtime.ts @@ -1,172 +1,38 @@ -import { mkdirSync, readFileSync, readdirSync } from "node:fs"; -import { dirname, join } from "node:path"; -import { DatabaseSync } from "node:sqlite"; +import { mkdirSync, readFileSync, writeFileSync } from "node:fs"; +import { dirname } from "node:path"; -import { drizzle } from "drizzle-orm/node-sqlite"; -import type { NodeSQLiteDatabase } from "drizzle-orm/node-sqlite"; -import type { SQLiteTable } from "drizzle-orm/sqlite-core"; - -import type { ComputeResult, Result, SenseComputeFn } from "@uncaged/nerve-core"; -import { DEFAULT_SENSE_SIGNAL_RETENTION, err, isPlainRecord, ok } from "@uncaged/nerve-core"; - -/** A Drizzle DB instance (schema-generic) */ -export type DrizzleDB = NodeSQLiteDatabase>; +import type { Result, SenseComputeFn, WorkflowTrigger } from "@uncaged/nerve-core"; +import { err, isPlainRecord, ok } from "@uncaged/nerve-core"; /** All state held for one sense inside a worker */ export type SenseRuntime = { name: string; - db: DrizzleDB; compute: SenseComputeFn; - table: SQLiteTable; - persistSignal: (payload: unknown) => void; + state: unknown; + statePath: string; }; -function ensureMigrationsTable(sqlite: DatabaseSync): Result { +export function readState(statePath: string, initialState: unknown): unknown { try { - sqlite.exec( - `CREATE TABLE IF NOT EXISTS _migrations ( - name TEXT PRIMARY KEY, - applied_at INTEGER NOT NULL - )`, - ); - return ok(undefined); - } catch (e) { - const msg = e instanceof Error ? e.message : String(e); - return err(new Error(`Failed to create _migrations table: ${msg}`)); + const raw = readFileSync(statePath, "utf8"); + return JSON.parse(raw); + } catch { + return initialState; } } -function listMigrationFiles(migrationsDir: string): Result { - try { - const files = readdirSync(migrationsDir) - .filter((f) => f.endsWith(".sql")) - .sort(); - return ok(files); - } catch (e) { - const msg = e instanceof Error ? e.message : String(e); - return err(new Error(`Failed to read migrations directory "${migrationsDir}": ${msg}`)); - } -} - -function applyMigrationFile(sqlite: DatabaseSync, file: string, filePath: string): Result { - let sql: string; - try { - sql = readFileSync(filePath, "utf8"); - } catch (e) { - const msg = e instanceof Error ? e.message : String(e); - return err(new Error(`Failed to read migration file "${filePath}": ${msg}`)); - } - - const insertJournal = sqlite.prepare("INSERT INTO _migrations (name, applied_at) VALUES (?, ?)"); - sqlite.exec("BEGIN IMMEDIATE"); - try { - sqlite.exec(sql); - insertJournal.run(file, Date.now()); - sqlite.exec("COMMIT"); - return ok(undefined); - } catch (e) { - try { - sqlite.exec("ROLLBACK"); - } catch { - // ignore secondary errors during rollback - } - const msg = e instanceof Error ? e.message : String(e); - return err(new Error(`Migration "${file}" failed: ${msg}`)); - } +export function writeState(statePath: string, state: unknown): void { + mkdirSync(dirname(statePath), { recursive: true }); + writeFileSync(statePath, JSON.stringify(state, null, 2)); } /** - * Run all *.sql migration files in the given directory against a - * `node:sqlite` DatabaseSync, in lexicographic order. - * Tracks applied migrations in _migrations table to avoid re-running. - */ -export function runMigrations(sqlite: DatabaseSync, migrationsDir: string): Result { - const tableResult = ensureMigrationsTable(sqlite); - if (!tableResult.ok) return tableResult; - - const filesResult = listMigrationFiles(migrationsDir); - if (!filesResult.ok) return filesResult; - - const migrationRows = sqlite.prepare("SELECT name FROM _migrations").all(); - const applied = new Set( - migrationRows - .filter((r): r is { name: string } => isPlainRecord(r) && typeof r.name === "string") - .map((r) => r.name), - ); - - for (const file of filesResult.value) { - if (applied.has(file)) continue; - const result = applyMigrationFile(sqlite, file, join(migrationsDir, file)); - if (!result.ok) return result; - } - - return ok(undefined); -} - -/** Run `_signals` row prune after this many inserts (amortize DELETE cost). */ -const SIGNAL_INSERTS_PER_PRUNE = 100; - -/** - * Open (or create) the SQLite file at `dbPath`, run all migrations in - * `migrationsDir`, and wrap with Drizzle ORM. - */ -export function openSenseDb( - dbPath: string, - migrationsDir: string, - retention: number = DEFAULT_SENSE_SIGNAL_RETENTION, -): Result<{ sqlite: DatabaseSync; db: DrizzleDB; persistSignal: (payload: unknown) => void }> { - let sqlite: DatabaseSync; - - try { - mkdirSync(dirname(dbPath), { recursive: true }); - sqlite = new DatabaseSync(dbPath); - sqlite.exec("PRAGMA journal_mode=WAL"); - } catch (e) { - const msg = e instanceof Error ? e.message : String(e); - return err(new Error(`Failed to open database "${dbPath}": ${msg}`)); - } - - const migResult = runMigrations(sqlite, migrationsDir); - if (!migResult.ok) return migResult; - - // Auto-create _signals table for signal persistence (all senses get this) - sqlite.exec( - `CREATE TABLE IF NOT EXISTS _signals ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - payload TEXT NOT NULL, - timestamp INTEGER NOT NULL - )`, - ); - - const insertStmt = sqlite.prepare("INSERT INTO _signals (payload, timestamp) VALUES (?, ?)"); - const pruneStmt = sqlite.prepare( - "DELETE FROM _signals WHERE id <= (SELECT id FROM _signals ORDER BY id DESC LIMIT 1 OFFSET ?)", - ); - - let insertsSincePrune = 0; - - function persistSignal(payload: unknown): void { - const json = JSON.stringify(payload); - insertStmt.run(json, Date.now()); - insertsSincePrune += 1; - if (insertsSincePrune >= SIGNAL_INSERTS_PER_PRUNE) { - insertsSincePrune = 0; - pruneStmt.run(retention); - } - } - - // Drizzle infers a schema-specific DB type; senses are schema-agnostic at this layer. - const db = drizzle({ client: sqlite }) as DrizzleDB; - return ok({ sqlite, db, persistSignal }); -} - -/** - * Dynamically import the compute function and table from a sense's index.ts/js. - * The module must export a named `compute` function and a named `table` (SQLiteTable). + * Dynamically import `compute` and `initialState` from a sense's index.ts/js. + * The module must export named `compute` and `initialState`. */ export async function loadSenseModule( senseIndexPath: string, -): Promise> { +): Promise> { let mod: unknown; try { @@ -183,33 +49,24 @@ export async function loadSenseModule( ); } - if (!("table" in mod) || mod.table === null || typeof mod.table !== "object") { - return err( - new Error( - `Sense module "${senseIndexPath}" must export a named "table" (drizzle SQLiteTable)`, - ), - ); + if (!("initialState" in mod)) { + return err(new Error(`Sense module "${senseIndexPath}" must export a named "initialState"`)); } return ok({ compute: mod.compute as SenseComputeFn, - table: mod.table as SQLiteTable, + initialState: mod.initialState, }); } /** - * Execute a sense's compute function with an optional soft timeout. - * If timeoutMs is provided and compute takes longer, the AbortSignal is - * triggered and an error Result is returned. - * When compute returns non-null, `result.signal` is persisted to the sense's - * table via `db.insert(table).values(result.signal)` and `persistSignal` is - * called with `result.signal`. Returns the full `ComputeResult` so callers - * can inspect the `workflow` field. + * Execute a sense's compute with current runtime state and an optional soft timeout. + * On success, persists `result.state` to `runtime.statePath` and updates `runtime.state`. */ export async function executeCompute( runtime: SenseRuntime, timeoutMs?: number, -): Promise>> { +): Promise> { const controller = new AbortController(); let timer: ReturnType | undefined; @@ -224,16 +81,13 @@ export async function executeCompute( : null; try { - const computePromise = runtime.compute(); + const computePromise = runtime.compute(runtime.state); const result = timeoutPromise ? await Promise.race([computePromise, timeoutPromise]) : await computePromise; - if (result !== null) { - // Cast required: DrizzleDB is schema-agnostic; the sense module guarantees shape compatibility. - await runtime.db.insert(runtime.table).values(result.signal as Record); - runtime.persistSignal(result.signal); - } + runtime.state = result.state; + writeState(runtime.statePath, result.state); return ok(result); } catch (e) { diff --git a/packages/daemon/src/sense-scheduler.ts b/packages/daemon/src/sense-scheduler.ts index 29f9ce7..4a3a036 100644 --- a/packages/daemon/src/sense-scheduler.ts +++ b/packages/daemon/src/sense-scheduler.ts @@ -3,7 +3,7 @@ * * Supports: * - interval: periodic setInterval-based triggering - * - on[]: event-driven triggering via the signal bus + * - on[]: event-driven triggering when watched senses complete a compute cycle * - throttle: skip triggers that arrive too soon after the last compute * - merge/coalesce: if compute is in-flight, record one pending trigger; * run it once after the current compute completes (no unbounded queue) @@ -11,7 +11,6 @@ import type { NerveConfig } from "@uncaged/nerve-core"; import type { LogStore } from "@uncaged/nerve-store"; -import type { SignalBus, Unsubscribe } from "./signal-bus.js"; /** Sends a compute message to the worker responsible for the given sense. */ export type TriggerFn = (senseName: string) => void; @@ -28,6 +27,8 @@ type SenseState = { export type SenseScheduler = { /** Notify scheduler that a compute cycle finished. Drains the pending flag. */ onComputeComplete: (senseName: string) => void; + /** Notify scheduler that a sense completed so `on[]` subscribers may run. */ + onSenseCompleted: (senseName: string) => void; stop: () => void; }; @@ -43,21 +44,29 @@ export type SenseSchedulerOptions = { * Create and start a sense scheduler. * * @param config Full NerveConfig (senses for schedule + throttle). - * @param bus SignalBus to subscribe for event-driven triggers. * @param triggerFn Called with the sense name when a compute should be dispatched. * @param opts Optional: logStore for structured logging. - * @returns SenseScheduler with stop() and onComputeComplete() methods. + * @returns SenseScheduler with stop(), onComputeComplete(), and onSenseCompleted(). */ export function createSenseScheduler( config: NerveConfig, - bus: SignalBus, triggerFn: TriggerFn, opts?: SenseSchedulerOptions, ): SenseScheduler { const logStore = opts?.logStore; const intervals: ReturnType[] = []; - const unsubscribers: Unsubscribe[] = []; const states = new Map(); + let stopped = false; + + /** sense name → senses that list it in `on[]` */ + const onSubscribers = new Map(); + for (const [senseName, sense] of Object.entries(config.senses)) { + for (const watched of sense.on) { + const list = onSubscribers.get(watched) ?? []; + list.push(senseName); + onSubscribers.set(watched, list); + } + } function getState(senseName: string): SenseState { let state = states.get(senseName); @@ -89,6 +98,7 @@ export function createSenseScheduler( * If within throttle window, schedules a single deferred trigger at window end (fix #3). */ function maybeTrigger(senseName: string): void { + if (stopped) return; const senseConfig = config.senses[senseName]; const throttleMs = senseConfig?.throttle ?? null; const state = getState(senseName); @@ -118,10 +128,11 @@ export function createSenseScheduler( } /** - * Called by the kernel when a compute cycle completes (signal or error received). + * Called by the kernel when a compute cycle completes (compute-result or error received). * Drains the pending flag if set. */ function onComputeComplete(senseName: string): void { + if (stopped) return; const state = states.get(senseName); if (state === undefined) return; @@ -152,8 +163,17 @@ export function createSenseScheduler( dispatchCompute(senseName); } + function onSenseCompleted(senseName: string): void { + if (stopped) return; + const subscribers = onSubscribers.get(senseName); + if (subscribers === undefined) return; + for (const sub of subscribers) { + maybeTrigger(sub); + } + } + for (const [senseName, sense] of Object.entries(config.senses)) { - const { interval, on } = sense; + const { interval } = sense; if (interval !== null) { const id = setInterval(() => { @@ -161,25 +181,13 @@ export function createSenseScheduler( }, interval); intervals.push(id); } - - if (on.length > 0) { - const watchedSenses = new Set(on); - const unsub = bus.subscribe((signal) => { - if (watchedSenses.has(signal.senseId)) { - maybeTrigger(senseName); - } - }); - unsubscribers.push(unsub); - } } function stop(): void { + stopped = true; for (const id of intervals) { clearInterval(id); } - for (const unsub of unsubscribers) { - unsub(); - } for (const state of states.values()) { if (state.deferredTimer !== null) { clearTimeout(state.deferredTimer); @@ -187,8 +195,7 @@ export function createSenseScheduler( } } intervals.length = 0; - unsubscribers.length = 0; } - return { onComputeComplete, stop }; + return { onComputeComplete, onSenseCompleted, stop }; } diff --git a/packages/daemon/src/sense-worker.ts b/packages/daemon/src/sense-worker.ts index 8457bc6..f22ffdf 100644 --- a/packages/daemon/src/sense-worker.ts +++ b/packages/daemon/src/sense-worker.ts @@ -8,8 +8,7 @@ * * Layout assumptions (nerve user config at `~/.uncaged-nerve/`): * dist/senses//index.js ← bundled compute (esbuild) - * senses//migrations/ ← SQL migration files - * data/senses/.db ← SQLite data file + * data/senses/.json ← persisted sense state * nerve.yaml ← config */ @@ -19,11 +18,11 @@ import { readFileSync } from "node:fs"; import { join, resolve } from "node:path"; import { parseNerveConfig } from "@uncaged/nerve-core"; -import type { NerveConfig } from "@uncaged/nerve-core"; +import type { NerveConfig, WorkflowTrigger } from "@uncaged/nerve-core"; import type { WorkerToParentMessage } from "./ipc.js"; import { parseParentMessage } from "./ipc.js"; -import { executeCompute, loadSenseModule, openSenseDb } from "./sense-runtime.js"; +import { executeCompute, loadSenseModule, readState } from "./sense-runtime.js"; import type { SenseRuntime } from "./sense-runtime.js"; import { ignoreSessionBroadcastSignals } from "./worker-signals.js"; @@ -41,8 +40,11 @@ function sendReady(): void { send({ type: "ready" }); } -function sendSignal(sense: string, payload: unknown): void { - send({ type: "signal", sense, payload }); +function sendComputeResult( + sense: string, + value: { state: unknown; workflow: WorkflowTrigger | null }, +): void { + send({ type: "compute-result", sense, state: value.state, workflow: value.workflow }); } function sendError(sense: string, error: string): void { @@ -72,31 +74,23 @@ function readConfig(nerveRoot: string): NerveConfig { return configResult.value; } -async function initSense( - nerveRoot: string, - senseName: string, - retention: number, -): Promise { - const dbPath = join(nerveRoot, "data", "senses", `${senseName}.db`); - const migrationsDir = join(nerveRoot, "senses", senseName, "migrations"); +async function initSense(nerveRoot: string, senseName: string): Promise { + const statePath = join(nerveRoot, "data", "senses", `${senseName}.json`); const senseIndexPath = resolve(join(nerveRoot, "dist", "senses", senseName, "index.js")); - const dbResult = openSenseDb(dbPath, migrationsDir, retention); - if (!dbResult.ok) { - throw new Error(`Failed to init DB for "${senseName}": ${dbResult.error.message}`); - } - const moduleResult = await loadSenseModule(senseIndexPath); if (!moduleResult.ok) { throw new Error(`Failed to load module for "${senseName}": ${moduleResult.error.message}`); } + const { compute, initialState } = moduleResult.value; + const state = readState(statePath, initialState); + return { name: senseName, - db: dbResult.value.db, - compute: moduleResult.value.compute, - table: moduleResult.value.table, - persistSignal: dbResult.value.persistSignal, + compute, + state, + statePath, }; } @@ -149,10 +143,7 @@ async function runCompute( return; } clearGracePeriodTimer(senseName); - if (result.value != null) { - // Single IPC message: kernel uses routeSenseComputeOutput(payload) for signal + optional workflow. - sendSignal(senseName, result.value); - } + sendComputeResult(senseName, result.value); } catch (e: unknown) { const errMsg = e instanceof Error ? e.message : String(e); sendError(senseName, errMsg); @@ -236,8 +227,7 @@ async function bootstrap(nerveRoot: string, group: string): Promise { for (const senseName of groupSenses) { try { - const retention = config.senses[senseName].retention; - const runtime = await initSense(nerveRoot, senseName, retention); + const runtime = await initSense(nerveRoot, senseName); runtimes.set(senseName, runtime); } catch (e: unknown) { const eMsg = e instanceof Error ? e.message : String(e); diff --git a/packages/daemon/src/signal-bus.ts b/packages/daemon/src/signal-bus.ts deleted file mode 100644 index c8499a3..0000000 --- a/packages/daemon/src/signal-bus.ts +++ /dev/null @@ -1,51 +0,0 @@ -/** - * In-memory signal bus for routing signals between sense workers and sense-scheduler subscribers. - * Synchronous dispatch — no persistence, no async queuing. - * - * If a handler throws, the error is logged and remaining handlers still run. - * The first error is re-thrown after all handlers complete so callers can observe it. - */ - -import type { Signal } from "@uncaged/nerve-core"; - -export type SignalHandler = (signal: Signal) => void; -export type Unsubscribe = () => void; - -export type SignalBus = { - emit: (signal: Signal) => void; - subscribe: (handler: SignalHandler) => Unsubscribe; -}; - -export function createSignalBus(): SignalBus { - const handlers = new Set(); - - function emit(signal: Signal): void { - let firstError: unknown = null; - let hasError = false; - - for (const handler of handlers) { - try { - handler(signal); - } catch (e) { - console.error("[signal-bus] handler error:", e); - if (!hasError) { - firstError = e; - hasError = true; - } - } - } - - if (hasError) { - throw firstError; - } - } - - function subscribe(handler: SignalHandler): Unsubscribe { - handlers.add(handler); - return () => { - handlers.delete(handler); - }; - } - - return { emit, subscribe }; -} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 2c8e62f..f397524 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -111,9 +111,6 @@ importers: '@uncaged/nerve-store': specifier: workspace:* version: link:../store - drizzle-orm: - specifier: 1.0.0-beta.23-c10d10c - version: 1.0.0-beta.23-c10d10c(@cloudflare/workers-types@4.20260425.1)(@types/better-sqlite3@7.6.13)(@types/mssql@9.1.11(@azure/core-client@1.10.1))(better-sqlite3@11.10.0)(mssql@11.0.1(@azure/core-client@1.10.1))(sql.js@1.14.1)(zod@4.3.6) yaml: specifier: ^2.8.3 version: 2.8.3