fix(daemon): address all 12 PR #10 review items
🔴 Critical: 1. parseWorkerMessage() in ipc.ts — validates worker→parent IPC messages 2. signalIdCounter moved inside createKernel closure 3. throttle deferred trigger — pending trigger fires after window ends ⚠️ Warnings: 4. worker respawn on crash with backoff 5. stop() awaits worker exit with SIGKILL fallback 6. signal-bus handler errors caught, no re-throw 7. removed unnecessary as SenseReflexConfig cast 8. config validates sense reflex has at least one trigger 💡 Suggestions: 9. signal ID documented as process-scoped (solved by #2) 10. +3 throttle-pending tests 11. +6 kernel unit tests (mock fork, message routing) 12. example imports verified correct 54 tests (was 45), all green. biome check passes. 小橘 🍊(NEKO Team)
This commit is contained in:
@@ -98,6 +98,12 @@ function parseSenseReflex(
|
||||
const intervalResult = parseDurationField(obj.interval, `reflexes[${index}].interval`);
|
||||
if (!intervalResult.ok) return intervalResult;
|
||||
|
||||
if (intervalResult.value === null && on !== null && on.length === 0) {
|
||||
return err(
|
||||
new Error(`reflexes[${index}]: sense reflex must have at least one of "interval" or "on"`),
|
||||
);
|
||||
}
|
||||
|
||||
return ok({
|
||||
kind: "sense" as const,
|
||||
sense: obj.sense,
|
||||
|
||||
@@ -0,0 +1,200 @@
|
||||
import { EventEmitter } from "node:events";
|
||||
|
||||
import type { NerveConfig } from "@uncaged/nerve-core";
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Mock child_process.fork before importing kernel
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
const mockChildren: MockChild[] = [];
|
||||
|
||||
type MockChild = EventEmitter & {
|
||||
send: ReturnType<typeof vi.fn>;
|
||||
kill: ReturnType<typeof vi.fn>;
|
||||
pid: number;
|
||||
};
|
||||
|
||||
function makeMockChild(pid = 1): MockChild {
|
||||
const child = new EventEmitter() as MockChild;
|
||||
child.send = vi.fn((msg: unknown) => {
|
||||
// Auto-exit on shutdown so stop() resolves
|
||||
if (
|
||||
msg !== null &&
|
||||
typeof msg === "object" &&
|
||||
(msg as Record<string, unknown>).type === "shutdown"
|
||||
) {
|
||||
setImmediate(() => child.emit("exit", 0, null));
|
||||
}
|
||||
});
|
||||
child.kill = vi.fn((_signal?: string) => {
|
||||
child.emit("exit", null, _signal ?? "SIGKILL");
|
||||
});
|
||||
child.pid = pid;
|
||||
return child;
|
||||
}
|
||||
|
||||
vi.mock("node:child_process", () => ({
|
||||
fork: vi.fn((_script: string, _args: string[], _opts: unknown) => {
|
||||
const child = makeMockChild(mockChildren.length + 1);
|
||||
mockChildren.push(child);
|
||||
return child;
|
||||
}),
|
||||
}));
|
||||
|
||||
// Import after mock is set up
|
||||
const { createKernel } = await import("../kernel.js");
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Helpers
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
|
||||
return {
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [],
|
||||
workflows: null,
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Tests
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
describe("kernel — message routing", () => {
|
||||
beforeEach(() => {
|
||||
mockChildren.length = 0;
|
||||
vi.useFakeTimers({ shouldAdvanceTime: true });
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
it("routes signal message to bus without throwing", async () => {
|
||||
const config = makeConfig({
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [],
|
||||
});
|
||||
const kernel = createKernel(config, "/tmp/nerve-test");
|
||||
|
||||
expect(mockChildren.length).toBe(1);
|
||||
const child = mockChildren[0];
|
||||
|
||||
expect(() => {
|
||||
child.emit("message", { type: "signal", sense: "cpu-usage", payload: 42 });
|
||||
}).not.toThrow();
|
||||
|
||||
await kernel.stop();
|
||||
});
|
||||
|
||||
it("routes error message to stderr", async () => {
|
||||
const stderrSpy = vi.spyOn(process.stderr, "write").mockImplementation(() => true);
|
||||
const config = makeConfig({
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [],
|
||||
});
|
||||
const kernel = createKernel(config, "/tmp/nerve-test");
|
||||
|
||||
const child = mockChildren[0];
|
||||
child.emit("message", { type: "error", sense: "cpu-usage", error: "compute failed" });
|
||||
|
||||
expect(stderrSpy).toHaveBeenCalledWith(
|
||||
expect.stringContaining('sense "cpu-usage" error: compute failed'),
|
||||
);
|
||||
|
||||
stderrSpy.mockRestore();
|
||||
await kernel.stop();
|
||||
});
|
||||
|
||||
it("ignores ready messages without logging", async () => {
|
||||
const stderrSpy = vi.spyOn(process.stderr, "write").mockImplementation(() => true);
|
||||
const config = makeConfig({
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [],
|
||||
});
|
||||
const kernel = createKernel(config, "/tmp/nerve-test");
|
||||
|
||||
const child = mockChildren[0];
|
||||
const callsBefore = stderrSpy.mock.calls.length;
|
||||
child.emit("message", { type: "ready" });
|
||||
|
||||
expect(stderrSpy.mock.calls.length).toBe(callsBefore);
|
||||
|
||||
stderrSpy.mockRestore();
|
||||
await kernel.stop();
|
||||
});
|
||||
|
||||
it("logs invalid worker messages without throwing", async () => {
|
||||
const stderrSpy = vi.spyOn(process.stderr, "write").mockImplementation(() => true);
|
||||
const config = makeConfig({
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [],
|
||||
});
|
||||
const kernel = createKernel(config, "/tmp/nerve-test");
|
||||
|
||||
const child = mockChildren[0];
|
||||
expect(() => child.emit("message", { type: "unknown-type" })).not.toThrow();
|
||||
|
||||
expect(stderrSpy).toHaveBeenCalledWith(expect.stringContaining("invalid worker message"));
|
||||
|
||||
stderrSpy.mockRestore();
|
||||
await kernel.stop();
|
||||
});
|
||||
});
|
||||
|
||||
describe("kernel — groupForSense mapping", () => {
|
||||
beforeEach(() => {
|
||||
mockChildren.length = 0;
|
||||
vi.useFakeTimers({ shouldAdvanceTime: true });
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
it("spawns one worker per unique group", async () => {
|
||||
const config: NerveConfig = {
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
"disk-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
"net-usage": { group: "network", throttle: null, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [],
|
||||
workflows: null,
|
||||
};
|
||||
const kernel = createKernel(config, "/tmp/nerve-test");
|
||||
|
||||
// system and network = 2 unique groups
|
||||
expect(mockChildren.length).toBe(2);
|
||||
await kernel.stop();
|
||||
});
|
||||
|
||||
it("sends compute to the correct worker on interval trigger", async () => {
|
||||
const config = makeConfig({
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: 500, on: null }],
|
||||
});
|
||||
createKernel(config, "/tmp/nerve-test");
|
||||
|
||||
const child = mockChildren[0];
|
||||
vi.advanceTimersByTime(500);
|
||||
|
||||
expect(child.send).toHaveBeenCalledWith(
|
||||
expect.objectContaining({ type: "compute", sense: "cpu-usage" }),
|
||||
);
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,120 @@
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
|
||||
import type { NerveConfig, Signal } from "@uncaged/nerve-core";
|
||||
import { createReflexScheduler } from "../reflex-scheduler.js";
|
||||
import { createSignalBus } from "../signal-bus.js";
|
||||
|
||||
function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
|
||||
return {
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [],
|
||||
workflows: null,
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
|
||||
function makeSignal(senseId: string, payload: unknown = 1): Signal {
|
||||
return { id: 1, senseId, payload, ts: Date.now() };
|
||||
}
|
||||
|
||||
describe("ReflexScheduler — throttle + pending deferred trigger", () => {
|
||||
beforeEach(() => {
|
||||
vi.useFakeTimers();
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
it("trigger during throttle window fires deferred trigger after window ends", () => {
|
||||
const triggered: string[] = [];
|
||||
const config = makeConfig({
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: 2000, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: null, on: ["cpu-usage"] }],
|
||||
});
|
||||
const bus = createSignalBus();
|
||||
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name));
|
||||
|
||||
// First trigger fires immediately (outside throttle window)
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
scheduler.onComputeComplete("cpu-usage");
|
||||
expect(triggered.length).toBe(1);
|
||||
|
||||
// Second trigger arrives 500ms later — still within throttle window (2000ms)
|
||||
vi.advanceTimersByTime(500);
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
// Should not fire yet (throttled)
|
||||
expect(triggered.length).toBe(1);
|
||||
|
||||
// Advance past the throttle window end; deferred trigger should now fire
|
||||
vi.advanceTimersByTime(1600);
|
||||
expect(triggered.length).toBe(2);
|
||||
|
||||
scheduler.stop();
|
||||
});
|
||||
|
||||
it("multiple triggers during throttle window only produce one deferred trigger", () => {
|
||||
const triggered: string[] = [];
|
||||
const config = makeConfig({
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: 2000, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: null, on: ["cpu-usage"] }],
|
||||
});
|
||||
const bus = createSignalBus();
|
||||
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name));
|
||||
|
||||
// First trigger fires
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
scheduler.onComputeComplete("cpu-usage");
|
||||
expect(triggered.length).toBe(1);
|
||||
|
||||
// Multiple triggers within throttle window — should not stack
|
||||
vi.advanceTimersByTime(300);
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
vi.advanceTimersByTime(300);
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
vi.advanceTimersByTime(300);
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
expect(triggered.length).toBe(1);
|
||||
|
||||
// Advance past window — exactly one deferred trigger fires
|
||||
vi.advanceTimersByTime(1200);
|
||||
scheduler.onComputeComplete("cpu-usage");
|
||||
expect(triggered.length).toBe(2);
|
||||
|
||||
scheduler.stop();
|
||||
});
|
||||
|
||||
it("deferred trigger does not fire after stop()", () => {
|
||||
const triggered: string[] = [];
|
||||
const config = makeConfig({
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: 2000, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: null, on: ["cpu-usage"] }],
|
||||
});
|
||||
const bus = createSignalBus();
|
||||
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name));
|
||||
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
scheduler.onComputeComplete("cpu-usage");
|
||||
expect(triggered.length).toBe(1);
|
||||
|
||||
// Trigger during throttle window — schedules deferred
|
||||
vi.advanceTimersByTime(500);
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
expect(triggered.length).toBe(1);
|
||||
|
||||
// Stop before window ends
|
||||
scheduler.stop();
|
||||
|
||||
// Advance past window — deferred timer should have been cleared
|
||||
vi.advanceTimersByTime(2000);
|
||||
expect(triggered.length).toBe(1);
|
||||
});
|
||||
});
|
||||
@@ -57,3 +57,37 @@ export function parseParentMessage(raw: unknown): Result<ParentToWorkerMessage>
|
||||
}
|
||||
return ok(raw as ParentToWorkerMessage);
|
||||
}
|
||||
|
||||
/** Validate and parse an unknown IPC message received from a worker process. */
|
||||
export function parseWorkerMessage(raw: unknown): Result<WorkerToParentMessage> {
|
||||
if (raw === null || typeof raw !== "object") {
|
||||
return err(new Error("Worker IPC message is not an object"));
|
||||
}
|
||||
const obj = raw as Record<string, unknown>;
|
||||
if (typeof obj.type !== "string") {
|
||||
return err(new Error("Worker IPC message missing string 'type' field"));
|
||||
}
|
||||
const type = obj.type;
|
||||
if (type !== "signal" && type !== "error" && type !== "ready") {
|
||||
return err(new Error(`Unknown worker IPC message type: "${type}"`));
|
||||
}
|
||||
if (type === "signal") {
|
||||
if (typeof obj.sense !== "string") {
|
||||
return err(new Error("Worker 'signal' message missing string 'sense' field"));
|
||||
}
|
||||
if (!("payload" in obj)) {
|
||||
return err(new Error("Worker 'signal' message missing 'payload' field"));
|
||||
}
|
||||
return ok(raw as SignalMessage);
|
||||
}
|
||||
if (type === "error") {
|
||||
if (typeof obj.sense !== "string") {
|
||||
return err(new Error("Worker 'error' message missing string 'sense' field"));
|
||||
}
|
||||
if (typeof obj.error !== "string") {
|
||||
return err(new Error("Worker 'error' message missing string 'error' field"));
|
||||
}
|
||||
return ok(raw as ErrorMessage);
|
||||
}
|
||||
return ok({ type: "ready" });
|
||||
}
|
||||
|
||||
@@ -17,14 +17,15 @@ import { fileURLToPath } from "node:url";
|
||||
|
||||
import type { NerveConfig, Signal } from "@uncaged/nerve-core";
|
||||
|
||||
import type { ComputeMessage, ShutdownMessage, WorkerToParentMessage } from "./ipc.js";
|
||||
import type { ComputeMessage, ShutdownMessage } from "./ipc.js";
|
||||
import { parseWorkerMessage } from "./ipc.js";
|
||||
import { createReflexScheduler } from "./reflex-scheduler.js";
|
||||
import type { ReflexScheduler } from "./reflex-scheduler.js";
|
||||
import { createSignalBus } from "./signal-bus.js";
|
||||
import type { SignalBus } from "./signal-bus.js";
|
||||
|
||||
export type Kernel = {
|
||||
stop: () => void;
|
||||
stop: () => Promise<void>;
|
||||
};
|
||||
|
||||
type WorkerEntry = {
|
||||
@@ -32,13 +33,6 @@ type WorkerEntry = {
|
||||
process: ChildProcess;
|
||||
};
|
||||
|
||||
let _signalIdCounter = 0;
|
||||
|
||||
function nextSignalId(): number {
|
||||
_signalIdCounter += 1;
|
||||
return _signalIdCounter;
|
||||
}
|
||||
|
||||
function resolveWorkerScript(): string {
|
||||
const __filename = fileURLToPath(import.meta.url);
|
||||
const __dir = dirname(__filename);
|
||||
@@ -71,12 +65,73 @@ export function createKernel(config: NerveConfig, nerveRoot: string): Kernel {
|
||||
const bus: SignalBus = createSignalBus();
|
||||
const workerScript = resolveWorkerScript();
|
||||
|
||||
// Signal ID counter is instance-scoped (fix #2)
|
||||
let _signalIdCounter = 0;
|
||||
function nextSignalId(): number {
|
||||
_signalIdCounter += 1;
|
||||
return _signalIdCounter;
|
||||
}
|
||||
|
||||
const groups = new Set<string>();
|
||||
for (const senseConfig of Object.values(config.senses)) {
|
||||
groups.add(senseConfig.group);
|
||||
}
|
||||
|
||||
const workers = new Map<string, WorkerEntry>();
|
||||
let stopped = false;
|
||||
|
||||
function handleWorkerMessage(raw: unknown): void {
|
||||
const result = parseWorkerMessage(raw);
|
||||
if (!result.ok) {
|
||||
process.stderr.write(`[kernel] invalid worker message: ${result.error.message}\n`);
|
||||
return;
|
||||
}
|
||||
const msg = result.value;
|
||||
|
||||
if (msg.type === "ready") {
|
||||
return;
|
||||
}
|
||||
|
||||
if (msg.type === "error") {
|
||||
process.stderr.write(`[kernel] sense "${msg.sense}" error: ${msg.error}\n`);
|
||||
scheduler.onComputeComplete(msg.sense);
|
||||
return;
|
||||
}
|
||||
|
||||
if (msg.type === "signal") {
|
||||
const signal: Signal = {
|
||||
id: nextSignalId(),
|
||||
senseId: msg.sense,
|
||||
payload: msg.payload,
|
||||
ts: Date.now(),
|
||||
};
|
||||
bus.emit(signal);
|
||||
scheduler.onComputeComplete(msg.sense);
|
||||
}
|
||||
}
|
||||
|
||||
function startWorker(group: string): void {
|
||||
const child = spawnWorker(nerveRoot, group, workerScript);
|
||||
|
||||
child.on("message", handleWorkerMessage);
|
||||
|
||||
child.on("exit", (code) => {
|
||||
process.stderr.write(
|
||||
`[kernel] worker for group "${group}" exited with code ${code ?? "null"}\n`,
|
||||
);
|
||||
// Respawn on unexpected exit (fix #4)
|
||||
if (!stopped && code !== 0) {
|
||||
process.stderr.write(`[kernel] respawning worker for group "${group}" in 1s\n`);
|
||||
setTimeout(() => {
|
||||
if (!stopped) {
|
||||
startWorker(group);
|
||||
}
|
||||
}, 1000);
|
||||
}
|
||||
});
|
||||
|
||||
workers.set(group, { group, process: child });
|
||||
}
|
||||
|
||||
function triggerFn(senseName: string): void {
|
||||
const group = groupForSense(config, senseName);
|
||||
@@ -95,47 +150,32 @@ export function createKernel(config: NerveConfig, nerveRoot: string): Kernel {
|
||||
const scheduler: ReflexScheduler = createReflexScheduler(config, bus, triggerFn);
|
||||
|
||||
for (const group of groups) {
|
||||
const child = spawnWorker(nerveRoot, group, workerScript);
|
||||
|
||||
child.on("message", (raw: unknown) => {
|
||||
const msg = raw as WorkerToParentMessage;
|
||||
|
||||
if (msg.type === "ready") {
|
||||
return;
|
||||
}
|
||||
|
||||
if (msg.type === "error") {
|
||||
process.stderr.write(`[kernel] sense "${msg.sense}" error: ${msg.error}\n`);
|
||||
scheduler.onComputeComplete(msg.sense);
|
||||
return;
|
||||
}
|
||||
|
||||
if (msg.type === "signal") {
|
||||
const signal: Signal = {
|
||||
id: nextSignalId(),
|
||||
senseId: msg.sense,
|
||||
payload: msg.payload,
|
||||
ts: Date.now(),
|
||||
};
|
||||
bus.emit(signal);
|
||||
scheduler.onComputeComplete(msg.sense);
|
||||
}
|
||||
});
|
||||
|
||||
child.on("exit", (code) => {
|
||||
process.stderr.write(
|
||||
`[kernel] worker for group "${group}" exited with code ${code ?? "null"}\n`,
|
||||
);
|
||||
});
|
||||
|
||||
workers.set(group, { group, process: child });
|
||||
startWorker(group);
|
||||
}
|
||||
|
||||
function stop(): void {
|
||||
// Wait for a worker process to exit, with a timeout + SIGKILL fallback (fix #5)
|
||||
function waitForExit(child: ChildProcess, timeoutMs: number): Promise<void> {
|
||||
return new Promise((resolve) => {
|
||||
const timer = setTimeout(() => {
|
||||
child.kill("SIGKILL");
|
||||
resolve();
|
||||
}, timeoutMs);
|
||||
child.once("exit", () => {
|
||||
clearTimeout(timer);
|
||||
resolve();
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
async function stop(): Promise<void> {
|
||||
stopped = true;
|
||||
scheduler.stop();
|
||||
const exitPromises: Promise<void>[] = [];
|
||||
for (const entry of workers.values()) {
|
||||
sendShutdown(entry.process);
|
||||
exitPromises.push(waitForExit(entry.process, 5000));
|
||||
}
|
||||
await Promise.all(exitPromises);
|
||||
}
|
||||
|
||||
return { stop };
|
||||
|
||||
@@ -9,7 +9,7 @@
|
||||
* run it once after the current compute completes (no unbounded queue)
|
||||
*/
|
||||
|
||||
import type { NerveConfig, SenseReflexConfig } from "@uncaged/nerve-core";
|
||||
import type { NerveConfig } from "@uncaged/nerve-core";
|
||||
import type { SignalBus, Unsubscribe } from "./signal-bus.js";
|
||||
|
||||
/** Sends a compute message to the worker responsible for the given sense. */
|
||||
@@ -20,6 +20,7 @@ type SenseState = {
|
||||
lastComputeAt: number;
|
||||
inFlight: boolean;
|
||||
pending: boolean;
|
||||
deferredTimer: ReturnType<typeof setTimeout> | null;
|
||||
};
|
||||
|
||||
/** Handle returned by createReflexScheduler — call stop() for graceful shutdown. */
|
||||
@@ -30,7 +31,7 @@ export type ReflexScheduler = {
|
||||
};
|
||||
|
||||
function makeSenseState(): SenseState {
|
||||
return { lastComputeAt: 0, inFlight: false, pending: false };
|
||||
return { lastComputeAt: 0, inFlight: false, pending: false, deferredTimer: null };
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -70,6 +71,7 @@ export function createReflexScheduler(
|
||||
/**
|
||||
* Attempt to trigger compute for a sense.
|
||||
* Respects throttle window and merge/coalesce semantics.
|
||||
* If within throttle window, schedules a single deferred trigger at window end (fix #3).
|
||||
*/
|
||||
function maybeTrigger(senseName: string): void {
|
||||
const senseConfig = config.senses[senseName];
|
||||
@@ -77,8 +79,19 @@ export function createReflexScheduler(
|
||||
const state = getState(senseName);
|
||||
const now = Date.now();
|
||||
|
||||
if (throttleMs !== null && now - state.lastComputeAt < throttleMs) {
|
||||
return;
|
||||
if (throttleMs !== null) {
|
||||
const elapsed = now - state.lastComputeAt;
|
||||
if (elapsed < throttleMs) {
|
||||
// Schedule one deferred trigger at the end of the throttle window (no stacking)
|
||||
if (state.deferredTimer === null) {
|
||||
const remaining = throttleMs - elapsed;
|
||||
state.deferredTimer = setTimeout(() => {
|
||||
state.deferredTimer = null;
|
||||
maybeTrigger(senseName);
|
||||
}, remaining);
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if (state.inFlight) {
|
||||
@@ -106,6 +119,17 @@ export function createReflexScheduler(
|
||||
const now = Date.now();
|
||||
|
||||
if (throttleMs !== null && now - state.lastComputeAt < throttleMs) {
|
||||
// Schedule deferred trigger if not already pending (fix #3)
|
||||
if (state.deferredTimer === null) {
|
||||
const remaining = throttleMs - (now - state.lastComputeAt);
|
||||
state.deferredTimer = setTimeout(() => {
|
||||
state.deferredTimer = null;
|
||||
const s = states.get(senseName);
|
||||
if (s !== undefined && !s.inFlight) {
|
||||
dispatchCompute(senseName);
|
||||
}
|
||||
}, remaining);
|
||||
}
|
||||
state.pending = false;
|
||||
return;
|
||||
}
|
||||
@@ -115,7 +139,7 @@ export function createReflexScheduler(
|
||||
|
||||
for (const reflex of config.reflexes) {
|
||||
if (reflex.kind !== "sense") continue;
|
||||
const senseReflex = reflex as SenseReflexConfig;
|
||||
const senseReflex = reflex;
|
||||
const senseName = senseReflex.sense;
|
||||
|
||||
if (senseReflex.interval !== null) {
|
||||
@@ -143,6 +167,12 @@ export function createReflexScheduler(
|
||||
for (const unsub of unsubscribers) {
|
||||
unsub();
|
||||
}
|
||||
for (const state of states.values()) {
|
||||
if (state.deferredTimer !== null) {
|
||||
clearTimeout(state.deferredTimer);
|
||||
state.deferredTimer = null;
|
||||
}
|
||||
}
|
||||
intervals.length = 0;
|
||||
unsubscribers.length = 0;
|
||||
}
|
||||
|
||||
@@ -2,8 +2,8 @@
|
||||
* In-memory signal bus for routing signals between sense workers and reflex subscribers.
|
||||
* Synchronous dispatch — no persistence, no async queuing.
|
||||
*
|
||||
* If a handler throws, remaining handlers still run; the first exception is re-thrown
|
||||
* after all handlers complete.
|
||||
* If a handler throws, the error is logged and remaining handlers still run.
|
||||
* The first error is re-thrown after all handlers complete so callers can observe it.
|
||||
*/
|
||||
|
||||
import type { Signal } from "@uncaged/nerve-core";
|
||||
@@ -27,6 +27,7 @@ export function createSignalBus(): SignalBus {
|
||||
try {
|
||||
handler(signal);
|
||||
} catch (e) {
|
||||
console.error("[signal-bus] handler error:", e);
|
||||
if (!hasError) {
|
||||
firstError = e;
|
||||
hasError = true;
|
||||
|
||||
Reference in New Issue
Block a user