refactor: remove legacy reflexes backward-compat code
BREAKING CHANGE: NerveConfig no longer has 'reflexes' field. SenseReflexConfig/ReflexConfig types removed. Config with top-level 'reflexes' array now errors instead of migrating. Use sense-level 'interval' and 'on' fields instead. - Remove reflexes from NerveConfig type - Remove legacy parsing, deprecation warning, buildReflexesFromSenses - Simplify reflex-scheduler to only read sense-level config - Rename senseTriggerLabelsWithFallback → senseTriggerLabels - Delete all legacy reflexes test cases - -639/+114 lines Fixes #199
This commit is contained in:
+2
-12
@@ -11,30 +11,20 @@ senses:
|
||||
throttle: 5s
|
||||
timeout: 8s
|
||||
grace_period: null
|
||||
interval: 10s
|
||||
|
||||
disk-usage:
|
||||
group: system
|
||||
throttle: null
|
||||
timeout: 15s
|
||||
grace_period: null
|
||||
interval: 30s
|
||||
|
||||
system-health:
|
||||
group: derived
|
||||
throttle: 2s
|
||||
timeout: 10s
|
||||
grace_period: null
|
||||
|
||||
reflexes:
|
||||
# cpu-usage runs on a 10-second interval
|
||||
- sense: cpu-usage
|
||||
interval: 10s
|
||||
|
||||
# disk-usage runs on a 30-second interval
|
||||
- sense: disk-usage
|
||||
interval: 30s
|
||||
|
||||
# system-health is event-driven: fires whenever cpu-usage or disk-usage emits a signal
|
||||
- sense: system-health
|
||||
on:
|
||||
- cpu-usage
|
||||
- disk-usage
|
||||
|
||||
@@ -11,9 +11,6 @@
|
||||
* group: internal
|
||||
* throttle: 30s
|
||||
* timeout: 5s
|
||||
*
|
||||
* reflexes:
|
||||
* - sense: nerve-health
|
||||
* interval: 30s
|
||||
*/
|
||||
|
||||
|
||||
@@ -187,7 +187,6 @@ function defaultTestConfig(withNoopWorkflow: boolean): NerveConfig {
|
||||
on: [],
|
||||
},
|
||||
},
|
||||
reflexes: [],
|
||||
workflows: {
|
||||
echo: { concurrency: 1, overflow: "queue" as const, maxQueue: 10 },
|
||||
...(withNoopWorkflow ? { noop: { concurrency: 1, overflow: "drop" as const } } : {}),
|
||||
|
||||
@@ -6,7 +6,7 @@ import {
|
||||
type SenseInfo,
|
||||
isPlainRecord,
|
||||
parseNerveConfig,
|
||||
senseTriggerLabelsWithFallback,
|
||||
senseTriggerLabels,
|
||||
} from "@uncaged/nerve-core";
|
||||
import { defineCommand } from "citty";
|
||||
|
||||
@@ -69,13 +69,13 @@ export function sensesFromConfig(configPath: string): SenseInfo[] {
|
||||
}
|
||||
const result = parseNerveConfig(raw);
|
||||
if (!result.ok) return [];
|
||||
const { senses, reflexes } = result.value;
|
||||
const { senses } = result.value;
|
||||
return Object.entries(senses).map(([name, cfg]) => ({
|
||||
name,
|
||||
group: cfg.group,
|
||||
throttle: cfg.throttle,
|
||||
timeout: cfg.timeout,
|
||||
triggers: senseTriggerLabelsWithFallback(name, senses, reflexes),
|
||||
triggers: senseTriggerLabels(name, senses),
|
||||
lastSignalTimestamp: null,
|
||||
}));
|
||||
}
|
||||
|
||||
@@ -30,7 +30,9 @@ export const validateCommand = defineCommand({
|
||||
|
||||
const config = result.value;
|
||||
const senseCount = Object.keys(config.senses).length;
|
||||
const triggerScheduleCount = config.reflexes.length;
|
||||
const triggerScheduleCount = Object.values(config.senses).filter(
|
||||
(s) => s.interval !== null || s.on.length > 0,
|
||||
).length;
|
||||
const workflowCount = Object.keys(config.workflows).length;
|
||||
|
||||
process.stdout.write(
|
||||
|
||||
@@ -1,25 +1,17 @@
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { describe, expect, it } from "vitest";
|
||||
|
||||
import { parseNerveConfig } from "../parse-nerve-config.js";
|
||||
|
||||
function testConsole(): { warn: (...args: unknown[]) => void } {
|
||||
return (globalThis as unknown as { console: { warn: (...args: unknown[]) => void } }).console;
|
||||
}
|
||||
|
||||
const VALID_CONFIG = `
|
||||
senses:
|
||||
cpu:
|
||||
group: system
|
||||
throttle: 5s
|
||||
interval: 30s
|
||||
memory:
|
||||
group: system
|
||||
timeout: 10s
|
||||
grace_period: 3s
|
||||
|
||||
reflexes:
|
||||
- sense: cpu
|
||||
interval: 30s
|
||||
- sense: memory
|
||||
on:
|
||||
- high_usage
|
||||
|
||||
@@ -31,21 +23,11 @@ workflows:
|
||||
`;
|
||||
|
||||
describe("parseNerveConfig", () => {
|
||||
beforeEach(() => {
|
||||
vi.spyOn(testConsole(), "warn").mockImplementation(() => {});
|
||||
});
|
||||
afterEach(() => {
|
||||
vi.restoreAllMocks();
|
||||
});
|
||||
|
||||
describe("valid configs", () => {
|
||||
it("parses a full valid config", () => {
|
||||
const result = parseNerveConfig(VALID_CONFIG);
|
||||
expect(result.ok).toBe(true);
|
||||
if (!result.ok) return;
|
||||
expect(testConsole().warn).toHaveBeenCalledWith(
|
||||
expect.stringMatching(/top-level `reflexes`.*deprecated/i),
|
||||
);
|
||||
|
||||
expect(result.value.senses.cpu).toEqual({
|
||||
group: "system",
|
||||
@@ -65,19 +47,6 @@ describe("parseNerveConfig", () => {
|
||||
interval: null,
|
||||
on: ["high_usage"],
|
||||
});
|
||||
expect(result.value.reflexes).toHaveLength(2);
|
||||
expect(result.value.reflexes[0]).toEqual({
|
||||
kind: "sense",
|
||||
sense: "cpu",
|
||||
interval: 30_000,
|
||||
on: [],
|
||||
});
|
||||
expect(result.value.reflexes[1]).toEqual({
|
||||
kind: "sense",
|
||||
sense: "memory",
|
||||
interval: null,
|
||||
on: ["high_usage"],
|
||||
});
|
||||
expect(result.value.workflows.alert).toEqual({
|
||||
concurrency: 2,
|
||||
overflow: "queue",
|
||||
@@ -86,27 +55,11 @@ describe("parseNerveConfig", () => {
|
||||
expect(result.value.api).toEqual({ port: null, token: null, host: "127.0.0.1" });
|
||||
});
|
||||
|
||||
it("parses config with empty reflexes array", () => {
|
||||
const yaml = `
|
||||
senses:
|
||||
cpu:
|
||||
group: system
|
||||
reflexes: []
|
||||
`;
|
||||
const result = parseNerveConfig(yaml);
|
||||
expect(result.ok).toBe(true);
|
||||
if (!result.ok) return;
|
||||
expect(result.value.reflexes).toEqual([]);
|
||||
expect(testConsole().warn).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("parses config without workflows section", () => {
|
||||
const yaml = `
|
||||
senses:
|
||||
cpu:
|
||||
group: system
|
||||
reflexes:
|
||||
- sense: cpu
|
||||
interval: 1s
|
||||
`;
|
||||
const result = parseNerveConfig(yaml);
|
||||
@@ -120,7 +73,6 @@ reflexes:
|
||||
senses:
|
||||
net:
|
||||
group: network
|
||||
reflexes: []
|
||||
`;
|
||||
const result = parseNerveConfig(yaml);
|
||||
expect(result.ok).toBe(true);
|
||||
@@ -142,7 +94,6 @@ senses:
|
||||
cpu:
|
||||
group: system
|
||||
retention: 5000
|
||||
reflexes: []
|
||||
`;
|
||||
const result = parseNerveConfig(yaml);
|
||||
expect(result.ok).toBe(true);
|
||||
@@ -162,7 +113,6 @@ senses:
|
||||
c:
|
||||
group: g
|
||||
timeout: 1h
|
||||
reflexes: []
|
||||
`;
|
||||
const result = parseNerveConfig(yaml);
|
||||
expect(result.ok).toBe(true);
|
||||
@@ -173,7 +123,6 @@ reflexes: []
|
||||
senses:
|
||||
cpu:
|
||||
group: system
|
||||
reflexes: []
|
||||
workflows:
|
||||
alert:
|
||||
concurrency: 1
|
||||
@@ -194,7 +143,6 @@ workflows:
|
||||
senses:
|
||||
cpu:
|
||||
group: system
|
||||
reflexes: []
|
||||
workflows:
|
||||
alert:
|
||||
concurrency: 1
|
||||
@@ -215,7 +163,6 @@ workflows:
|
||||
senses:
|
||||
cpu:
|
||||
group: system
|
||||
reflexes: []
|
||||
api:
|
||||
port: 9800
|
||||
`;
|
||||
@@ -230,7 +177,6 @@ api:
|
||||
senses:
|
||||
cpu:
|
||||
group: system
|
||||
reflexes: []
|
||||
api:
|
||||
port: 9800
|
||||
host: localhost
|
||||
@@ -246,7 +192,6 @@ api:
|
||||
senses:
|
||||
cpu:
|
||||
group: system
|
||||
reflexes: []
|
||||
api:
|
||||
port: 9800
|
||||
token: "secret"
|
||||
@@ -257,15 +202,60 @@ api:
|
||||
if (!result.ok) return;
|
||||
expect(result.value.api).toEqual({ port: 9800, token: "secret", host: "0.0.0.0" });
|
||||
});
|
||||
|
||||
it("accepts inline interval and on only (no reflexes key)", () => {
|
||||
const yaml = `
|
||||
senses:
|
||||
cpu:
|
||||
group: system
|
||||
interval: 5s
|
||||
on:
|
||||
- memory
|
||||
memory:
|
||||
group: system
|
||||
`;
|
||||
const result = parseNerveConfig(yaml);
|
||||
expect(result.ok).toBe(true);
|
||||
if (!result.ok) return;
|
||||
expect(result.value.senses.cpu.interval).toBe(5000);
|
||||
expect(result.value.senses.cpu.on).toEqual(["memory"]);
|
||||
});
|
||||
});
|
||||
|
||||
describe("invalid configs", () => {
|
||||
it("returns error when api.token is empty string", () => {
|
||||
it("returns error when top-level reflexes is present", () => {
|
||||
const yaml = `
|
||||
senses:
|
||||
cpu:
|
||||
group: system
|
||||
reflexes: []
|
||||
`;
|
||||
const result = parseNerveConfig(yaml);
|
||||
expect(result.ok).toBe(false);
|
||||
if (result.ok) return;
|
||||
expect(result.error.message).toMatch(/reflexes: top-level key is no longer supported/);
|
||||
});
|
||||
|
||||
it("returns error when reflexes contains legacy rows", () => {
|
||||
const yaml = `
|
||||
senses:
|
||||
cpu:
|
||||
group: system
|
||||
reflexes:
|
||||
- sense: cpu
|
||||
interval: 1s
|
||||
`;
|
||||
const result = parseNerveConfig(yaml);
|
||||
expect(result.ok).toBe(false);
|
||||
if (result.ok) return;
|
||||
expect(result.error.message).toMatch(/reflexes: top-level key is no longer supported/);
|
||||
});
|
||||
|
||||
it("returns error when api.token is empty string", () => {
|
||||
const yaml = `
|
||||
senses:
|
||||
cpu:
|
||||
group: system
|
||||
api:
|
||||
port: 9800
|
||||
token: ""
|
||||
@@ -281,7 +271,6 @@ api:
|
||||
senses:
|
||||
cpu:
|
||||
group: system
|
||||
reflexes: []
|
||||
api:
|
||||
port: 9800
|
||||
host: ""
|
||||
@@ -297,7 +286,6 @@ api:
|
||||
senses:
|
||||
cpu:
|
||||
group: system
|
||||
reflexes: []
|
||||
api:
|
||||
port: 9800
|
||||
host: "0.0.0.0"
|
||||
@@ -315,7 +303,6 @@ api:
|
||||
senses:
|
||||
cpu:
|
||||
group: system
|
||||
reflexes: []
|
||||
api:
|
||||
port: 99999
|
||||
`;
|
||||
@@ -332,54 +319,12 @@ api:
|
||||
expect(result.error.message).toMatch(/YAML parse error/);
|
||||
});
|
||||
|
||||
it("returns error when reflex references a non-existent sense", () => {
|
||||
const yaml = `
|
||||
senses:
|
||||
cpu:
|
||||
group: system
|
||||
reflexes:
|
||||
- sense: disk
|
||||
`;
|
||||
it("returns error when senses is missing", () => {
|
||||
const yaml = "max_rounds: 10";
|
||||
const result = parseNerveConfig(yaml);
|
||||
expect(result.ok).toBe(false);
|
||||
if (result.ok) return;
|
||||
expect(result.error.message).toMatch(/disk.*not found in senses/);
|
||||
});
|
||||
|
||||
it("returns error when reflex uses unsupported workflow field", () => {
|
||||
const yaml = `
|
||||
senses:
|
||||
cpu:
|
||||
group: system
|
||||
reflexes:
|
||||
- workflow: missing_wf
|
||||
on:
|
||||
- cpu
|
||||
`;
|
||||
const result = parseNerveConfig(yaml);
|
||||
expect(result.ok).toBe(false);
|
||||
if (result.ok) return;
|
||||
expect(result.error.message).toMatch(/workflow.*not supported/);
|
||||
});
|
||||
|
||||
it("returns error when reflex uses unsupported workflow field (with workflows defined)", () => {
|
||||
const yaml = `
|
||||
senses:
|
||||
cpu:
|
||||
group: system
|
||||
reflexes:
|
||||
- workflow: unknown
|
||||
on:
|
||||
- cpu
|
||||
workflows:
|
||||
alert:
|
||||
concurrency: 1
|
||||
overflow: drop
|
||||
`;
|
||||
const result = parseNerveConfig(yaml);
|
||||
expect(result.ok).toBe(false);
|
||||
if (result.ok) return;
|
||||
expect(result.error.message).toMatch(/workflow.*not supported/);
|
||||
expect(result.error.message).toMatch(/senses/);
|
||||
});
|
||||
|
||||
it("returns error when retention is zero", () => {
|
||||
@@ -388,7 +333,6 @@ senses:
|
||||
cpu:
|
||||
group: system
|
||||
retention: 0
|
||||
reflexes: []
|
||||
`;
|
||||
const result = parseNerveConfig(yaml);
|
||||
expect(result.ok).toBe(false);
|
||||
@@ -402,7 +346,6 @@ senses:
|
||||
cpu:
|
||||
group: system
|
||||
retention: 1.5
|
||||
reflexes: []
|
||||
`;
|
||||
const result = parseNerveConfig(yaml);
|
||||
expect(result.ok).toBe(false);
|
||||
@@ -416,7 +359,6 @@ senses:
|
||||
cpu:
|
||||
group: system
|
||||
retention: "5000"
|
||||
reflexes: []
|
||||
`;
|
||||
const result = parseNerveConfig(yaml);
|
||||
expect(result.ok).toBe(false);
|
||||
@@ -430,7 +372,6 @@ senses:
|
||||
cpu:
|
||||
group: system
|
||||
throttle: 5sec
|
||||
reflexes: []
|
||||
`;
|
||||
const result = parseNerveConfig(yaml);
|
||||
expect(result.ok).toBe(false);
|
||||
@@ -444,7 +385,6 @@ senses:
|
||||
cpu:
|
||||
group: system
|
||||
timeout: two-minutes
|
||||
reflexes: []
|
||||
`;
|
||||
const result = parseNerveConfig(yaml);
|
||||
expect(result.ok).toBe(false);
|
||||
@@ -452,87 +392,17 @@ reflexes: []
|
||||
expect(result.error.message).toMatch(/timeout.*invalid duration/);
|
||||
});
|
||||
|
||||
it("returns error for invalid interval format", () => {
|
||||
it("returns error for invalid interval format on sense", () => {
|
||||
const yaml = `
|
||||
senses:
|
||||
cpu:
|
||||
group: system
|
||||
reflexes:
|
||||
- sense: cpu
|
||||
interval: 30seconds
|
||||
`;
|
||||
const result = parseNerveConfig(yaml);
|
||||
expect(result.ok).toBe(false);
|
||||
if (result.ok) return;
|
||||
expect(result.error.message).toMatch(/interval.*invalid duration/);
|
||||
});
|
||||
|
||||
it("returns error when senses is missing", () => {
|
||||
const yaml = `
|
||||
reflexes: []
|
||||
`;
|
||||
const result = parseNerveConfig(yaml);
|
||||
expect(result.ok).toBe(false);
|
||||
if (result.ok) return;
|
||||
expect(result.error.message).toMatch(/senses/);
|
||||
});
|
||||
|
||||
it("accepts config without reflexes key (inline triggers only)", () => {
|
||||
const yaml = `
|
||||
senses:
|
||||
cpu:
|
||||
group: system
|
||||
interval: 5s
|
||||
on:
|
||||
- memory
|
||||
memory:
|
||||
group: system
|
||||
`;
|
||||
const result = parseNerveConfig(yaml);
|
||||
expect(result.ok).toBe(true);
|
||||
if (!result.ok) return;
|
||||
expect(result.value.senses.cpu.interval).toBe(5000);
|
||||
expect(result.value.senses.cpu.on).toEqual(["memory"]);
|
||||
expect(result.value.reflexes).toEqual([
|
||||
{ kind: "sense", sense: "cpu", interval: 5000, on: ["memory"] },
|
||||
]);
|
||||
});
|
||||
|
||||
it("returns error when legacy reflex interval conflicts with sense interval", () => {
|
||||
const yaml = `
|
||||
senses:
|
||||
cpu:
|
||||
group: system
|
||||
interval: 10s
|
||||
reflexes:
|
||||
- sense: cpu
|
||||
interval: 5s
|
||||
`;
|
||||
const result = parseNerveConfig(yaml);
|
||||
expect(result.ok).toBe(false);
|
||||
if (result.ok) return;
|
||||
expect(result.error.message).toMatch(/conflicting interval/);
|
||||
});
|
||||
|
||||
it("merges legacy reflex on[] into sense on and dedupes", () => {
|
||||
const yaml = `
|
||||
senses:
|
||||
cpu:
|
||||
group: system
|
||||
on:
|
||||
- a
|
||||
reflexes:
|
||||
- sense: cpu
|
||||
interval: 1s
|
||||
on:
|
||||
- a
|
||||
- b
|
||||
`;
|
||||
const result = parseNerveConfig(yaml);
|
||||
expect(result.ok).toBe(true);
|
||||
if (!result.ok) return;
|
||||
expect(result.value.senses.cpu.on).toEqual(["a", "b"]);
|
||||
expect(result.value.senses.cpu.interval).toBe(1000);
|
||||
expect(result.error.message).toMatch(/senses\.cpu\.interval.*invalid duration/);
|
||||
});
|
||||
|
||||
it("returns error for invalid group name", () => {
|
||||
@@ -540,7 +410,6 @@ reflexes:
|
||||
senses:
|
||||
cpu:
|
||||
group: "my group!"
|
||||
reflexes: []
|
||||
`;
|
||||
const result = parseNerveConfig(yaml);
|
||||
expect(result.ok).toBe(false);
|
||||
@@ -553,7 +422,6 @@ reflexes: []
|
||||
senses:
|
||||
cpu:
|
||||
group: system
|
||||
reflexes: []
|
||||
workflows:
|
||||
alert:
|
||||
concurrency: 1
|
||||
@@ -570,7 +438,6 @@ workflows:
|
||||
senses:
|
||||
cpu:
|
||||
group: system
|
||||
reflexes: []
|
||||
workflows:
|
||||
alert:
|
||||
concurrency: 1
|
||||
@@ -582,34 +449,5 @@ workflows:
|
||||
if (result.ok) return;
|
||||
expect(result.error.message).toMatch(/max_queue.*not allowed.*drop/);
|
||||
});
|
||||
|
||||
it("returns error when reflex has both sense and workflow", () => {
|
||||
const yaml = `
|
||||
senses:
|
||||
cpu:
|
||||
group: system
|
||||
reflexes:
|
||||
- sense: cpu
|
||||
workflow: alert
|
||||
`;
|
||||
const result = parseNerveConfig(yaml);
|
||||
expect(result.ok).toBe(false);
|
||||
if (result.ok) return;
|
||||
expect(result.error.message).toMatch(/workflow.*not supported/);
|
||||
});
|
||||
|
||||
it("returns error when reflex has neither sense nor workflow", () => {
|
||||
const yaml = `
|
||||
senses:
|
||||
cpu:
|
||||
group: system
|
||||
reflexes:
|
||||
- interval: 10s
|
||||
`;
|
||||
const result = parseNerveConfig(yaml);
|
||||
expect(result.ok).toBe(false);
|
||||
if (result.ok) return;
|
||||
expect(result.error.message).toMatch(/must include "sense"/);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -8,22 +8,12 @@ export type SenseConfig = {
|
||||
gracePeriod: number | null;
|
||||
/** Max rows to retain in `_signals`; older rows are pruned periodically after inserts. */
|
||||
retention: number;
|
||||
/** Polling interval (ms). When set, the sense is triggered periodically. Replaces reflex interval. */
|
||||
/** Polling interval (ms). When set, the sense is triggered periodically. */
|
||||
interval: number | null;
|
||||
/** Other sense names whose signals trigger this sense. Replaces reflex `on` field. */
|
||||
/** Other sense names whose signals trigger this sense. */
|
||||
on: string[];
|
||||
};
|
||||
|
||||
export type SenseReflexConfig = {
|
||||
kind: "sense";
|
||||
sense: string;
|
||||
interval: number | null;
|
||||
on: string[];
|
||||
};
|
||||
|
||||
/** Reflexes only schedule Senses; workflow launches come from Sense return values. */
|
||||
export type ReflexConfig = SenseReflexConfig;
|
||||
|
||||
export type DropOverflowConfig = {
|
||||
concurrency: number;
|
||||
overflow: "drop";
|
||||
@@ -50,7 +40,6 @@ export type NerveConfig = {
|
||||
/** Engine-wide default max moderator rounds (e.g. CLI workflow trigger when omitted). */
|
||||
maxRounds: number;
|
||||
senses: Record<string, SenseConfig>;
|
||||
reflexes: ReflexConfig[];
|
||||
workflows: Record<string, WorkflowConfig>;
|
||||
api: NerveApiConfig;
|
||||
};
|
||||
|
||||
@@ -1,8 +1,6 @@
|
||||
export { DEFAULT_SENSE_SIGNAL_RETENTION } from "./config.js";
|
||||
export type {
|
||||
SenseConfig,
|
||||
SenseReflexConfig,
|
||||
ReflexConfig,
|
||||
DropOverflowConfig,
|
||||
QueueOverflowConfig,
|
||||
WorkflowConfig,
|
||||
@@ -10,11 +8,7 @@ export type {
|
||||
NerveConfig,
|
||||
} from "./config.js";
|
||||
export type { Signal, SenseInfo, SenseResult } from "./sense.js";
|
||||
export {
|
||||
labelSenseReflexTrigger,
|
||||
senseTriggerLabels,
|
||||
senseTriggerLabelsWithFallback,
|
||||
} from "./sense-trigger-labels.js";
|
||||
export { labelSenseTrigger, senseTriggerLabels } from "./sense-trigger-labels.js";
|
||||
export type {
|
||||
WorkflowMessage,
|
||||
RoleResult,
|
||||
|
||||
@@ -4,7 +4,6 @@ import {
|
||||
DEFAULT_SENSE_SIGNAL_RETENTION,
|
||||
type NerveApiConfig,
|
||||
type NerveConfig,
|
||||
type ReflexConfig,
|
||||
type SenseConfig,
|
||||
type WorkflowConfig,
|
||||
} from "./config.js";
|
||||
@@ -113,170 +112,6 @@ function validateSenseConfig(name: string, raw: unknown): Result<SenseConfig> {
|
||||
});
|
||||
}
|
||||
|
||||
const LEGACY_REFLEXES_DEPRECATION =
|
||||
"[nerve] Deprecated: top-level `reflexes` in nerve.yaml is deprecated. Move each reflex's `interval` and `on` fields onto the corresponding sense under `senses.<name>`.";
|
||||
|
||||
/** Uses `globalThis` so declaration emit works without DOM / Node `lib` typings for `console`. */
|
||||
function warnToConsole(message: string): void {
|
||||
const c = (globalThis as unknown as { console: { warn: (m: string) => void } }).console;
|
||||
c.warn(message);
|
||||
}
|
||||
|
||||
function mergeOnLists(existing: readonly string[], extra: readonly string[]): string[] {
|
||||
const seen = new Set(existing);
|
||||
const out = [...existing];
|
||||
for (const name of extra) {
|
||||
if (seen.has(name)) continue;
|
||||
seen.add(name);
|
||||
out.push(name);
|
||||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
function mergeSenseIntervalFromLegacyReflex(
|
||||
senseName: string,
|
||||
senseInterval: number | null,
|
||||
reflexInterval: number | null,
|
||||
reflexIndex: number,
|
||||
): Result<number | null> {
|
||||
if (reflexInterval === null) return ok(senseInterval);
|
||||
if (senseInterval === null) return ok(reflexInterval);
|
||||
if (senseInterval === reflexInterval) return ok(senseInterval);
|
||||
return err(
|
||||
new Error(
|
||||
`reflexes[${reflexIndex}].sense "${senseName}": conflicting interval (sense has ${senseInterval} ms, reflex has ${reflexInterval} ms); use a single interval source`,
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
function buildReflexesFromSenses(senses: Record<string, SenseConfig>): ReflexConfig[] {
|
||||
const reflexes: ReflexConfig[] = [];
|
||||
for (const [senseName, sc] of Object.entries(senses)) {
|
||||
if (sc.interval !== null || sc.on.length > 0) {
|
||||
reflexes.push({
|
||||
kind: "sense" as const,
|
||||
sense: senseName,
|
||||
interval: sc.interval,
|
||||
on: sc.on,
|
||||
});
|
||||
}
|
||||
}
|
||||
return reflexes;
|
||||
}
|
||||
|
||||
/**
|
||||
* If `reflexes` is present and non-empty, parse legacy entries and merge `interval` / `on`
|
||||
* into the corresponding sense. Emits a deprecation warning once when any legacy row exists.
|
||||
*/
|
||||
function mergeLegacyReflexesIntoSenses(
|
||||
obj: Record<string, unknown>,
|
||||
senses: Record<string, SenseConfig>,
|
||||
senseNames: Set<string>,
|
||||
): Result<Record<string, SenseConfig>> {
|
||||
if (!Object.hasOwn(obj, "reflexes") || obj.reflexes === undefined || obj.reflexes === null) {
|
||||
return ok(senses);
|
||||
}
|
||||
if (!Array.isArray(obj.reflexes)) {
|
||||
return err(new Error("reflexes: must be an array if provided"));
|
||||
}
|
||||
|
||||
if (obj.reflexes.length > 0) {
|
||||
warnToConsole(LEGACY_REFLEXES_DEPRECATION);
|
||||
}
|
||||
|
||||
let merged: Record<string, SenseConfig> = senses;
|
||||
for (let i = 0; i < obj.reflexes.length; i++) {
|
||||
const reflexResult = validateReflexConfig(i, obj.reflexes[i], senseNames);
|
||||
if (!reflexResult.ok) return reflexResult;
|
||||
const ref = reflexResult.value;
|
||||
const senseName = ref.sense;
|
||||
const sense = merged[senseName];
|
||||
const intervalResult = mergeSenseIntervalFromLegacyReflex(
|
||||
senseName,
|
||||
sense.interval,
|
||||
ref.interval,
|
||||
i,
|
||||
);
|
||||
if (!intervalResult.ok) return intervalResult;
|
||||
const next: SenseConfig = {
|
||||
...sense,
|
||||
interval: intervalResult.value,
|
||||
on: mergeOnLists(sense.on, ref.on),
|
||||
};
|
||||
merged = { ...merged, [senseName]: next };
|
||||
}
|
||||
|
||||
return ok(merged);
|
||||
}
|
||||
|
||||
function parseOnField(index: number, obj: Record<string, unknown>): Result<string[]> {
|
||||
if (obj.on === undefined || obj.on === null) return ok([]);
|
||||
if (!Array.isArray(obj.on) || !obj.on.every((item): item is string => typeof item === "string")) {
|
||||
return err(new Error(`reflexes[${index}].on: must be an array of strings`));
|
||||
}
|
||||
return ok(obj.on);
|
||||
}
|
||||
|
||||
function parseSenseReflex(
|
||||
index: number,
|
||||
obj: Record<string, unknown>,
|
||||
senseNames: Set<string>,
|
||||
on: string[],
|
||||
): Result<ReflexConfig> {
|
||||
if (typeof obj.sense !== "string") {
|
||||
return err(new Error(`reflexes[${index}].sense: must be a string`));
|
||||
}
|
||||
if (!senseNames.has(obj.sense)) {
|
||||
return err(new Error(`reflexes[${index}].sense: "${obj.sense}" not found in senses`));
|
||||
}
|
||||
|
||||
const intervalResult = parseDurationField(obj.interval, `reflexes[${index}].interval`);
|
||||
if (!intervalResult.ok) return intervalResult;
|
||||
|
||||
if (intervalResult.value === null && on.length === 0) {
|
||||
return err(
|
||||
new Error(`reflexes[${index}]: sense reflex must have at least one of "interval" or "on"`),
|
||||
);
|
||||
}
|
||||
|
||||
return ok({
|
||||
kind: "sense" as const,
|
||||
sense: obj.sense,
|
||||
interval: intervalResult.value,
|
||||
on,
|
||||
});
|
||||
}
|
||||
|
||||
function validateReflexConfig(
|
||||
index: number,
|
||||
raw: unknown,
|
||||
senseNames: Set<string>,
|
||||
): Result<ReflexConfig> {
|
||||
if (!isPlainRecord(raw)) {
|
||||
return err(new Error(`reflexes[${index}]: must be an object`));
|
||||
}
|
||||
|
||||
const obj = raw;
|
||||
const hasSense = obj.sense !== undefined;
|
||||
const hasWorkflowKey = Object.hasOwn(obj, "workflow");
|
||||
|
||||
if (hasWorkflowKey) {
|
||||
return err(
|
||||
new Error(
|
||||
`reflexes[${index}]: YAML "workflow" entries are not supported — start workflows from a Sense compute return value using a "workflow" string field (format: name|maxRounds|prompt)`,
|
||||
),
|
||||
);
|
||||
}
|
||||
if (!hasSense) {
|
||||
return err(new Error(`reflexes[${index}]: must include "sense"`));
|
||||
}
|
||||
|
||||
const onResult = parseOnField(index, obj);
|
||||
if (!onResult.ok) return onResult;
|
||||
|
||||
return parseSenseReflex(index, obj, senseNames, onResult.value);
|
||||
}
|
||||
|
||||
function parseEngineMaxRounds(obj: Record<string, unknown>): Result<number> {
|
||||
if (obj.max_rounds === undefined || obj.max_rounds === null) {
|
||||
return ok(DEFAULT_ENGINE_MAX_ROUNDS);
|
||||
@@ -342,14 +177,13 @@ function validateWorkflowConfig(name: string, raw: unknown): Result<WorkflowConf
|
||||
|
||||
function parseSenses(
|
||||
obj: Record<string, unknown>,
|
||||
): Result<{ senses: Record<string, SenseConfig>; senseNames: Set<string> }> {
|
||||
): Result<{ senses: Record<string, SenseConfig> }> {
|
||||
if (!isPlainRecord(obj.senses)) {
|
||||
return err(new Error("senses: required object"));
|
||||
}
|
||||
|
||||
const sensesRaw = obj.senses;
|
||||
const senses: Record<string, SenseConfig> = {};
|
||||
const senseNames = new Set(Object.keys(sensesRaw));
|
||||
|
||||
for (const [name, senseRaw] of Object.entries(sensesRaw)) {
|
||||
const result = validateSenseConfig(name, senseRaw);
|
||||
@@ -357,7 +191,7 @@ function parseSenses(
|
||||
senses[name] = result.value;
|
||||
}
|
||||
|
||||
return ok({ senses, senseNames });
|
||||
return ok({ senses });
|
||||
}
|
||||
|
||||
const DEFAULT_API_BIND_HOST = "127.0.0.1";
|
||||
@@ -465,12 +299,15 @@ export function parseNerveConfig(raw: string): Result<NerveConfig> {
|
||||
|
||||
const sensesResult = parseSenses(obj);
|
||||
if (!sensesResult.ok) return sensesResult;
|
||||
const { senses, senseNames } = sensesResult.value;
|
||||
const { senses } = sensesResult.value;
|
||||
|
||||
const mergedSensesResult = mergeLegacyReflexesIntoSenses(obj, senses, senseNames);
|
||||
if (!mergedSensesResult.ok) return mergedSensesResult;
|
||||
const mergedSenses = mergedSensesResult.value;
|
||||
const reflexes = buildReflexesFromSenses(mergedSenses);
|
||||
if (Object.hasOwn(obj, "reflexes")) {
|
||||
return err(
|
||||
new Error(
|
||||
"reflexes: top-level key is no longer supported; set `interval` and `on` on each sense under `senses.<name>`",
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
const workflowsResult = parseWorkflows(obj);
|
||||
if (!workflowsResult.ok) return workflowsResult;
|
||||
@@ -483,8 +320,7 @@ export function parseNerveConfig(raw: string): Result<NerveConfig> {
|
||||
|
||||
return ok({
|
||||
maxRounds: maxRoundsResult.value,
|
||||
senses: mergedSenses,
|
||||
reflexes,
|
||||
senses,
|
||||
workflows: workflowsResult.value,
|
||||
api: apiResult.value,
|
||||
});
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import type { ReflexConfig, SenseConfig } from "./config.js";
|
||||
import type { SenseConfig } from "./config.js";
|
||||
|
||||
function formatIntervalMs(ms: number): string {
|
||||
const totalSeconds = Math.floor(ms / 1000);
|
||||
@@ -10,50 +10,31 @@ function formatIntervalMs(ms: number): string {
|
||||
return `${hours}h ${remainingMinutes}m`;
|
||||
}
|
||||
|
||||
/** Human-readable label for one sense reflex (interval and/or signal subscriptions). */
|
||||
export function labelSenseReflexTrigger(reflex: Extract<ReflexConfig, { kind: "sense" }>): string {
|
||||
/** Human-readable label for a sense schedule (`interval` and/or `on`). */
|
||||
export function labelSenseTrigger(slice: Pick<SenseConfig, "interval" | "on">): string {
|
||||
const parts: string[] = [];
|
||||
if (reflex.interval !== null) {
|
||||
parts.push(`every ${formatIntervalMs(reflex.interval)}`);
|
||||
if (slice.interval !== null) {
|
||||
parts.push(`every ${formatIntervalMs(slice.interval)}`);
|
||||
}
|
||||
if (reflex.on.length > 0) {
|
||||
parts.push(`on: ${reflex.on.join(", ")}`);
|
||||
if (slice.on.length > 0) {
|
||||
parts.push(`on: ${slice.on.join(", ")}`);
|
||||
}
|
||||
if (parts.length === 0) {
|
||||
return "reflex (no interval or on)";
|
||||
return "trigger (no interval or on)";
|
||||
}
|
||||
return parts.join(" · ");
|
||||
}
|
||||
|
||||
/** All reflex trigger descriptions that target the given sense name. */
|
||||
export function senseTriggerLabels(senseName: string, reflexes: readonly ReflexConfig[]): string[] {
|
||||
const out: string[] = [];
|
||||
for (const ref of reflexes) {
|
||||
if (ref.kind !== "sense" || ref.sense !== senseName) continue;
|
||||
out.push(labelSenseReflexTrigger(ref));
|
||||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
/**
|
||||
* Human-readable trigger labels for a sense: prefers inline `interval` / `on` on
|
||||
* `SenseConfig`, otherwise falls back to legacy `config.reflexes` rows.
|
||||
* Human-readable trigger labels for a sense from its `SenseConfig.interval` / `.on`.
|
||||
* Returns an empty array when the sense is missing or has no schedule.
|
||||
*/
|
||||
export function senseTriggerLabelsWithFallback(
|
||||
export function senseTriggerLabels(
|
||||
senseName: string,
|
||||
senses: Record<string, SenseConfig>,
|
||||
reflexes: readonly ReflexConfig[],
|
||||
): string[] {
|
||||
const sc = senses[senseName];
|
||||
if (sc !== undefined && (sc.interval !== null || sc.on.length > 0)) {
|
||||
return [
|
||||
labelSenseReflexTrigger({
|
||||
kind: "sense",
|
||||
sense: senseName,
|
||||
interval: sc.interval,
|
||||
on: sc.on,
|
||||
}),
|
||||
];
|
||||
}
|
||||
return senseTriggerLabels(senseName, reflexes);
|
||||
if (sc === undefined) return [];
|
||||
if (sc.interval === null && sc.on.length === 0) return [];
|
||||
return [labelSenseTrigger({ interval: sc.interval, on: sc.on })];
|
||||
}
|
||||
|
||||
@@ -62,7 +62,6 @@ const { createWorkflowManager } = await import("../workflow-manager.js");
|
||||
function makeConfig(workflows: Record<string, WorkflowConfig> = {}): NerveConfig {
|
||||
return {
|
||||
senses: {},
|
||||
reflexes: [],
|
||||
workflows,
|
||||
maxRounds: 10,
|
||||
api: { port: null, token: null, host: "127.0.0.1" },
|
||||
|
||||
@@ -16,7 +16,7 @@ import type { FileChange, FileWatcher } from "../file-watcher.js";
|
||||
function makeTempNerveRoot(): string {
|
||||
const dir = mkdtempSync(join(tmpdir(), "nerve-fw-wf-test-"));
|
||||
mkdirSync(join(dir, "workflows", "my-workflow"), { recursive: true });
|
||||
writeFileSync(join(dir, "nerve.yaml"), "senses: {}\nreflexes: []\n");
|
||||
writeFileSync(join(dir, "nerve.yaml"), "senses: {}\n");
|
||||
writeFileSync(
|
||||
join(dir, "workflows", "my-workflow", "index.ts"),
|
||||
"export default { roles: {}, moderate: () => null };",
|
||||
@@ -79,7 +79,7 @@ describe("createFileWatcher — workflow file changes (Phase 3)", () => {
|
||||
// stale workflow creation events into the nerve.yaml write window.
|
||||
const dir = mkdtempSync(join(tmpdir(), "nerve-fw-wf-noworkflow-"));
|
||||
mkdirSync(join(dir, "senses"), { recursive: true });
|
||||
writeFileSync(join(dir, "nerve.yaml"), "senses: {}\nreflexes: []\n");
|
||||
writeFileSync(join(dir, "nerve.yaml"), "senses: {}\n");
|
||||
|
||||
const changes: FileChange[] = [];
|
||||
watcher = createFileWatcher(dir, (change) => changes.push(change), 50);
|
||||
@@ -88,7 +88,7 @@ describe("createFileWatcher — workflow file changes (Phase 3)", () => {
|
||||
await new Promise((r) => setTimeout(r, 200));
|
||||
changes.length = 0;
|
||||
|
||||
writeFileSync(join(dir, "nerve.yaml"), "senses: {}\nreflexes: []\n# changed\n");
|
||||
writeFileSync(join(dir, "nerve.yaml"), "senses: {}\n# changed\n");
|
||||
|
||||
await waitFor(() => changes.some((c) => c.kind === "config"), 3000);
|
||||
|
||||
|
||||
@@ -14,7 +14,7 @@ import type { FileChange, FileWatcher } from "../file-watcher.js";
|
||||
function makeTempNerveRoot(): string {
|
||||
const dir = mkdtempSync(join(tmpdir(), "nerve-fw-test-"));
|
||||
mkdirSync(join(dir, "senses", "cpu-usage"), { recursive: true });
|
||||
writeFileSync(join(dir, "nerve.yaml"), "senses: {}\nreflexes: []\n");
|
||||
writeFileSync(join(dir, "nerve.yaml"), "senses: {}\n");
|
||||
writeFileSync(
|
||||
join(dir, "senses", "cpu-usage", "index.ts"),
|
||||
"export async function compute() { return null; }",
|
||||
@@ -62,7 +62,7 @@ describe("createFileWatcher", () => {
|
||||
await new Promise((r) => setTimeout(r, 200));
|
||||
// Clear any events from the setup phase
|
||||
changes.length = 0;
|
||||
writeFileSync(join(root, "nerve.yaml"), "senses: {}\nreflexes: []\n# changed\n");
|
||||
writeFileSync(join(root, "nerve.yaml"), "senses: {}\n# changed\n");
|
||||
|
||||
await waitFor(() => changes.length > 0, 3000);
|
||||
|
||||
@@ -102,7 +102,7 @@ describe("createFileWatcher", () => {
|
||||
watcher.close();
|
||||
watcher = null;
|
||||
|
||||
writeFileSync(join(root, "nerve.yaml"), "senses: {}\nreflexes: []\n# after close\n");
|
||||
writeFileSync(join(root, "nerve.yaml"), "senses: {}\n# after close\n");
|
||||
|
||||
// Wait and verify no changes were captured
|
||||
await new Promise((r) => setTimeout(r, 500));
|
||||
@@ -119,7 +119,7 @@ describe("createFileWatcher", () => {
|
||||
|
||||
// Write rapidly
|
||||
for (let i = 0; i < 5; i++) {
|
||||
writeFileSync(join(root, "nerve.yaml"), `senses: {}\nreflexes: []\n# v${i}\n`);
|
||||
writeFileSync(join(root, "nerve.yaml"), `senses: {}\n# v${i}\n`);
|
||||
}
|
||||
|
||||
await waitFor(() => changes.length > 0, 3000);
|
||||
|
||||
@@ -68,7 +68,6 @@ const { createKernel } = await import("../kernel.js");
|
||||
function makeWfConfig(workflows: Record<string, WorkflowConfig> = {}): NerveConfig {
|
||||
return {
|
||||
senses: {},
|
||||
reflexes: [],
|
||||
workflows,
|
||||
maxRounds: 10,
|
||||
api: { port: null, token: null, host: "127.0.0.1" },
|
||||
@@ -457,7 +456,6 @@ describe("Kernel — workflow hot reload via file-watcher (Phase 3)", () => {
|
||||
const logStore = makeLogStore();
|
||||
const config: NerveConfig = {
|
||||
senses: {},
|
||||
reflexes: [],
|
||||
workflows: { "my-wf": { concurrency: 1, overflow: "drop" } },
|
||||
maxRounds: 10,
|
||||
api: { port: null, token: null, host: "127.0.0.1" },
|
||||
@@ -493,7 +491,6 @@ describe("Kernel — workflow hot reload via file-watcher (Phase 3)", () => {
|
||||
const logStore = makeLogStore();
|
||||
const initialConfig: NerveConfig = {
|
||||
senses: {},
|
||||
reflexes: [],
|
||||
workflows: { "old-wf": { concurrency: 1, overflow: "drop" } },
|
||||
maxRounds: 10,
|
||||
api: { port: null, token: null, host: "127.0.0.1" },
|
||||
@@ -515,7 +512,6 @@ describe("Kernel — workflow hot reload via file-watcher (Phase 3)", () => {
|
||||
// Reload config without old-wf
|
||||
const newConfig: NerveConfig = {
|
||||
senses: {},
|
||||
reflexes: [],
|
||||
workflows: {},
|
||||
maxRounds: 10,
|
||||
api: { port: null, token: null, host: "127.0.0.1" },
|
||||
@@ -538,7 +534,6 @@ describe("Kernel — workflow hot reload via file-watcher (Phase 3)", () => {
|
||||
const logStore = makeLogStore();
|
||||
const initialConfig: NerveConfig = {
|
||||
senses: {},
|
||||
reflexes: [],
|
||||
workflows: { "my-wf": { concurrency: 1, overflow: "drop" } },
|
||||
maxRounds: 10,
|
||||
api: { port: null, token: null, host: "127.0.0.1" },
|
||||
@@ -555,7 +550,6 @@ describe("Kernel — workflow hot reload via file-watcher (Phase 3)", () => {
|
||||
// Reload with updated concurrency — should NOT spawn a new workflow worker
|
||||
const newConfig: NerveConfig = {
|
||||
senses: {},
|
||||
reflexes: [],
|
||||
workflows: { "my-wf": { concurrency: 5, overflow: "queue", maxQueue: 50 } },
|
||||
maxRounds: 10,
|
||||
api: { port: null, token: null, host: "127.0.0.1" },
|
||||
|
||||
@@ -35,7 +35,6 @@ function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
|
||||
on: [],
|
||||
},
|
||||
},
|
||||
reflexes: [],
|
||||
workflows: {},
|
||||
maxRounds: 10,
|
||||
api: { port: null, token: null, host: "127.0.0.1" },
|
||||
|
||||
@@ -83,7 +83,6 @@ function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
|
||||
on: [],
|
||||
},
|
||||
},
|
||||
reflexes: [],
|
||||
workflows: {},
|
||||
maxRounds: 10,
|
||||
api: { port: null, token: null, host: "127.0.0.1" },
|
||||
@@ -243,7 +242,6 @@ describe("kernel — reloadConfig", () => {
|
||||
on: [],
|
||||
},
|
||||
},
|
||||
reflexes: [],
|
||||
workflows: {},
|
||||
maxRounds: 10,
|
||||
api: { port: null, token: null, host: "127.0.0.1" },
|
||||
@@ -277,7 +275,6 @@ describe("kernel — reloadConfig", () => {
|
||||
on: [],
|
||||
},
|
||||
},
|
||||
reflexes: [],
|
||||
workflows: {},
|
||||
maxRounds: 10,
|
||||
api: { port: null, token: null, host: "127.0.0.1" },
|
||||
@@ -301,7 +298,6 @@ describe("kernel — reloadConfig", () => {
|
||||
on: [],
|
||||
},
|
||||
},
|
||||
reflexes: [],
|
||||
workflows: {},
|
||||
maxRounds: 10,
|
||||
api: { port: null, token: null, host: "127.0.0.1" },
|
||||
@@ -341,7 +337,6 @@ describe("kernel — reloadConfig", () => {
|
||||
on: [],
|
||||
},
|
||||
},
|
||||
reflexes: [],
|
||||
workflows: {},
|
||||
maxRounds: 10,
|
||||
api: { port: null, token: null, host: "127.0.0.1" },
|
||||
|
||||
@@ -102,7 +102,6 @@ function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
|
||||
on: [],
|
||||
},
|
||||
},
|
||||
reflexes: [],
|
||||
workflows: {},
|
||||
maxRounds: 10,
|
||||
api: { port: null, token: null, host: "127.0.0.1" },
|
||||
@@ -162,7 +161,6 @@ describe("kernel.triggerSense()", () => {
|
||||
on: [],
|
||||
},
|
||||
},
|
||||
reflexes: [],
|
||||
});
|
||||
const kernel = createKernel(config, nerveRoot, {
|
||||
workerScript: null,
|
||||
@@ -208,7 +206,6 @@ describe("kernel.triggerSense()", () => {
|
||||
on: [],
|
||||
},
|
||||
},
|
||||
reflexes: [],
|
||||
});
|
||||
const kernel = createKernel(config, nerveRoot, {
|
||||
workerScript: null,
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
/**
|
||||
* Integration tests for Kernel + WorkflowManager integration.
|
||||
*
|
||||
* Verifies that sense signals trigger workflow runs via workflow reflexes,
|
||||
* that workflow events are logged, that reloadConfig handles workflow changes,
|
||||
* Verifies that sense signals trigger workflow runs when Sense compute routes
|
||||
* to workflows; that workflow events are logged; that reloadConfig handles workflow changes;
|
||||
* and that graceful shutdown stops workflow workers.
|
||||
*
|
||||
* Uses mocked child_process.fork to avoid real subprocesses.
|
||||
@@ -105,7 +105,6 @@ function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
|
||||
on: [],
|
||||
},
|
||||
},
|
||||
reflexes: [],
|
||||
workflows: {},
|
||||
maxRounds: 10,
|
||||
api: { port: null, token: null, host: "127.0.0.1" },
|
||||
@@ -147,7 +146,6 @@ describe("kernel + workflowManager integration", () => {
|
||||
on: [],
|
||||
},
|
||||
},
|
||||
reflexes: [],
|
||||
workflows: { "my-workflow": { concurrency: 2, overflow: "drop" } },
|
||||
});
|
||||
|
||||
@@ -199,7 +197,6 @@ describe("kernel + workflowManager integration", () => {
|
||||
on: [],
|
||||
},
|
||||
},
|
||||
reflexes: [],
|
||||
workflows: { "alert-workflow": { concurrency: 1, overflow: "drop" } },
|
||||
});
|
||||
|
||||
@@ -265,7 +262,6 @@ describe("kernel + workflowManager integration", () => {
|
||||
on: [],
|
||||
},
|
||||
},
|
||||
reflexes: [],
|
||||
workflows: { "my-workflow": { concurrency: 1, overflow: "drop" } },
|
||||
});
|
||||
|
||||
@@ -316,7 +312,6 @@ describe("kernel + workflowManager integration", () => {
|
||||
on: [],
|
||||
},
|
||||
},
|
||||
reflexes: [],
|
||||
workflows: { "log-test-workflow": { concurrency: 2, overflow: "drop" } },
|
||||
});
|
||||
|
||||
@@ -361,7 +356,6 @@ describe("kernel + workflowManager integration", () => {
|
||||
on: [],
|
||||
},
|
||||
},
|
||||
reflexes: [],
|
||||
workflows: {},
|
||||
maxRounds: 10,
|
||||
});
|
||||
@@ -384,7 +378,6 @@ describe("kernel + workflowManager integration", () => {
|
||||
on: [],
|
||||
},
|
||||
},
|
||||
reflexes: [],
|
||||
workflows: { "new-workflow": { concurrency: 1, overflow: "drop" } },
|
||||
maxRounds: 10,
|
||||
api: { port: null, token: null, host: "127.0.0.1" },
|
||||
@@ -432,7 +425,6 @@ describe("kernel + workflowManager integration", () => {
|
||||
on: [],
|
||||
},
|
||||
},
|
||||
reflexes: [],
|
||||
workflows: { "old-workflow": { concurrency: 1, overflow: "drop" } },
|
||||
});
|
||||
|
||||
@@ -454,7 +446,6 @@ describe("kernel + workflowManager integration", () => {
|
||||
on: [],
|
||||
},
|
||||
},
|
||||
reflexes: [],
|
||||
workflows: {},
|
||||
maxRounds: 10,
|
||||
api: { port: null, token: null, host: "127.0.0.1" },
|
||||
@@ -508,7 +499,6 @@ describe("kernel + workflowManager integration", () => {
|
||||
on: [],
|
||||
},
|
||||
},
|
||||
reflexes: [],
|
||||
workflows: { "shutdown-test": { concurrency: 1, overflow: "drop" } },
|
||||
});
|
||||
|
||||
@@ -565,7 +555,6 @@ describe("kernel + workflowManager integration", () => {
|
||||
on: [],
|
||||
},
|
||||
},
|
||||
reflexes: [],
|
||||
workflows: { "health-wf": { concurrency: 2, overflow: "drop" } },
|
||||
});
|
||||
|
||||
|
||||
@@ -66,7 +66,6 @@ function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
|
||||
on: [],
|
||||
},
|
||||
},
|
||||
reflexes: [],
|
||||
workflows: {},
|
||||
maxRounds: 10,
|
||||
api: { port: null, token: null, host: "127.0.0.1" },
|
||||
@@ -105,7 +104,6 @@ describe("kernel — message routing", () => {
|
||||
on: [],
|
||||
},
|
||||
},
|
||||
reflexes: [],
|
||||
});
|
||||
const kernel = createKernel(config, nerveRoot);
|
||||
|
||||
@@ -135,7 +133,6 @@ describe("kernel — message routing", () => {
|
||||
on: [],
|
||||
},
|
||||
},
|
||||
reflexes: [],
|
||||
});
|
||||
const kernel = createKernel(config, tmpDir, { logStore });
|
||||
const child = mockChildren[0];
|
||||
@@ -164,7 +161,6 @@ describe("kernel — message routing", () => {
|
||||
on: [],
|
||||
},
|
||||
},
|
||||
reflexes: [],
|
||||
});
|
||||
const kernel = createKernel(config, nerveRoot);
|
||||
|
||||
@@ -193,7 +189,6 @@ describe("kernel — message routing", () => {
|
||||
on: [],
|
||||
},
|
||||
},
|
||||
reflexes: [],
|
||||
});
|
||||
const kernel = createKernel(config, nerveRoot);
|
||||
|
||||
@@ -221,7 +216,6 @@ describe("kernel — message routing", () => {
|
||||
on: [],
|
||||
},
|
||||
},
|
||||
reflexes: [],
|
||||
});
|
||||
const kernel = createKernel(config, nerveRoot);
|
||||
|
||||
@@ -280,7 +274,6 @@ describe("kernel — groupForSense mapping", () => {
|
||||
on: [],
|
||||
},
|
||||
},
|
||||
reflexes: [],
|
||||
workflows: {},
|
||||
maxRounds: 10,
|
||||
api: { port: null, token: null, host: "127.0.0.1" },
|
||||
@@ -301,11 +294,10 @@ describe("kernel — groupForSense mapping", () => {
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
interval: 500,
|
||||
on: [],
|
||||
},
|
||||
},
|
||||
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: 500, on: [] }],
|
||||
});
|
||||
const kernel = createKernel(config, nerveRoot);
|
||||
|
||||
|
||||
@@ -23,7 +23,7 @@ describe("LogStore + ReflexScheduler integration", () => {
|
||||
rmSync(tmpDir, { recursive: true, force: true });
|
||||
});
|
||||
|
||||
it("logs run_start when reflex triggers a compute", () => {
|
||||
it("logs run_start when scheduler triggers a compute", () => {
|
||||
const config: NerveConfig = {
|
||||
senses: {
|
||||
"cpu-usage": {
|
||||
@@ -33,10 +33,9 @@ describe("LogStore + ReflexScheduler integration", () => {
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
on: [],
|
||||
on: ["cpu-usage"],
|
||||
},
|
||||
},
|
||||
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: null, on: ["cpu-usage"] }],
|
||||
workflows: {},
|
||||
maxRounds: 10,
|
||||
api: { port: null, token: null, host: "127.0.0.1" },
|
||||
@@ -58,7 +57,7 @@ describe("LogStore + ReflexScheduler integration", () => {
|
||||
scheduler.stop();
|
||||
});
|
||||
|
||||
it("interval reflex produces run_start logs", () => {
|
||||
it("interval schedule produces run_start logs", () => {
|
||||
vi.useFakeTimers();
|
||||
|
||||
const config: NerveConfig = {
|
||||
@@ -69,11 +68,10 @@ describe("LogStore + ReflexScheduler integration", () => {
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
interval: 1000,
|
||||
on: [],
|
||||
},
|
||||
},
|
||||
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: 1000, on: [] }],
|
||||
workflows: {},
|
||||
maxRounds: 10,
|
||||
api: { port: null, token: null, host: "127.0.0.1" },
|
||||
@@ -100,7 +98,7 @@ describe("LogStore + ReflexScheduler integration", () => {
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
it("logs cannot trigger reflexes (architectural constraint)", () => {
|
||||
it("logs cannot trigger the scheduler (architectural constraint)", () => {
|
||||
const config: NerveConfig = {
|
||||
senses: {
|
||||
"cpu-usage": {
|
||||
@@ -110,10 +108,9 @@ describe("LogStore + ReflexScheduler integration", () => {
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
on: [],
|
||||
on: ["cpu-usage"],
|
||||
},
|
||||
},
|
||||
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: null, on: ["cpu-usage"] }],
|
||||
workflows: {},
|
||||
maxRounds: 10,
|
||||
api: { port: null, token: null, host: "127.0.0.1" },
|
||||
@@ -138,8 +135,8 @@ describe("LogStore + ReflexScheduler integration", () => {
|
||||
timestamp: Date.now(),
|
||||
});
|
||||
|
||||
// Writing to the log store should NOT trigger any reflex.
|
||||
// Only bus.emit(signal) triggers reflexes.
|
||||
// Writing to the log store should NOT trigger any sense compute.
|
||||
// Only bus.emit(signal) triggers scheduled senses.
|
||||
expect(triggered).toHaveLength(0);
|
||||
|
||||
scheduler.stop();
|
||||
|
||||
@@ -32,7 +32,6 @@ function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
|
||||
on: [],
|
||||
},
|
||||
},
|
||||
reflexes: [],
|
||||
workflows: {},
|
||||
maxRounds: 10,
|
||||
api: { port: null, token: null, host: "127.0.0.1" },
|
||||
@@ -174,7 +173,6 @@ describe("phase6 — reloadConfig", () => {
|
||||
on: [],
|
||||
},
|
||||
},
|
||||
reflexes: [],
|
||||
workflows: {},
|
||||
maxRounds: 10,
|
||||
api: { port: null, token: null, host: "127.0.0.1" },
|
||||
@@ -211,7 +209,6 @@ describe("phase6 — reloadConfig", () => {
|
||||
on: [],
|
||||
},
|
||||
},
|
||||
reflexes: [],
|
||||
workflows: {},
|
||||
maxRounds: 10,
|
||||
api: { port: null, token: null, host: "127.0.0.1" },
|
||||
@@ -235,7 +232,6 @@ describe("phase6 — reloadConfig", () => {
|
||||
on: [],
|
||||
},
|
||||
},
|
||||
reflexes: [],
|
||||
workflows: {},
|
||||
maxRounds: 10,
|
||||
api: { port: null, token: null, host: "127.0.0.1" },
|
||||
@@ -289,7 +285,6 @@ describe("phase6 — error isolation", () => {
|
||||
on: [],
|
||||
},
|
||||
},
|
||||
reflexes: [],
|
||||
workflows: {},
|
||||
maxRounds: 10,
|
||||
api: { port: null, token: null, host: "127.0.0.1" },
|
||||
@@ -440,7 +435,6 @@ describe("phase6 — getHealth", () => {
|
||||
on: [],
|
||||
},
|
||||
},
|
||||
reflexes: [],
|
||||
workflows: {},
|
||||
maxRounds: 10,
|
||||
api: { port: null, token: null, host: "127.0.0.1" },
|
||||
|
||||
@@ -17,7 +17,6 @@ function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
|
||||
on: [],
|
||||
},
|
||||
},
|
||||
reflexes: [],
|
||||
workflows: {},
|
||||
maxRounds: 10,
|
||||
api: { port: null, token: null, host: "127.0.0.1" },
|
||||
|
||||
@@ -39,7 +39,6 @@ function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
|
||||
on: [],
|
||||
},
|
||||
},
|
||||
reflexes: [],
|
||||
workflows: {},
|
||||
maxRounds: 10,
|
||||
api: { port: null, token: null, host: "127.0.0.1" },
|
||||
@@ -137,27 +136,6 @@ describe("ReflexScheduler — interval reflex", () => {
|
||||
|
||||
scheduler.stop();
|
||||
});
|
||||
|
||||
it("falls back to config.reflexes when sense has no inline interval or on", () => {
|
||||
const triggered: string[] = [];
|
||||
const config = makeConfig({
|
||||
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: 1000, on: [] }],
|
||||
});
|
||||
const bus = createSignalBus();
|
||||
const ref: { scheduler: ReturnType<typeof createReflexScheduler> | null } = {
|
||||
scheduler: null,
|
||||
};
|
||||
const scheduler = createReflexScheduler(config, bus, (name) => {
|
||||
triggered.push(name);
|
||||
ref.scheduler?.onComputeComplete(name);
|
||||
});
|
||||
ref.scheduler = scheduler;
|
||||
|
||||
vi.advanceTimersByTime(3000);
|
||||
expect(triggered.length).toBeGreaterThanOrEqual(3);
|
||||
expect(triggered.every((n) => n === "cpu-usage")).toBe(true);
|
||||
scheduler.stop();
|
||||
});
|
||||
});
|
||||
|
||||
describe("ReflexScheduler — event (on) reflex", () => {
|
||||
@@ -388,36 +366,3 @@ describe("ReflexScheduler — interval + on combined", () => {
|
||||
scheduler.stop();
|
||||
});
|
||||
});
|
||||
|
||||
describe("ReflexScheduler — workflow reflexes ignored", () => {
|
||||
it("does not set up any scheduling for workflow kind reflexes", () => {
|
||||
const triggered: string[] = [];
|
||||
const config: NerveConfig = {
|
||||
maxRounds: 10,
|
||||
senses: {
|
||||
"cpu-usage": {
|
||||
group: "system",
|
||||
throttle: null,
|
||||
timeout: null,
|
||||
gracePeriod: null,
|
||||
retention: 10_000,
|
||||
interval: null,
|
||||
on: [],
|
||||
},
|
||||
},
|
||||
reflexes: [{ kind: "workflow", workflow: "my-workflow", on: ["cpu-usage"] } as any],
|
||||
workflows: {
|
||||
"my-workflow": { concurrency: 1, overflow: "drop" },
|
||||
},
|
||||
api: { port: null, token: null, host: "127.0.0.1" },
|
||||
};
|
||||
const bus = createSignalBus();
|
||||
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name));
|
||||
|
||||
bus.emit(makeSignal("cpu-usage"));
|
||||
|
||||
expect(triggered.length).toBe(0);
|
||||
|
||||
scheduler.stop();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -87,7 +87,6 @@ function makeConfig(overrides: Partial<NerveConfig["workflows"]> = {}): NerveCon
|
||||
return {
|
||||
maxRounds: 10,
|
||||
senses: {},
|
||||
reflexes: [],
|
||||
workflows: overrides as NerveConfig["workflows"],
|
||||
api: { port: null, token: null, host: "127.0.0.1" },
|
||||
};
|
||||
|
||||
@@ -13,7 +13,7 @@ import {
|
||||
type NerveConfig,
|
||||
type SenseInfo,
|
||||
type Signal,
|
||||
senseTriggerLabelsWithFallback,
|
||||
senseTriggerLabels,
|
||||
} from "@uncaged/nerve-core";
|
||||
import { routeSenseComputeOutput } from "@uncaged/nerve-core";
|
||||
|
||||
@@ -377,7 +377,7 @@ export function createKernel(
|
||||
group: senseConfig.group,
|
||||
throttle: senseConfig.throttle,
|
||||
timeout: senseConfig.timeout,
|
||||
triggers: senseTriggerLabelsWithFallback(name, config.senses, config.reflexes),
|
||||
triggers: senseTriggerLabels(name, config.senses),
|
||||
lastSignalTimestamp: lastEntry !== null ? lastEntry.timestamp : null,
|
||||
};
|
||||
});
|
||||
|
||||
@@ -1,7 +1,5 @@
|
||||
/**
|
||||
* Reflex Scheduler — drives sense compute cycles from each sense's `interval` / `on`
|
||||
* (inline triggers). Falls back to `config.reflexes` when a sense has no schedule there
|
||||
* but legacy reflex rows still define one.
|
||||
* Reflex Scheduler — drives sense compute cycles from each sense's `interval` / `on`.
|
||||
*
|
||||
* Supports:
|
||||
* - interval: periodic setInterval-based triggering
|
||||
@@ -11,7 +9,7 @@
|
||||
* run it once after the current compute completes (no unbounded queue)
|
||||
*/
|
||||
|
||||
import type { NerveConfig, ReflexConfig, SenseConfig } from "@uncaged/nerve-core";
|
||||
import type { NerveConfig } from "@uncaged/nerve-core";
|
||||
import type { LogStore } from "@uncaged/nerve-store";
|
||||
import type { SignalBus, Unsubscribe } from "./signal-bus.js";
|
||||
|
||||
@@ -41,59 +39,11 @@ export type ReflexSchedulerOptions = {
|
||||
logStore?: LogStore;
|
||||
};
|
||||
|
||||
type ScheduleSlice = {
|
||||
interval: number | null;
|
||||
on: string[];
|
||||
};
|
||||
|
||||
function mergeOnUnique(a: readonly string[], b: readonly string[]): string[] {
|
||||
const seen = new Set<string>();
|
||||
const out: string[] = [];
|
||||
for (const item of [...a, ...b]) {
|
||||
if (seen.has(item)) continue;
|
||||
seen.add(item);
|
||||
out.push(item);
|
||||
}
|
||||
return out;
|
||||
}
|
||||
|
||||
/** Aggregate legacy `reflexes[]` rows for one sense (same sense name, kind sense only). */
|
||||
function scheduleFromReflexesOnly(
|
||||
senseName: string,
|
||||
reflexes: readonly ReflexConfig[],
|
||||
): ScheduleSlice {
|
||||
let interval: number | null = null;
|
||||
let on: string[] = [];
|
||||
for (const ref of reflexes) {
|
||||
if (ref.kind !== "sense" || ref.sense !== senseName) continue;
|
||||
if (ref.interval !== null) {
|
||||
interval = interval === null ? ref.interval : interval;
|
||||
}
|
||||
on = mergeOnUnique(on, ref.on);
|
||||
}
|
||||
return { interval, on };
|
||||
}
|
||||
|
||||
/**
|
||||
* Effective polling / signal subscriptions for one sense: prefer fields on `SenseConfig`,
|
||||
* else legacy-only reflex rows targeting that sense.
|
||||
*/
|
||||
function effectiveSenseSchedule(
|
||||
senseName: string,
|
||||
sense: SenseConfig,
|
||||
reflexes: readonly ReflexConfig[],
|
||||
): ScheduleSlice {
|
||||
if (sense.interval !== null || sense.on.length > 0) {
|
||||
return { interval: sense.interval, on: sense.on };
|
||||
}
|
||||
return scheduleFromReflexesOnly(senseName, reflexes);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create and start a reflex scheduler.
|
||||
*
|
||||
* @param config Full NerveConfig (senses for schedule + throttle; reflexes as fallback).
|
||||
* @param bus SignalBus to subscribe for event-driven reflexes.
|
||||
* @param config Full NerveConfig (senses for schedule + throttle).
|
||||
* @param bus SignalBus to subscribe for event-driven triggers.
|
||||
* @param triggerFn Called with the sense name when a compute should be dispatched.
|
||||
* @param opts Optional: logStore for structured logging.
|
||||
* @returns ReflexScheduler with stop() and onComputeComplete() methods.
|
||||
@@ -203,7 +153,7 @@ export function createReflexScheduler(
|
||||
}
|
||||
|
||||
for (const [senseName, sense] of Object.entries(config.senses)) {
|
||||
const { interval, on } = effectiveSenseSchedule(senseName, sense, config.reflexes);
|
||||
const { interval, on } = sense;
|
||||
|
||||
if (interval !== null) {
|
||||
const id = setInterval(() => {
|
||||
|
||||
Reference in New Issue
Block a user