feat(daemon): Signal Bus, Reflex Scheduler & Kernel (Phase 3)
- signal-bus: in-memory pub/sub for Signal dispatch, sync broadcast, subscriber error isolation - reflex-scheduler: interval + event-driven triggers, throttle enforcement, merge/coalesce (pending flag, no unbounded queue), workflow reflexes skipped - kernel: orchestrator tying workers, signal bus, and scheduler together, graceful shutdown - examples/nerve.yaml: cpu-usage (10s), disk-usage (30s), system-health (on: [cpu-usage, disk-usage]) - 20 new tests (45 total): signal bus (8) + reflex scheduler (12) Closes #4 小橘 🍊(NEKO Team)
This commit is contained in:
@@ -0,0 +1,40 @@
|
||||
# Example nerve.yaml demonstrating Signal Bus & Reflex Scheduler (Phase 3)
|
||||
#
|
||||
# Layout:
|
||||
# - cpu-usage: periodic every 10s, throttled to 5s minimum between computes
|
||||
# - disk-usage: periodic every 30s
|
||||
# - system-health: derived sense, triggered whenever cpu-usage OR disk-usage emits
|
||||
|
||||
senses:
|
||||
cpu-usage:
|
||||
group: system
|
||||
throttle: 5s
|
||||
timeout: 8s
|
||||
grace_period: null
|
||||
|
||||
disk-usage:
|
||||
group: system
|
||||
throttle: null
|
||||
timeout: 15s
|
||||
grace_period: null
|
||||
|
||||
system-health:
|
||||
group: derived
|
||||
throttle: 2s
|
||||
timeout: 10s
|
||||
grace_period: null
|
||||
|
||||
reflexes:
|
||||
# cpu-usage runs on a 10-second interval
|
||||
- sense: cpu-usage
|
||||
interval: 10s
|
||||
|
||||
# disk-usage runs on a 30-second interval
|
||||
- sense: disk-usage
|
||||
interval: 30s
|
||||
|
||||
# system-health is event-driven: fires whenever cpu-usage or disk-usage emits a signal
|
||||
- sense: system-health
|
||||
on:
|
||||
- cpu-usage
|
||||
- disk-usage
|
||||
@@ -0,0 +1,310 @@
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
|
||||
import type { NerveConfig, Signal } from "@uncaged/nerve-core";
|
||||
import { createReflexScheduler } from "../reflex-scheduler.js";
|
||||
import { createSignalBus } from "../signal-bus.js";
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Helpers
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
|
||||
return {
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
"disk-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
"system-health": { group: "derived", throttle: null, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [],
|
||||
workflows: null,
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
|
||||
function makeSignal(senseId: string, payload: unknown = 1): Signal {
|
||||
return { id: 1, senseId, payload, ts: Date.now() };
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Tests
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
describe("ReflexScheduler — interval reflex", () => {
|
||||
beforeEach(() => {
|
||||
vi.useFakeTimers();
|
||||
});
|
||||
afterEach(() => {
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
it("fires triggerFn on schedule", () => {
|
||||
const triggered: string[] = [];
|
||||
const config = makeConfig({
|
||||
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: 1000, on: null }],
|
||||
});
|
||||
const bus = createSignalBus();
|
||||
// Use a ref so the triggerFn can call back into the scheduler
|
||||
const ref: { scheduler: ReturnType<typeof createReflexScheduler> | null } = {
|
||||
scheduler: null,
|
||||
};
|
||||
const scheduler = createReflexScheduler(config, bus, (name) => {
|
||||
triggered.push(name);
|
||||
// Immediately complete compute so the scheduler is not blocked by in-flight state
|
||||
ref.scheduler?.onComputeComplete(name);
|
||||
});
|
||||
ref.scheduler = scheduler;
|
||||
|
||||
vi.advanceTimersByTime(3000);
|
||||
|
||||
expect(triggered.length).toBeGreaterThanOrEqual(3);
|
||||
expect(triggered.every((n) => n === "cpu-usage")).toBe(true);
|
||||
|
||||
scheduler.stop();
|
||||
});
|
||||
|
||||
it("stops firing after stop() is called", () => {
|
||||
const triggered: string[] = [];
|
||||
const config = makeConfig({
|
||||
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: 500, on: null }],
|
||||
});
|
||||
const bus = createSignalBus();
|
||||
const ref: { scheduler: ReturnType<typeof createReflexScheduler> | null } = {
|
||||
scheduler: null,
|
||||
};
|
||||
const scheduler = createReflexScheduler(config, bus, (name) => {
|
||||
triggered.push(name);
|
||||
ref.scheduler?.onComputeComplete(name);
|
||||
});
|
||||
ref.scheduler = scheduler;
|
||||
|
||||
vi.advanceTimersByTime(1000);
|
||||
const countBefore = triggered.length;
|
||||
scheduler.stop();
|
||||
vi.advanceTimersByTime(2000);
|
||||
|
||||
expect(triggered.length).toBe(countBefore);
|
||||
});
|
||||
|
||||
it("starts from current time — does not compensate for past intervals", () => {
|
||||
const triggered: string[] = [];
|
||||
const config = makeConfig({
|
||||
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: 1000, on: null }],
|
||||
});
|
||||
const bus = createSignalBus();
|
||||
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name));
|
||||
|
||||
// Only advance 500ms — should be 0 triggers (not catching up)
|
||||
vi.advanceTimersByTime(500);
|
||||
expect(triggered.length).toBe(0);
|
||||
|
||||
scheduler.stop();
|
||||
});
|
||||
});
|
||||
|
||||
describe("ReflexScheduler — event (on) reflex", () => {
|
||||
it("triggers target sense when watched sense emits a signal", () => {
|
||||
const triggered: string[] = [];
|
||||
const config = makeConfig({
|
||||
reflexes: [
|
||||
{ kind: "sense", sense: "system-health", interval: null, on: ["cpu-usage", "disk-usage"] },
|
||||
],
|
||||
});
|
||||
const bus = createSignalBus();
|
||||
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name));
|
||||
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
scheduler.onComputeComplete("system-health");
|
||||
bus.emit(makeSignal("disk-usage"));
|
||||
scheduler.onComputeComplete("system-health");
|
||||
|
||||
expect(triggered.length).toBe(2);
|
||||
expect(triggered.every((n) => n === "system-health")).toBe(true);
|
||||
|
||||
scheduler.stop();
|
||||
});
|
||||
|
||||
it("does not trigger for signals from non-watched senses", () => {
|
||||
const triggered: string[] = [];
|
||||
const config = makeConfig({
|
||||
reflexes: [{ kind: "sense", sense: "system-health", interval: null, on: ["cpu-usage"] }],
|
||||
});
|
||||
const bus = createSignalBus();
|
||||
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name));
|
||||
|
||||
bus.emit(makeSignal("disk-usage"));
|
||||
|
||||
expect(triggered.length).toBe(0);
|
||||
|
||||
scheduler.stop();
|
||||
});
|
||||
|
||||
it("stops responding after stop()", () => {
|
||||
const triggered: string[] = [];
|
||||
const config = makeConfig({
|
||||
reflexes: [{ kind: "sense", sense: "system-health", interval: null, on: ["cpu-usage"] }],
|
||||
});
|
||||
const bus = createSignalBus();
|
||||
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name));
|
||||
|
||||
scheduler.stop();
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
|
||||
expect(triggered.length).toBe(0);
|
||||
});
|
||||
});
|
||||
|
||||
describe("ReflexScheduler — throttle", () => {
|
||||
beforeEach(() => {
|
||||
vi.useFakeTimers();
|
||||
});
|
||||
afterEach(() => {
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
it("skips rapid triggers within throttle window", () => {
|
||||
const triggered: string[] = [];
|
||||
const config = makeConfig({
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: 2000, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: null, on: ["cpu-usage"] }],
|
||||
});
|
||||
const bus = createSignalBus();
|
||||
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name));
|
||||
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
scheduler.onComputeComplete("cpu-usage");
|
||||
|
||||
// Immediately trigger again — within throttle window
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
|
||||
expect(triggered.length).toBe(1);
|
||||
|
||||
scheduler.stop();
|
||||
});
|
||||
|
||||
it("allows trigger after throttle window has passed", () => {
|
||||
const triggered: string[] = [];
|
||||
const config = makeConfig({
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: 1000, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: null, on: ["cpu-usage"] }],
|
||||
});
|
||||
const bus = createSignalBus();
|
||||
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name));
|
||||
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
scheduler.onComputeComplete("cpu-usage");
|
||||
|
||||
vi.advanceTimersByTime(1500);
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
scheduler.onComputeComplete("cpu-usage");
|
||||
|
||||
expect(triggered.length).toBe(2);
|
||||
|
||||
scheduler.stop();
|
||||
});
|
||||
});
|
||||
|
||||
describe("ReflexScheduler — merge/coalesce", () => {
|
||||
it("concurrent triggers collapse to at most one pending run", () => {
|
||||
const triggered: string[] = [];
|
||||
const config = makeConfig({
|
||||
reflexes: [{ kind: "sense", sense: "system-health", interval: null, on: ["cpu-usage"] }],
|
||||
});
|
||||
const bus = createSignalBus();
|
||||
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name));
|
||||
|
||||
// First trigger starts compute
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
expect(triggered.length).toBe(1);
|
||||
|
||||
// Three more arrive while first is in-flight — all should coalesce to one pending
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
expect(triggered.length).toBe(1);
|
||||
|
||||
// Complete first compute → pending drains as exactly one more run
|
||||
scheduler.onComputeComplete("system-health");
|
||||
expect(triggered.length).toBe(2);
|
||||
|
||||
// Complete second compute → no more pending
|
||||
scheduler.onComputeComplete("system-health");
|
||||
expect(triggered.length).toBe(2);
|
||||
|
||||
scheduler.stop();
|
||||
});
|
||||
|
||||
it("no new trigger while in-flight without pending → no extra run after complete", () => {
|
||||
const triggered: string[] = [];
|
||||
const config = makeConfig({
|
||||
reflexes: [{ kind: "sense", sense: "system-health", interval: null, on: ["cpu-usage"] }],
|
||||
});
|
||||
const bus = createSignalBus();
|
||||
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name));
|
||||
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
expect(triggered.length).toBe(1);
|
||||
|
||||
// Complete with no pending
|
||||
scheduler.onComputeComplete("system-health");
|
||||
expect(triggered.length).toBe(1);
|
||||
|
||||
scheduler.stop();
|
||||
});
|
||||
});
|
||||
|
||||
describe("ReflexScheduler — interval + on combined", () => {
|
||||
beforeEach(() => {
|
||||
vi.useFakeTimers();
|
||||
});
|
||||
afterEach(() => {
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
it("fires both via interval and event", () => {
|
||||
const triggered: string[] = [];
|
||||
const config = makeConfig({
|
||||
reflexes: [{ kind: "sense", sense: "system-health", interval: 1000, on: ["cpu-usage"] }],
|
||||
});
|
||||
const bus = createSignalBus();
|
||||
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name));
|
||||
|
||||
// Event trigger
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
scheduler.onComputeComplete("system-health");
|
||||
|
||||
// Interval trigger
|
||||
vi.advanceTimersByTime(1000);
|
||||
scheduler.onComputeComplete("system-health");
|
||||
|
||||
expect(triggered.length).toBeGreaterThanOrEqual(2);
|
||||
|
||||
scheduler.stop();
|
||||
});
|
||||
});
|
||||
|
||||
describe("ReflexScheduler — workflow reflexes ignored", () => {
|
||||
it("does not set up any scheduling for workflow kind reflexes", () => {
|
||||
const triggered: string[] = [];
|
||||
const config: NerveConfig = {
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [{ kind: "workflow", workflow: "my-workflow", on: ["cpu-usage"] }],
|
||||
workflows: {
|
||||
"my-workflow": { concurrency: 1, overflow: "drop" },
|
||||
},
|
||||
};
|
||||
const bus = createSignalBus();
|
||||
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name));
|
||||
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
|
||||
expect(triggered.length).toBe(0);
|
||||
|
||||
scheduler.stop();
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,99 @@
|
||||
import { describe, expect, it, vi } from "vitest";
|
||||
|
||||
import type { Signal } from "@uncaged/nerve-core";
|
||||
import { createSignalBus } from "../signal-bus.js";
|
||||
|
||||
function makeSignal(senseId: string, payload: unknown = 1): Signal {
|
||||
return { id: 1, senseId, payload, ts: Date.now() };
|
||||
}
|
||||
|
||||
describe("createSignalBus", () => {
|
||||
it("delivers emitted signal to a subscriber", () => {
|
||||
const bus = createSignalBus();
|
||||
const received: Signal[] = [];
|
||||
bus.subscribe((s) => received.push(s));
|
||||
|
||||
const sig = makeSignal("cpu-usage", 42);
|
||||
bus.emit(sig);
|
||||
|
||||
expect(received).toHaveLength(1);
|
||||
expect(received[0]).toBe(sig);
|
||||
});
|
||||
|
||||
it("delivers to multiple subscribers", () => {
|
||||
const bus = createSignalBus();
|
||||
const a: Signal[] = [];
|
||||
const b: Signal[] = [];
|
||||
bus.subscribe((s) => a.push(s));
|
||||
bus.subscribe((s) => b.push(s));
|
||||
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
|
||||
expect(a).toHaveLength(1);
|
||||
expect(b).toHaveLength(1);
|
||||
});
|
||||
|
||||
it("unsubscribe stops delivery", () => {
|
||||
const bus = createSignalBus();
|
||||
const received: Signal[] = [];
|
||||
const unsub = bus.subscribe((s) => received.push(s));
|
||||
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
unsub();
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
|
||||
expect(received).toHaveLength(1);
|
||||
});
|
||||
|
||||
it("remaining subscribers still receive after one unsubscribes", () => {
|
||||
const bus = createSignalBus();
|
||||
const a: Signal[] = [];
|
||||
const b: Signal[] = [];
|
||||
const unsubA = bus.subscribe((s) => a.push(s));
|
||||
bus.subscribe((s) => b.push(s));
|
||||
|
||||
unsubA();
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
|
||||
expect(a).toHaveLength(0);
|
||||
expect(b).toHaveLength(1);
|
||||
});
|
||||
|
||||
it("emit with no subscribers does nothing", () => {
|
||||
const bus = createSignalBus();
|
||||
expect(() => bus.emit(makeSignal("cpu-usage"))).not.toThrow();
|
||||
});
|
||||
|
||||
it("dispatch is synchronous", () => {
|
||||
const bus = createSignalBus();
|
||||
const order: string[] = [];
|
||||
bus.subscribe(() => order.push("handler"));
|
||||
order.push("before");
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
order.push("after");
|
||||
expect(order).toEqual(["before", "handler", "after"]);
|
||||
});
|
||||
|
||||
it("handler exceptions don't prevent other handlers from running", () => {
|
||||
const bus = createSignalBus();
|
||||
const received: Signal[] = [];
|
||||
bus.subscribe(() => {
|
||||
throw new Error("boom");
|
||||
});
|
||||
bus.subscribe((s) => received.push(s));
|
||||
|
||||
expect(() => bus.emit(makeSignal("cpu-usage"))).toThrow("boom");
|
||||
expect(received).toHaveLength(1);
|
||||
});
|
||||
|
||||
it("same handler can be subscribed once and fires once per emit", () => {
|
||||
const bus = createSignalBus();
|
||||
const handler = vi.fn();
|
||||
bus.subscribe(handler);
|
||||
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
|
||||
expect(handler).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,142 @@
|
||||
/**
|
||||
* Kernel — the main orchestrator that ties sense workers, signal bus, and
|
||||
* reflex scheduler together.
|
||||
*
|
||||
* Responsibilities:
|
||||
* - Spawn one child process per sense group (via fork)
|
||||
* - Route SignalMessage from workers → SignalBus
|
||||
* - Route ErrorMessage from workers → stderr log
|
||||
* - Drive compute triggers via ReflexScheduler
|
||||
* - Graceful shutdown: stop scheduler, send shutdown to all workers
|
||||
*/
|
||||
|
||||
import { fork } from "node:child_process";
|
||||
import type { ChildProcess } from "node:child_process";
|
||||
import { dirname, join } from "node:path";
|
||||
import { fileURLToPath } from "node:url";
|
||||
|
||||
import type { NerveConfig, Signal } from "@uncaged/nerve-core";
|
||||
|
||||
import type { ComputeMessage, ShutdownMessage, WorkerToParentMessage } from "./ipc.js";
|
||||
import { createReflexScheduler } from "./reflex-scheduler.js";
|
||||
import type { ReflexScheduler } from "./reflex-scheduler.js";
|
||||
import { createSignalBus } from "./signal-bus.js";
|
||||
import type { SignalBus } from "./signal-bus.js";
|
||||
|
||||
export type Kernel = {
|
||||
stop: () => void;
|
||||
};
|
||||
|
||||
type WorkerEntry = {
|
||||
group: string;
|
||||
process: ChildProcess;
|
||||
};
|
||||
|
||||
let _signalIdCounter = 0;
|
||||
|
||||
function nextSignalId(): number {
|
||||
_signalIdCounter += 1;
|
||||
return _signalIdCounter;
|
||||
}
|
||||
|
||||
function resolveWorkerScript(): string {
|
||||
const __filename = fileURLToPath(import.meta.url);
|
||||
const __dir = dirname(__filename);
|
||||
return join(__dir, "sense-worker.js");
|
||||
}
|
||||
|
||||
function spawnWorker(nerveRoot: string, group: string, workerScript: string): ChildProcess {
|
||||
return fork(workerScript, ["--group", group, "--root", nerveRoot], {
|
||||
stdio: ["ignore", "inherit", "inherit", "ipc"],
|
||||
});
|
||||
}
|
||||
|
||||
function sendCompute(worker: ChildProcess, senseName: string): void {
|
||||
const msg: ComputeMessage = { type: "compute", sense: senseName };
|
||||
worker.send(msg);
|
||||
}
|
||||
|
||||
function sendShutdown(worker: ChildProcess): void {
|
||||
const msg: ShutdownMessage = { type: "shutdown" };
|
||||
worker.send(msg);
|
||||
}
|
||||
|
||||
function groupForSense(config: NerveConfig, senseName: string): string | null {
|
||||
const senseConfig = config.senses[senseName];
|
||||
if (senseConfig === undefined) return null;
|
||||
return senseConfig.group;
|
||||
}
|
||||
|
||||
export function createKernel(config: NerveConfig, nerveRoot: string): Kernel {
|
||||
const bus: SignalBus = createSignalBus();
|
||||
const workerScript = resolveWorkerScript();
|
||||
|
||||
const groups = new Set<string>();
|
||||
for (const senseConfig of Object.values(config.senses)) {
|
||||
groups.add(senseConfig.group);
|
||||
}
|
||||
|
||||
const workers = new Map<string, WorkerEntry>();
|
||||
|
||||
function triggerFn(senseName: string): void {
|
||||
const group = groupForSense(config, senseName);
|
||||
if (group === null) {
|
||||
process.stderr.write(`[kernel] triggerFn: unknown sense "${senseName}"\n`);
|
||||
return;
|
||||
}
|
||||
const entry = workers.get(group);
|
||||
if (entry === undefined) {
|
||||
process.stderr.write(`[kernel] triggerFn: no worker for group "${group}"\n`);
|
||||
return;
|
||||
}
|
||||
sendCompute(entry.process, senseName);
|
||||
}
|
||||
|
||||
const scheduler: ReflexScheduler = createReflexScheduler(config, bus, triggerFn);
|
||||
|
||||
for (const group of groups) {
|
||||
const child = spawnWorker(nerveRoot, group, workerScript);
|
||||
|
||||
child.on("message", (raw: unknown) => {
|
||||
const msg = raw as WorkerToParentMessage;
|
||||
|
||||
if (msg.type === "ready") {
|
||||
return;
|
||||
}
|
||||
|
||||
if (msg.type === "error") {
|
||||
process.stderr.write(`[kernel] sense "${msg.sense}" error: ${msg.error}\n`);
|
||||
scheduler.onComputeComplete(msg.sense);
|
||||
return;
|
||||
}
|
||||
|
||||
if (msg.type === "signal") {
|
||||
const signal: Signal = {
|
||||
id: nextSignalId(),
|
||||
senseId: msg.sense,
|
||||
payload: msg.payload,
|
||||
ts: Date.now(),
|
||||
};
|
||||
bus.emit(signal);
|
||||
scheduler.onComputeComplete(msg.sense);
|
||||
}
|
||||
});
|
||||
|
||||
child.on("exit", (code) => {
|
||||
process.stderr.write(
|
||||
`[kernel] worker for group "${group}" exited with code ${code ?? "null"}\n`,
|
||||
);
|
||||
});
|
||||
|
||||
workers.set(group, { group, process: child });
|
||||
}
|
||||
|
||||
function stop(): void {
|
||||
scheduler.stop();
|
||||
for (const entry of workers.values()) {
|
||||
sendShutdown(entry.process);
|
||||
}
|
||||
}
|
||||
|
||||
return { stop };
|
||||
}
|
||||
@@ -0,0 +1,151 @@
|
||||
/**
|
||||
* Reflex Scheduler — drives sense compute cycles based on ReflexConfig.
|
||||
*
|
||||
* Supports:
|
||||
* - interval: periodic setInterval-based triggering
|
||||
* - on[]: event-driven triggering via the signal bus
|
||||
* - throttle: skip triggers that arrive too soon after the last compute
|
||||
* - merge/coalesce: if compute is in-flight, record one pending trigger;
|
||||
* run it once after the current compute completes (no unbounded queue)
|
||||
*/
|
||||
|
||||
import type { NerveConfig, SenseReflexConfig } from "@uncaged/nerve-core";
|
||||
import type { SignalBus, Unsubscribe } from "./signal-bus.js";
|
||||
|
||||
/** Sends a compute message to the worker responsible for the given sense. */
|
||||
export type TriggerFn = (senseName: string) => void;
|
||||
|
||||
/** Per-sense mutable state tracked by the scheduler. */
|
||||
type SenseState = {
|
||||
lastComputeAt: number;
|
||||
inFlight: boolean;
|
||||
pending: boolean;
|
||||
};
|
||||
|
||||
/** Handle returned by createReflexScheduler — call stop() for graceful shutdown. */
|
||||
export type ReflexScheduler = {
|
||||
/** Notify scheduler that a compute cycle finished. Drains the pending flag. */
|
||||
onComputeComplete: (senseName: string) => void;
|
||||
stop: () => void;
|
||||
};
|
||||
|
||||
function makeSenseState(): SenseState {
|
||||
return { lastComputeAt: 0, inFlight: false, pending: false };
|
||||
}
|
||||
|
||||
/**
|
||||
* Create and start a reflex scheduler.
|
||||
*
|
||||
* @param config Full NerveConfig (reads senses for throttle/timeout, reflexes for schedule).
|
||||
* @param bus SignalBus to subscribe for event-driven reflexes.
|
||||
* @param triggerFn Called with the sense name when a compute should be dispatched.
|
||||
* @returns ReflexScheduler with stop() and onComputeComplete() methods.
|
||||
*/
|
||||
export function createReflexScheduler(
|
||||
config: NerveConfig,
|
||||
bus: SignalBus,
|
||||
triggerFn: TriggerFn,
|
||||
): ReflexScheduler {
|
||||
const intervals: ReturnType<typeof setInterval>[] = [];
|
||||
const unsubscribers: Unsubscribe[] = [];
|
||||
const states = new Map<string, SenseState>();
|
||||
|
||||
function getState(senseName: string): SenseState {
|
||||
let state = states.get(senseName);
|
||||
if (state === undefined) {
|
||||
state = makeSenseState();
|
||||
states.set(senseName, state);
|
||||
}
|
||||
return state;
|
||||
}
|
||||
|
||||
function dispatchCompute(senseName: string): void {
|
||||
const state = getState(senseName);
|
||||
state.inFlight = true;
|
||||
state.pending = false;
|
||||
state.lastComputeAt = Date.now();
|
||||
triggerFn(senseName);
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempt to trigger compute for a sense.
|
||||
* Respects throttle window and merge/coalesce semantics.
|
||||
*/
|
||||
function maybeTrigger(senseName: string): void {
|
||||
const senseConfig = config.senses[senseName];
|
||||
const throttleMs = senseConfig?.throttle ?? null;
|
||||
const state = getState(senseName);
|
||||
const now = Date.now();
|
||||
|
||||
if (throttleMs !== null && now - state.lastComputeAt < throttleMs) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (state.inFlight) {
|
||||
state.pending = true;
|
||||
return;
|
||||
}
|
||||
|
||||
dispatchCompute(senseName);
|
||||
}
|
||||
|
||||
/**
|
||||
* Called by the kernel when a compute cycle completes (signal or error received).
|
||||
* Drains the pending flag if set.
|
||||
*/
|
||||
function onComputeComplete(senseName: string): void {
|
||||
const state = states.get(senseName);
|
||||
if (state === undefined) return;
|
||||
|
||||
state.inFlight = false;
|
||||
|
||||
if (!state.pending) return;
|
||||
|
||||
const senseConfig = config.senses[senseName];
|
||||
const throttleMs = senseConfig?.throttle ?? null;
|
||||
const now = Date.now();
|
||||
|
||||
if (throttleMs !== null && now - state.lastComputeAt < throttleMs) {
|
||||
state.pending = false;
|
||||
return;
|
||||
}
|
||||
|
||||
dispatchCompute(senseName);
|
||||
}
|
||||
|
||||
for (const reflex of config.reflexes) {
|
||||
if (reflex.kind !== "sense") continue;
|
||||
const senseReflex = reflex as SenseReflexConfig;
|
||||
const senseName = senseReflex.sense;
|
||||
|
||||
if (senseReflex.interval !== null) {
|
||||
const id = setInterval(() => {
|
||||
maybeTrigger(senseName);
|
||||
}, senseReflex.interval);
|
||||
intervals.push(id);
|
||||
}
|
||||
|
||||
if (senseReflex.on !== null && senseReflex.on.length > 0) {
|
||||
const watchedSenses = new Set(senseReflex.on);
|
||||
const unsub = bus.subscribe((signal) => {
|
||||
if (watchedSenses.has(signal.senseId)) {
|
||||
maybeTrigger(senseName);
|
||||
}
|
||||
});
|
||||
unsubscribers.push(unsub);
|
||||
}
|
||||
}
|
||||
|
||||
function stop(): void {
|
||||
for (const id of intervals) {
|
||||
clearInterval(id);
|
||||
}
|
||||
for (const unsub of unsubscribers) {
|
||||
unsub();
|
||||
}
|
||||
intervals.length = 0;
|
||||
unsubscribers.length = 0;
|
||||
}
|
||||
|
||||
return { onComputeComplete, stop };
|
||||
}
|
||||
@@ -0,0 +1,50 @@
|
||||
/**
|
||||
* In-memory signal bus for routing signals between sense workers and reflex subscribers.
|
||||
* Synchronous dispatch — no persistence, no async queuing.
|
||||
*
|
||||
* If a handler throws, remaining handlers still run; the first exception is re-thrown
|
||||
* after all handlers complete.
|
||||
*/
|
||||
|
||||
import type { Signal } from "@uncaged/nerve-core";
|
||||
|
||||
export type SignalHandler = (signal: Signal) => void;
|
||||
export type Unsubscribe = () => void;
|
||||
|
||||
export type SignalBus = {
|
||||
emit: (signal: Signal) => void;
|
||||
subscribe: (handler: SignalHandler) => Unsubscribe;
|
||||
};
|
||||
|
||||
export function createSignalBus(): SignalBus {
|
||||
const handlers = new Set<SignalHandler>();
|
||||
|
||||
function emit(signal: Signal): void {
|
||||
let firstError: unknown = null;
|
||||
let hasError = false;
|
||||
|
||||
for (const handler of handlers) {
|
||||
try {
|
||||
handler(signal);
|
||||
} catch (e) {
|
||||
if (!hasError) {
|
||||
firstError = e;
|
||||
hasError = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (hasError) {
|
||||
throw firstError;
|
||||
}
|
||||
}
|
||||
|
||||
function subscribe(handler: SignalHandler): Unsubscribe {
|
||||
handlers.add(handler);
|
||||
return () => {
|
||||
handlers.delete(handler);
|
||||
};
|
||||
}
|
||||
|
||||
return { emit, subscribe };
|
||||
}
|
||||
Reference in New Issue
Block a user