Merge pull request 'refactor: inline reflex config — sense-level trigger declarations' (#198) from refactor/189-inline-reflex-config into main

This commit was merged in pull request #198.
This commit is contained in:
2026-04-27 10:58:47 +00:00
24 changed files with 532 additions and 138 deletions
+2 -4
View File
@@ -65,8 +65,6 @@ const nerveYamlTemplate = `senses:
counter:
group: e2e
reflexes: []
workflows:
echo:
concurrency: 1
@@ -110,8 +108,6 @@ const nerveYamlWithNoopWorkflow = `senses:
counter:
group: e2e
reflexes: []
workflows:
noop:
concurrency: 1
@@ -187,6 +183,8 @@ function defaultTestConfig(withNoopWorkflow: boolean): NerveConfig {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: [],
},
},
reflexes: [],
@@ -116,8 +116,6 @@ const VALID_NERVE_YAML = `senses:
counter:
group: e2e
reflexes: []
workflows: {}
max_rounds: 10
@@ -45,8 +45,6 @@ senses:
group: system
throttle: 5s
timeout: 3s
reflexes:
- sense: cpu-usage
interval: 5s
`.trim();
}
@@ -121,7 +119,7 @@ reflexes:
rmSync(sockDir, { recursive: true, force: true });
});
it("prints sense list from daemon path with name, group, throttle, triggers, and last signal time", async () => {
it("prints sense list from daemon path with name, group, throttle, trigger schedule, and last signal time", async () => {
// With a real daemon, we would wait for a compute cycle; the mock server
// returns SenseInfo as if one already produced lastSignalTimestamp.
await runCommand(senseCommand, { rawArgs: ["list"] });
@@ -134,7 +132,7 @@ reflexes:
expect(out).toContain("group: system");
expect(out).toContain("throttle: 5s");
expect(out).toContain("timeout: 3s");
expect(out).toContain("triggers: every 5s");
expect(out).toContain("trigger schedule: every 5s");
expect(out).not.toContain("(never)");
expect(out).toContain(new Date(LAST_SIGNAL_TS).toISOString());
});
+24 -62
View File
@@ -14,63 +14,7 @@ import { tmpdir } from "node:os";
import { join } from "node:path";
import type { SenseInfo } from "@uncaged/nerve-core";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
function intervalLabelFromReflexRecord(rec: Record<string, unknown>): string | null {
if (typeof rec.interval !== "number") return null;
const totalSeconds = Math.floor(rec.interval / 1000);
if (totalSeconds < 60) return `every ${totalSeconds}s`;
return `every ${Math.floor(totalSeconds / 60)}m`;
}
function onTriggerPartFromReflexRecord(rec: Record<string, unknown>): string | null {
if (!Array.isArray(rec.on) || rec.on.length === 0) return null;
const onLabels = rec.on.filter((v): v is string => typeof v === "string");
if (onLabels.length === 0) return null;
return `on: ${onLabels.join(", ")}`;
}
/** Returns a display line when this reflex targets `senseName`; otherwise null (skip). */
function reflexTriggerLineForSense(rec: Record<string, unknown>, senseName: string): string | null {
if (rec.kind !== "sense" || rec.sense !== senseName) return null;
const parts: string[] = [];
const intervalPart = intervalLabelFromReflexRecord(rec);
if (intervalPart !== null) parts.push(intervalPart);
const onPart = onTriggerPartFromReflexRecord(rec);
if (onPart !== null) parts.push(onPart);
return parts.length > 0 ? parts.join(" · ") : "reflex (no interval or on)";
}
function mockSenseTriggerLabels(senseName: string, reflexes: readonly unknown[]): string[] {
const out: string[] = [];
for (const reflex of reflexes) {
if (typeof reflex !== "object" || reflex === null) continue;
const rec = reflex as Record<string, unknown>;
const line = reflexTriggerLineForSense(rec, senseName);
if (line !== null) out.push(line);
}
return out;
}
vi.mock("@uncaged/nerve-core", async () => {
const actual = await vi.importActual<typeof import("@uncaged/nerve-core")>("@uncaged/nerve-core");
return {
...actual,
senseTriggerLabels: mockSenseTriggerLabels,
isSenseInfo: (value: unknown): value is SenseInfo => {
if (typeof value !== "object" || value === null) return false;
const rec = value as Record<string, unknown>;
return (
typeof rec.name === "string" &&
typeof rec.group === "string" &&
(typeof rec.throttle === "number" || rec.throttle === null) &&
(typeof rec.timeout === "number" || rec.timeout === null) &&
Array.isArray(rec.triggers) &&
(typeof rec.lastSignalTimestamp === "number" || rec.lastSignalTimestamp === null)
);
},
};
});
import { afterEach, beforeEach, describe, expect, it } from "vitest";
import { formatDuration, formatSenseList, sensesFromConfig } from "../commands/sense.js";
import { listSensesViaDaemon } from "../daemon-client.js";
@@ -171,9 +115,9 @@ describe("formatSenseList", () => {
expect(output).toContain("—");
});
it("shows triggers from sense metadata", () => {
it("shows trigger schedule from sense metadata", () => {
const output = formatSenseList(SAMPLE_SENSES);
expect(output).toContain("triggers:");
expect(output).toContain("trigger schedule:");
expect(output).toContain("every 30s");
expect(output).toContain("(none)");
});
@@ -230,7 +174,6 @@ senses:
disk-usage:
group: system
throttle: 30s
reflexes: []
`.trim(),
);
const result = sensesFromConfig(path);
@@ -255,7 +198,6 @@ reflexes: []
senses:
my-sense:
group: default
reflexes: []
`.trim(),
);
const result = sensesFromConfig(path);
@@ -272,13 +214,33 @@ senses:
group: default
throttle: 10s
timeout: 5s
reflexes: []
`.trim(),
);
const result = sensesFromConfig(path);
expect(result[0].throttle).toBe(10000);
expect(result[0].timeout).toBe(5000);
});
it("uses inline interval and on for trigger schedule labels", () => {
const path = join(tmpDir, "nerve.yaml");
writeFileSync(
path,
`
senses:
downstream:
group: default
interval: 15s
on: [upstream]
upstream:
group: default
`.trim(),
);
const result = sensesFromConfig(path);
const downstream = result.find((s) => s.name === "downstream");
const upstream = result.find((s) => s.name === "upstream");
expect(downstream?.triggers).toEqual(["every 15s · on: upstream"]);
expect(upstream?.triggers).toEqual([]);
});
});
// ---------------------------------------------------------------------------
-4
View File
@@ -14,10 +14,6 @@ senses:
throttle: 5s
timeout: 10s
grace_period: null
reflexes:
- kind: sense
sense: cpu-usage
interval: 10s
`;
+5 -3
View File
@@ -6,7 +6,7 @@ import {
type SenseInfo,
isPlainRecord,
parseNerveConfig,
senseTriggerLabels,
senseTriggerLabelsWithFallback,
} from "@uncaged/nerve-core";
import { defineCommand } from "citty";
@@ -49,7 +49,9 @@ export function formatSenseList(senses: SenseInfo[]): string {
lines.push(` group: ${s.group}\n`);
lines.push(` throttle: ${formatDuration(s.throttle)}\n`);
lines.push(` timeout: ${formatDuration(s.timeout)}\n`);
lines.push(` triggers: ${s.triggers.length > 0 ? s.triggers.join("; ") : "(none)"}\n`);
lines.push(
` trigger schedule: ${s.triggers.length > 0 ? s.triggers.join("; ") : "(none)"}\n`,
);
const lastSignal =
s.lastSignalTimestamp !== null ? new Date(s.lastSignalTimestamp).toISOString() : "(never)";
lines.push(` last signal: ${lastSignal}\n`);
@@ -73,7 +75,7 @@ export function sensesFromConfig(configPath: string): SenseInfo[] {
group: cfg.group,
throttle: cfg.throttle,
timeout: cfg.timeout,
triggers: senseTriggerLabels(name, reflexes),
triggers: senseTriggerLabelsWithFallback(name, senses, reflexes),
lastSignalTimestamp: null,
}));
}
+2 -2
View File
@@ -30,11 +30,11 @@ export const validateCommand = defineCommand({
const config = result.value;
const senseCount = Object.keys(config.senses).length;
const reflexCount = config.reflexes.length;
const triggerScheduleCount = config.reflexes.length;
const workflowCount = Object.keys(config.workflows).length;
process.stdout.write(
`✅ nerve.yaml is valid — ${senseCount} sense(s), ${reflexCount} reflex(es), ${workflowCount} workflow(s)\n`,
`✅ nerve.yaml is valid — ${senseCount} sense(s), ${triggerScheduleCount} sense trigger schedule(s), ${workflowCount} workflow(s)\n`,
);
},
});
+70 -3
View File
@@ -1,7 +1,11 @@
import { describe, expect, it } from "vitest";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import { parseNerveConfig } from "../parse-nerve-config.js";
function testConsole(): { warn: (...args: unknown[]) => void } {
return (globalThis as unknown as { console: { warn: (...args: unknown[]) => void } }).console;
}
const VALID_CONFIG = `
senses:
cpu:
@@ -27,11 +31,21 @@ workflows:
`;
describe("parseNerveConfig", () => {
beforeEach(() => {
vi.spyOn(testConsole(), "warn").mockImplementation(() => {});
});
afterEach(() => {
vi.restoreAllMocks();
});
describe("valid configs", () => {
it("parses a full valid config", () => {
const result = parseNerveConfig(VALID_CONFIG);
expect(result.ok).toBe(true);
if (!result.ok) return;
expect(testConsole().warn).toHaveBeenCalledWith(
expect.stringMatching(/top-level `reflexes`.*deprecated/i),
);
expect(result.value.senses.cpu).toEqual({
group: "system",
@@ -39,6 +53,8 @@ describe("parseNerveConfig", () => {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: 30_000,
on: [],
});
expect(result.value.senses.memory).toEqual({
group: "system",
@@ -46,6 +62,8 @@ describe("parseNerveConfig", () => {
timeout: 10_000,
gracePeriod: 3000,
retention: 10_000,
interval: null,
on: ["high_usage"],
});
expect(result.value.reflexes).toHaveLength(2);
expect(result.value.reflexes[0]).toEqual({
@@ -79,6 +97,7 @@ reflexes: []
expect(result.ok).toBe(true);
if (!result.ok) return;
expect(result.value.reflexes).toEqual([]);
expect(testConsole().warn).not.toHaveBeenCalled();
});
it("parses config without workflows section", () => {
@@ -112,6 +131,8 @@ reflexes: []
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: [],
});
});
@@ -456,16 +477,62 @@ reflexes: []
expect(result.error.message).toMatch(/senses/);
});
it("returns error when reflexes is missing", () => {
it("accepts config without reflexes key (inline triggers only)", () => {
const yaml = `
senses:
cpu:
group: system
interval: 5s
on:
- memory
memory:
group: system
`;
const result = parseNerveConfig(yaml);
expect(result.ok).toBe(true);
if (!result.ok) return;
expect(result.value.senses.cpu.interval).toBe(5000);
expect(result.value.senses.cpu.on).toEqual(["memory"]);
expect(result.value.reflexes).toEqual([
{ kind: "sense", sense: "cpu", interval: 5000, on: ["memory"] },
]);
});
it("returns error when legacy reflex interval conflicts with sense interval", () => {
const yaml = `
senses:
cpu:
group: system
interval: 10s
reflexes:
- sense: cpu
interval: 5s
`;
const result = parseNerveConfig(yaml);
expect(result.ok).toBe(false);
if (result.ok) return;
expect(result.error.message).toMatch(/reflexes/);
expect(result.error.message).toMatch(/conflicting interval/);
});
it("merges legacy reflex on[] into sense on and dedupes", () => {
const yaml = `
senses:
cpu:
group: system
on:
- a
reflexes:
- sense: cpu
interval: 1s
on:
- a
- b
`;
const result = parseNerveConfig(yaml);
expect(result.ok).toBe(true);
if (!result.ok) return;
expect(result.value.senses.cpu.on).toEqual(["a", "b"]);
expect(result.value.senses.cpu.interval).toBe(1000);
});
it("returns error for invalid group name", () => {
+4
View File
@@ -8,6 +8,10 @@ export type SenseConfig = {
gracePeriod: number | null;
/** Max rows to retain in `_signals`; older rows are pruned periodically after inserts. */
retention: number;
/** Polling interval (ms). When set, the sense is triggered periodically. Replaces reflex interval. */
interval: number | null;
/** Other sense names whose signals trigger this sense. Replaces reflex `on` field. */
on: string[];
};
export type SenseReflexConfig = {
+5 -1
View File
@@ -10,7 +10,11 @@ export type {
NerveConfig,
} from "./config.js";
export type { Signal, SenseInfo, SenseResult } from "./sense.js";
export { labelSenseReflexTrigger, senseTriggerLabels } from "./sense-trigger-labels.js";
export {
labelSenseReflexTrigger,
senseTriggerLabels,
senseTriggerLabelsWithFallback,
} from "./sense-trigger-labels.js";
export type {
WorkflowMessage,
RoleResult,
+118 -22
View File
@@ -88,15 +88,127 @@ function validateSenseConfig(name: string, raw: unknown): Result<SenseConfig> {
const retentionResult = parseRetentionField(name, obj.retention);
if (!retentionResult.ok) return retentionResult;
const intervalResult = parseDurationField(obj.interval, `senses.${name}.interval`);
if (!intervalResult.ok) return intervalResult;
let on: string[] = [];
if (obj.on !== undefined && obj.on !== null) {
if (
!Array.isArray(obj.on) ||
!obj.on.every((item: unknown): item is string => typeof item === "string")
) {
return err(new Error(`senses.${name}.on: must be an array of strings`));
}
on = obj.on;
}
return ok({
group: obj.group,
throttle: throttleResult.value,
timeout: timeoutResult.value,
gracePeriod: graceResult.value,
retention: retentionResult.value,
interval: intervalResult.value,
on,
});
}
const LEGACY_REFLEXES_DEPRECATION =
"[nerve] Deprecated: top-level `reflexes` in nerve.yaml is deprecated. Move each reflex's `interval` and `on` fields onto the corresponding sense under `senses.<name>`.";
/** Uses `globalThis` so declaration emit works without DOM / Node `lib` typings for `console`. */
function warnToConsole(message: string): void {
const c = (globalThis as unknown as { console: { warn: (m: string) => void } }).console;
c.warn(message);
}
function mergeOnLists(existing: readonly string[], extra: readonly string[]): string[] {
const seen = new Set(existing);
const out = [...existing];
for (const name of extra) {
if (seen.has(name)) continue;
seen.add(name);
out.push(name);
}
return out;
}
function mergeSenseIntervalFromLegacyReflex(
senseName: string,
senseInterval: number | null,
reflexInterval: number | null,
reflexIndex: number,
): Result<number | null> {
if (reflexInterval === null) return ok(senseInterval);
if (senseInterval === null) return ok(reflexInterval);
if (senseInterval === reflexInterval) return ok(senseInterval);
return err(
new Error(
`reflexes[${reflexIndex}].sense "${senseName}": conflicting interval (sense has ${senseInterval} ms, reflex has ${reflexInterval} ms); use a single interval source`,
),
);
}
function buildReflexesFromSenses(senses: Record<string, SenseConfig>): ReflexConfig[] {
const reflexes: ReflexConfig[] = [];
for (const [senseName, sc] of Object.entries(senses)) {
if (sc.interval !== null || sc.on.length > 0) {
reflexes.push({
kind: "sense" as const,
sense: senseName,
interval: sc.interval,
on: sc.on,
});
}
}
return reflexes;
}
/**
* If `reflexes` is present and non-empty, parse legacy entries and merge `interval` / `on`
* into the corresponding sense. Emits a deprecation warning once when any legacy row exists.
*/
function mergeLegacyReflexesIntoSenses(
obj: Record<string, unknown>,
senses: Record<string, SenseConfig>,
senseNames: Set<string>,
): Result<Record<string, SenseConfig>> {
if (!Object.hasOwn(obj, "reflexes") || obj.reflexes === undefined || obj.reflexes === null) {
return ok(senses);
}
if (!Array.isArray(obj.reflexes)) {
return err(new Error("reflexes: must be an array if provided"));
}
if (obj.reflexes.length > 0) {
warnToConsole(LEGACY_REFLEXES_DEPRECATION);
}
let merged: Record<string, SenseConfig> = senses;
for (let i = 0; i < obj.reflexes.length; i++) {
const reflexResult = validateReflexConfig(i, obj.reflexes[i], senseNames);
if (!reflexResult.ok) return reflexResult;
const ref = reflexResult.value;
const senseName = ref.sense;
const sense = merged[senseName];
const intervalResult = mergeSenseIntervalFromLegacyReflex(
senseName,
sense.interval,
ref.interval,
i,
);
if (!intervalResult.ok) return intervalResult;
const next: SenseConfig = {
...sense,
interval: intervalResult.value,
on: mergeOnLists(sense.on, ref.on),
};
merged = { ...merged, [senseName]: next };
}
return ok(merged);
}
function parseOnField(index: number, obj: Record<string, unknown>): Result<string[]> {
if (obj.on === undefined || obj.on === null) return ok([]);
if (!Array.isArray(obj.on) || !obj.on.every((item): item is string => typeof item === "string")) {
@@ -248,24 +360,6 @@ function parseSenses(
return ok({ senses, senseNames });
}
function parseReflexes(
obj: Record<string, unknown>,
senseNames: Set<string>,
): Result<ReflexConfig[]> {
if (!Array.isArray(obj.reflexes)) {
return err(new Error("reflexes: required array"));
}
const reflexes: ReflexConfig[] = [];
for (let i = 0; i < obj.reflexes.length; i++) {
const result = validateReflexConfig(i, obj.reflexes[i], senseNames);
if (!result.ok) return result;
reflexes.push(result.value);
}
return ok(reflexes);
}
const DEFAULT_API_BIND_HOST = "127.0.0.1";
/** Hosts that may bind the HTTP API without `api.token` (loopback-only). */
@@ -373,8 +467,10 @@ export function parseNerveConfig(raw: string): Result<NerveConfig> {
if (!sensesResult.ok) return sensesResult;
const { senses, senseNames } = sensesResult.value;
const reflexesResult = parseReflexes(obj, senseNames);
if (!reflexesResult.ok) return reflexesResult;
const mergedSensesResult = mergeLegacyReflexesIntoSenses(obj, senses, senseNames);
if (!mergedSensesResult.ok) return mergedSensesResult;
const mergedSenses = mergedSensesResult.value;
const reflexes = buildReflexesFromSenses(mergedSenses);
const workflowsResult = parseWorkflows(obj);
if (!workflowsResult.ok) return workflowsResult;
@@ -387,8 +483,8 @@ export function parseNerveConfig(raw: string): Result<NerveConfig> {
return ok({
maxRounds: maxRoundsResult.value,
senses,
reflexes: reflexesResult.value,
senses: mergedSenses,
reflexes,
workflows: workflowsResult.value,
api: apiResult.value,
});
+24 -1
View File
@@ -1,4 +1,4 @@
import type { ReflexConfig } from "./config.js";
import type { ReflexConfig, SenseConfig } from "./config.js";
function formatIntervalMs(ms: number): string {
const totalSeconds = Math.floor(ms / 1000);
@@ -34,3 +34,26 @@ export function senseTriggerLabels(senseName: string, reflexes: readonly ReflexC
}
return out;
}
/**
* Human-readable trigger labels for a sense: prefers inline `interval` / `on` on
* `SenseConfig`, otherwise falls back to legacy `config.reflexes` rows.
*/
export function senseTriggerLabelsWithFallback(
senseName: string,
senses: Record<string, SenseConfig>,
reflexes: readonly ReflexConfig[],
): string[] {
const sc = senses[senseName];
if (sc !== undefined && (sc.interval !== null || sc.on.length > 0)) {
return [
labelSenseReflexTrigger({
kind: "sense",
sense: senseName,
interval: sc.interval,
on: sc.on,
}),
];
}
return senseTriggerLabels(senseName, reflexes);
}
@@ -31,6 +31,8 @@ function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: [],
},
},
reflexes: [],
@@ -87,6 +89,8 @@ describe("kernel integration — real child processes", () => {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: [],
},
"disk-io": {
group: "system",
@@ -94,6 +98,8 @@ describe("kernel integration — real child processes", () => {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: [],
},
"net-rx": {
group: "network",
@@ -101,6 +107,8 @@ describe("kernel integration — real child processes", () => {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: [],
},
},
});
@@ -150,6 +158,8 @@ describe("kernel integration — real child processes", () => {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: [],
},
"net-rx": {
group: "network",
@@ -157,6 +167,8 @@ describe("kernel integration — real child processes", () => {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: [],
},
},
});
@@ -79,6 +79,8 @@ function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: [],
},
},
reflexes: [],
@@ -116,6 +118,8 @@ describe("kernel — getHealth", () => {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: [],
},
"disk-usage": {
group: "system",
@@ -123,6 +127,8 @@ describe("kernel — getHealth", () => {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: [],
},
"net-rx": {
group: "network",
@@ -130,6 +136,8 @@ describe("kernel — getHealth", () => {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: [],
},
},
});
@@ -222,6 +230,8 @@ describe("kernel — reloadConfig", () => {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: [],
},
"net-rx": {
group: "network",
@@ -229,6 +239,8 @@ describe("kernel — reloadConfig", () => {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: [],
},
},
reflexes: [],
@@ -252,6 +264,8 @@ describe("kernel — reloadConfig", () => {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: [],
},
"net-rx": {
group: "network",
@@ -259,6 +273,8 @@ describe("kernel — reloadConfig", () => {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: [],
},
},
reflexes: [],
@@ -281,6 +297,8 @@ describe("kernel — reloadConfig", () => {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: [],
},
},
reflexes: [],
@@ -310,6 +328,8 @@ describe("kernel — reloadConfig", () => {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: [],
},
"disk-usage": {
group: "system",
@@ -317,6 +337,8 @@ describe("kernel — reloadConfig", () => {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: [],
},
},
reflexes: [],
@@ -98,6 +98,8 @@ function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: [],
},
},
reflexes: [],
@@ -147,6 +149,8 @@ describe("kernel.triggerSense()", () => {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: [],
},
"net-io": {
group: "network",
@@ -154,6 +158,8 @@ describe("kernel.triggerSense()", () => {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: [],
},
},
reflexes: [],
@@ -189,6 +195,8 @@ describe("kernel.triggerSense()", () => {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: [],
},
"disk-usage": {
group: "system",
@@ -196,6 +204,8 @@ describe("kernel.triggerSense()", () => {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: [],
},
},
reflexes: [],
@@ -101,6 +101,8 @@ function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: [],
},
},
reflexes: [],
@@ -141,6 +143,8 @@ describe("kernel + workflowManager integration", () => {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: [],
},
},
reflexes: [],
@@ -191,6 +195,8 @@ describe("kernel + workflowManager integration", () => {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: [],
},
},
reflexes: [],
@@ -246,6 +252,8 @@ describe("kernel + workflowManager integration", () => {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: [],
},
"disk-io": {
group: "system",
@@ -253,6 +261,8 @@ describe("kernel + workflowManager integration", () => {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: [],
},
},
reflexes: [],
@@ -302,6 +312,8 @@ describe("kernel + workflowManager integration", () => {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: [],
},
},
reflexes: [],
@@ -345,6 +357,8 @@ describe("kernel + workflowManager integration", () => {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: [],
},
},
reflexes: [],
@@ -366,6 +380,8 @@ describe("kernel + workflowManager integration", () => {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: [],
},
},
reflexes: [],
@@ -412,6 +428,8 @@ describe("kernel + workflowManager integration", () => {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: [],
},
},
reflexes: [],
@@ -432,6 +450,8 @@ describe("kernel + workflowManager integration", () => {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: [],
},
},
reflexes: [],
@@ -484,6 +504,8 @@ describe("kernel + workflowManager integration", () => {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: [],
},
},
reflexes: [],
@@ -539,6 +561,8 @@ describe("kernel + workflowManager integration", () => {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: [],
},
},
reflexes: [],
@@ -62,6 +62,8 @@ function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: [],
},
},
reflexes: [],
@@ -99,6 +101,8 @@ describe("kernel — message routing", () => {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: [],
},
},
reflexes: [],
@@ -127,6 +131,8 @@ describe("kernel — message routing", () => {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: [],
},
},
reflexes: [],
@@ -154,6 +160,8 @@ describe("kernel — message routing", () => {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: [],
},
},
reflexes: [],
@@ -181,6 +189,8 @@ describe("kernel — message routing", () => {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: [],
},
},
reflexes: [],
@@ -207,6 +217,8 @@ describe("kernel — message routing", () => {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: [],
},
},
reflexes: [],
@@ -246,6 +258,8 @@ describe("kernel — groupForSense mapping", () => {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: [],
},
"disk-usage": {
group: "system",
@@ -253,6 +267,8 @@ describe("kernel — groupForSense mapping", () => {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: [],
},
"net-usage": {
group: "network",
@@ -260,6 +276,8 @@ describe("kernel — groupForSense mapping", () => {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: [],
},
},
reflexes: [],
@@ -283,6 +301,8 @@ describe("kernel — groupForSense mapping", () => {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: [],
},
},
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: 500, on: [] }],
@@ -32,6 +32,8 @@ describe("LogStore + ReflexScheduler integration", () => {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: [],
},
},
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: null, on: ["cpu-usage"] }],
@@ -67,6 +69,8 @@ describe("LogStore + ReflexScheduler integration", () => {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: [],
},
},
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: 1000, on: [] }],
@@ -105,6 +109,8 @@ describe("LogStore + ReflexScheduler integration", () => {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: [],
},
},
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: null, on: ["cpu-usage"] }],
@@ -28,6 +28,8 @@ function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: [],
},
},
reflexes: [],
@@ -159,6 +161,8 @@ describe("phase6 — reloadConfig", () => {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: [],
},
"net-rx": {
group: "network",
@@ -166,6 +170,8 @@ describe("phase6 — reloadConfig", () => {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: [],
},
},
reflexes: [],
@@ -192,6 +198,8 @@ describe("phase6 — reloadConfig", () => {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: [],
},
"net-rx": {
group: "network",
@@ -199,6 +207,8 @@ describe("phase6 — reloadConfig", () => {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: [],
},
},
reflexes: [],
@@ -221,6 +231,8 @@ describe("phase6 — reloadConfig", () => {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: [],
},
},
reflexes: [],
@@ -264,6 +276,8 @@ describe("phase6 — error isolation", () => {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: [],
},
"bad-sense": {
group: "mixed",
@@ -271,6 +285,8 @@ describe("phase6 — error isolation", () => {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: [],
},
},
reflexes: [],
@@ -357,6 +373,8 @@ describe("phase6 — getHealth", () => {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: [],
},
"disk-usage": {
group: "system",
@@ -364,6 +382,8 @@ describe("phase6 — getHealth", () => {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: [],
},
"net-rx": {
group: "network",
@@ -371,6 +391,8 @@ describe("phase6 — getHealth", () => {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: [],
},
},
});
@@ -405,6 +427,8 @@ describe("phase6 — getHealth", () => {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: [],
},
"net-rx": {
group: "network",
@@ -412,6 +436,8 @@ describe("phase6 — getHealth", () => {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: [],
},
},
reflexes: [],
@@ -13,6 +13,8 @@ function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: [],
},
},
reflexes: [],
@@ -46,9 +48,10 @@ describe("ReflexScheduler — throttle + pending deferred trigger", () => {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: ["cpu-usage"],
},
},
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: null, on: ["cpu-usage"] }],
});
const bus = createSignalBus();
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name));
@@ -81,9 +84,10 @@ describe("ReflexScheduler — throttle + pending deferred trigger", () => {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: ["cpu-usage"],
},
},
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: null, on: ["cpu-usage"] }],
});
const bus = createSignalBus();
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name));
@@ -120,9 +124,10 @@ describe("ReflexScheduler — throttle + pending deferred trigger", () => {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: ["cpu-usage"],
},
},
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: null, on: ["cpu-usage"] }],
});
const bus = createSignalBus();
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name));
@@ -17,6 +17,8 @@ function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: [],
},
"disk-usage": {
group: "system",
@@ -24,6 +26,8 @@ function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: [],
},
"system-health": {
group: "derived",
@@ -31,6 +35,8 @@ function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: [],
},
},
reflexes: [],
@@ -59,8 +65,12 @@ describe("ReflexScheduler — interval reflex", () => {
it("fires triggerFn on schedule", () => {
const triggered: string[] = [];
const base = makeConfig();
const config = makeConfig({
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: 1000, on: [] }],
senses: {
...base.senses,
"cpu-usage": { ...base.senses["cpu-usage"], interval: 1000, on: [] },
},
});
const bus = createSignalBus();
// Use a ref so the triggerFn can call back into the scheduler
@@ -84,8 +94,12 @@ describe("ReflexScheduler — interval reflex", () => {
it("stops firing after stop() is called", () => {
const triggered: string[] = [];
const base = makeConfig();
const config = makeConfig({
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: 500, on: [] }],
senses: {
...base.senses,
"cpu-usage": { ...base.senses["cpu-usage"], interval: 500, on: [] },
},
});
const bus = createSignalBus();
const ref: { scheduler: ReturnType<typeof createReflexScheduler> | null } = {
@@ -107,8 +121,12 @@ describe("ReflexScheduler — interval reflex", () => {
it("starts from current time — does not compensate for past intervals", () => {
const triggered: string[] = [];
const base = makeConfig();
const config = makeConfig({
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: 1000, on: [] }],
senses: {
...base.senses,
"cpu-usage": { ...base.senses["cpu-usage"], interval: 1000, on: [] },
},
});
const bus = createSignalBus();
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name));
@@ -119,15 +137,42 @@ describe("ReflexScheduler — interval reflex", () => {
scheduler.stop();
});
it("falls back to config.reflexes when sense has no inline interval or on", () => {
const triggered: string[] = [];
const config = makeConfig({
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: 1000, on: [] }],
});
const bus = createSignalBus();
const ref: { scheduler: ReturnType<typeof createReflexScheduler> | null } = {
scheduler: null,
};
const scheduler = createReflexScheduler(config, bus, (name) => {
triggered.push(name);
ref.scheduler?.onComputeComplete(name);
});
ref.scheduler = scheduler;
vi.advanceTimersByTime(3000);
expect(triggered.length).toBeGreaterThanOrEqual(3);
expect(triggered.every((n) => n === "cpu-usage")).toBe(true);
scheduler.stop();
});
});
describe("ReflexScheduler — event (on) reflex", () => {
it("triggers target sense when watched sense emits a signal", () => {
const triggered: string[] = [];
const base = makeConfig();
const config = makeConfig({
reflexes: [
{ kind: "sense", sense: "system-health", interval: null, on: ["cpu-usage", "disk-usage"] },
],
senses: {
...base.senses,
"system-health": {
...base.senses["system-health"],
interval: null,
on: ["cpu-usage", "disk-usage"],
},
},
});
const bus = createSignalBus();
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name));
@@ -145,8 +190,12 @@ describe("ReflexScheduler — event (on) reflex", () => {
it("does not trigger for signals from non-watched senses", () => {
const triggered: string[] = [];
const base = makeConfig();
const config = makeConfig({
reflexes: [{ kind: "sense", sense: "system-health", interval: null, on: ["cpu-usage"] }],
senses: {
...base.senses,
"system-health": { ...base.senses["system-health"], interval: null, on: ["cpu-usage"] },
},
});
const bus = createSignalBus();
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name));
@@ -160,8 +209,12 @@ describe("ReflexScheduler — event (on) reflex", () => {
it("stops responding after stop()", () => {
const triggered: string[] = [];
const base = makeConfig();
const config = makeConfig({
reflexes: [{ kind: "sense", sense: "system-health", interval: null, on: ["cpu-usage"] }],
senses: {
...base.senses,
"system-health": { ...base.senses["system-health"], interval: null, on: ["cpu-usage"] },
},
});
const bus = createSignalBus();
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name));
@@ -191,9 +244,10 @@ describe("ReflexScheduler — throttle", () => {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: ["cpu-usage"],
},
},
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: null, on: ["cpu-usage"] }],
});
const bus = createSignalBus();
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name));
@@ -219,9 +273,10 @@ describe("ReflexScheduler — throttle", () => {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: ["cpu-usage"],
},
},
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: null, on: ["cpu-usage"] }],
});
const bus = createSignalBus();
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name));
@@ -242,8 +297,12 @@ describe("ReflexScheduler — throttle", () => {
describe("ReflexScheduler — merge/coalesce", () => {
it("concurrent triggers collapse to at most one pending run", () => {
const triggered: string[] = [];
const base = makeConfig();
const config = makeConfig({
reflexes: [{ kind: "sense", sense: "system-health", interval: null, on: ["cpu-usage"] }],
senses: {
...base.senses,
"system-health": { ...base.senses["system-health"], interval: null, on: ["cpu-usage"] },
},
});
const bus = createSignalBus();
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name));
@@ -271,8 +330,12 @@ describe("ReflexScheduler — merge/coalesce", () => {
it("no new trigger while in-flight without pending → no extra run after complete", () => {
const triggered: string[] = [];
const base = makeConfig();
const config = makeConfig({
reflexes: [{ kind: "sense", sense: "system-health", interval: null, on: ["cpu-usage"] }],
senses: {
...base.senses,
"system-health": { ...base.senses["system-health"], interval: null, on: ["cpu-usage"] },
},
});
const bus = createSignalBus();
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name));
@@ -298,8 +361,16 @@ describe("ReflexScheduler — interval + on combined", () => {
it("fires both via interval and event", () => {
const triggered: string[] = [];
const base = makeConfig();
const config = makeConfig({
reflexes: [{ kind: "sense", sense: "system-health", interval: 1000, on: ["cpu-usage"] }],
senses: {
...base.senses,
"system-health": {
...base.senses["system-health"],
interval: 1000,
on: ["cpu-usage"],
},
},
});
const bus = createSignalBus();
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name));
@@ -330,6 +401,8 @@ describe("ReflexScheduler — workflow reflexes ignored", () => {
timeout: null,
gracePeriod: null,
retention: 10_000,
interval: null,
on: [],
},
},
reflexes: [{ kind: "workflow", workflow: "my-workflow", on: ["cpu-usage"] } as any],
+2 -2
View File
@@ -13,7 +13,7 @@ import {
type NerveConfig,
type SenseInfo,
type Signal,
senseTriggerLabels,
senseTriggerLabelsWithFallback,
} from "@uncaged/nerve-core";
import { routeSenseComputeOutput } from "@uncaged/nerve-core";
@@ -377,7 +377,7 @@ export function createKernel(
group: senseConfig.group,
throttle: senseConfig.throttle,
timeout: senseConfig.timeout,
triggers: senseTriggerLabels(name, config.reflexes),
triggers: senseTriggerLabelsWithFallback(name, config.senses, config.reflexes),
lastSignalTimestamp: lastEntry !== null ? lastEntry.timestamp : null,
};
});
+59 -11
View File
@@ -1,5 +1,7 @@
/**
* Reflex Scheduler — drives sense compute cycles based on ReflexConfig.
* Reflex Scheduler — drives sense compute cycles from each sense's `interval` / `on`
* (inline triggers). Falls back to `config.reflexes` when a sense has no schedule there
* but legacy reflex rows still define one.
*
* Supports:
* - interval: periodic setInterval-based triggering
@@ -9,7 +11,7 @@
* run it once after the current compute completes (no unbounded queue)
*/
import type { NerveConfig } from "@uncaged/nerve-core";
import type { NerveConfig, ReflexConfig, SenseConfig } from "@uncaged/nerve-core";
import type { LogStore } from "@uncaged/nerve-store";
import type { SignalBus, Unsubscribe } from "./signal-bus.js";
@@ -39,10 +41,58 @@ export type ReflexSchedulerOptions = {
logStore?: LogStore;
};
type ScheduleSlice = {
interval: number | null;
on: string[];
};
function mergeOnUnique(a: readonly string[], b: readonly string[]): string[] {
const seen = new Set<string>();
const out: string[] = [];
for (const item of [...a, ...b]) {
if (seen.has(item)) continue;
seen.add(item);
out.push(item);
}
return out;
}
/** Aggregate legacy `reflexes[]` rows for one sense (same sense name, kind sense only). */
function scheduleFromReflexesOnly(
senseName: string,
reflexes: readonly ReflexConfig[],
): ScheduleSlice {
let interval: number | null = null;
let on: string[] = [];
for (const ref of reflexes) {
if (ref.kind !== "sense" || ref.sense !== senseName) continue;
if (ref.interval !== null) {
interval = interval === null ? ref.interval : interval;
}
on = mergeOnUnique(on, ref.on);
}
return { interval, on };
}
/**
* Effective polling / signal subscriptions for one sense: prefer fields on `SenseConfig`,
* else legacy-only reflex rows targeting that sense.
*/
function effectiveSenseSchedule(
senseName: string,
sense: SenseConfig,
reflexes: readonly ReflexConfig[],
): ScheduleSlice {
if (sense.interval !== null || sense.on.length > 0) {
return { interval: sense.interval, on: sense.on };
}
return scheduleFromReflexesOnly(senseName, reflexes);
}
/**
* Create and start a reflex scheduler.
*
* @param config Full NerveConfig (reads senses for throttle/timeout, reflexes for schedule).
* @param config Full NerveConfig (senses for schedule + throttle; reflexes as fallback).
* @param bus SignalBus to subscribe for event-driven reflexes.
* @param triggerFn Called with the sense name when a compute should be dispatched.
* @param opts Optional: logStore for structured logging.
@@ -152,20 +202,18 @@ export function createReflexScheduler(
dispatchCompute(senseName);
}
for (const reflex of config.reflexes) {
if (reflex.kind !== "sense") continue;
const senseReflex = reflex;
const senseName = senseReflex.sense;
for (const [senseName, sense] of Object.entries(config.senses)) {
const { interval, on } = effectiveSenseSchedule(senseName, sense, config.reflexes);
if (senseReflex.interval !== null) {
if (interval !== null) {
const id = setInterval(() => {
maybeTrigger(senseName);
}, senseReflex.interval);
}, interval);
intervals.push(id);
}
if (senseReflex.on.length > 0) {
const watchedSenses = new Set(senseReflex.on);
if (on.length > 0) {
const watchedSenses = new Set(on);
const unsub = bus.subscribe((signal) => {
if (watchedSenses.has(signal.senseId)) {
maybeTrigger(senseName);
+1 -1
View File
@@ -188,7 +188,7 @@ async function runCompute(
return;
}
clearGracePeriodTimer(senseName);
if (result.value !== null) {
if (result.value != null) {
runtime.persistSignal(result.value);
sendSignal(senseName, result.value);
}