Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| a887fc04ca | |||
| d5931a9e19 | |||
| f12f187d8d | |||
| 1512113a01 |
@@ -48,9 +48,9 @@ export const cpuUsage = sqliteTable("cpu_usage", {
|
||||
});
|
||||
`;
|
||||
|
||||
const CPU_INDEX_TS = `import { cpus } from "node:os";
|
||||
const CPU_INDEX_JS = `import { cpus } from "node:os";
|
||||
|
||||
export async function compute(): Promise<unknown> {
|
||||
export async function compute() {
|
||||
const cpuList = cpus();
|
||||
|
||||
let totalIdle = 0;
|
||||
@@ -135,13 +135,14 @@ export const initCommand = defineCommand({
|
||||
}
|
||||
|
||||
mkdirSync(join(nerveRoot, "data"), { recursive: true });
|
||||
mkdirSync(join(nerveRoot, "data", "senses"), { recursive: true });
|
||||
mkdirSync(join(nerveRoot, "senses", "cpu-usage", "migrations"), { recursive: true });
|
||||
|
||||
writeFile(join(nerveRoot, "nerve.yaml"), NERVE_YAML);
|
||||
writeFile(join(nerveRoot, "package.json"), PACKAGE_JSON);
|
||||
writeFile(join(nerveRoot, ".gitignore"), GITIGNORE);
|
||||
writeFile(join(nerveRoot, "senses", "cpu-usage", "schema.ts"), CPU_SCHEMA_TS);
|
||||
writeFile(join(nerveRoot, "senses", "cpu-usage", "index.ts"), CPU_INDEX_TS);
|
||||
writeFile(join(nerveRoot, "senses", "cpu-usage", "index.js"), CPU_INDEX_JS);
|
||||
writeFile(
|
||||
join(nerveRoot, "senses", "cpu-usage", "migrations", "0001_init.sql"),
|
||||
CPU_MIGRATION_SQL,
|
||||
|
||||
@@ -0,0 +1,418 @@
|
||||
/**
|
||||
* Integration tests for Kernel + WorkflowManager integration.
|
||||
*
|
||||
* Verifies that sense signals trigger workflow runs via workflow reflexes,
|
||||
* that workflow events are logged, that reloadConfig handles workflow changes,
|
||||
* and that graceful shutdown stops workflow workers.
|
||||
*
|
||||
* Uses mocked child_process.fork to avoid real subprocesses.
|
||||
*/
|
||||
|
||||
import { EventEmitter } from "node:events";
|
||||
|
||||
import type { NerveConfig } from "@uncaged/nerve-core";
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Mock child_process.fork before importing kernel
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
type MockChild = EventEmitter & {
|
||||
send: ReturnType<typeof vi.fn>;
|
||||
kill: ReturnType<typeof vi.fn>;
|
||||
connected: boolean;
|
||||
exitCode: number | null;
|
||||
pid: number;
|
||||
};
|
||||
|
||||
const mockChildren: MockChild[] = [];
|
||||
|
||||
function makeMockChild(pid = 1): MockChild {
|
||||
const child = new EventEmitter() as MockChild;
|
||||
child.connected = true;
|
||||
child.exitCode = null;
|
||||
child.pid = pid;
|
||||
child.send = vi.fn((msg: unknown) => {
|
||||
if (
|
||||
msg !== null &&
|
||||
typeof msg === "object" &&
|
||||
(msg as Record<string, unknown>).type === "shutdown"
|
||||
) {
|
||||
setImmediate(() => {
|
||||
child.exitCode = 0;
|
||||
child.connected = false;
|
||||
child.emit("exit", 0, null);
|
||||
});
|
||||
}
|
||||
});
|
||||
child.kill = vi.fn((_signal?: string) => {
|
||||
child.exitCode = 1;
|
||||
child.connected = false;
|
||||
child.emit("exit", null, _signal ?? "SIGKILL");
|
||||
});
|
||||
return child;
|
||||
}
|
||||
|
||||
vi.mock("node:child_process", () => ({
|
||||
fork: vi.fn((_script: string, _args: string[], _opts: unknown) => {
|
||||
const child = makeMockChild(mockChildren.length + 1);
|
||||
mockChildren.push(child);
|
||||
return child;
|
||||
}),
|
||||
}));
|
||||
|
||||
// Import after mock is set up
|
||||
const { createKernel } = await import("../kernel.js");
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Helpers
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
function makeLogStore() {
|
||||
return {
|
||||
append: vi.fn(),
|
||||
query: vi.fn(() => []),
|
||||
getMeta: vi.fn(() => null),
|
||||
setMeta: vi.fn(),
|
||||
upsertWorkflowRun: vi.fn(),
|
||||
appendWithWorkflowUpdate: vi.fn(),
|
||||
getWorkflowRun: vi.fn(() => null),
|
||||
getActiveWorkflowRuns: vi.fn(() => []),
|
||||
close: vi.fn(),
|
||||
};
|
||||
}
|
||||
|
||||
function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
|
||||
return {
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [],
|
||||
workflows: null,
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Tests
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
describe("kernel + workflowManager integration", () => {
|
||||
beforeEach(() => {
|
||||
mockChildren.length = 0;
|
||||
vi.useFakeTimers({ shouldAdvanceTime: true });
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
vi.useRealTimers();
|
||||
vi.clearAllMocks();
|
||||
});
|
||||
|
||||
describe("sense signal triggers workflow via reflex", () => {
|
||||
it("calls workflowManager.startWorkflow when a sense signal fires on a workflow reflex", async () => {
|
||||
const logStore = makeLogStore();
|
||||
const config = makeConfig({
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [{ kind: "workflow", workflow: "my-workflow", on: ["cpu-usage"] }],
|
||||
workflows: { "my-workflow": { concurrency: 2, overflow: "drop" } },
|
||||
});
|
||||
|
||||
const kernel = createKernel(config, "/tmp/nerve-test", {
|
||||
workerScript: "fake-worker.js",
|
||||
logStore,
|
||||
});
|
||||
|
||||
// Emit a signal from "cpu-usage" on the bus
|
||||
const { createSignalBus } = await import("../signal-bus.js");
|
||||
void createSignalBus; // ensure import resolves
|
||||
kernel.bus.emit({ id: 1, senseId: "cpu-usage", payload: { value: 80 }, ts: Date.now() });
|
||||
|
||||
// The workflow worker should be spawned (one for the sense group, one for workflow)
|
||||
// The sense group worker is mockChildren[0]; the workflow worker is mockChildren[1]
|
||||
// We need to check that a start-thread message was sent to the workflow worker
|
||||
const workflowWorker = mockChildren.find((c) =>
|
||||
(c.send as ReturnType<typeof vi.fn>).mock.calls.some(
|
||||
([msg]: [unknown]) =>
|
||||
msg !== null &&
|
||||
typeof msg === "object" &&
|
||||
(msg as Record<string, unknown>).type === "start-thread",
|
||||
),
|
||||
);
|
||||
expect(workflowWorker).toBeDefined();
|
||||
|
||||
const stopPromise = kernel.stop();
|
||||
await vi.runAllTimersAsync();
|
||||
await stopPromise;
|
||||
});
|
||||
|
||||
it("passes the signal payload as triggerPayload to the workflow", async () => {
|
||||
const logStore = makeLogStore();
|
||||
const config = makeConfig({
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [{ kind: "workflow", workflow: "alert-workflow", on: ["cpu-usage"] }],
|
||||
workflows: { "alert-workflow": { concurrency: 1, overflow: "drop" } },
|
||||
});
|
||||
|
||||
const kernel = createKernel(config, "/tmp/nerve-test", {
|
||||
workerScript: "fake-worker.js",
|
||||
logStore,
|
||||
});
|
||||
|
||||
const payload = { level: "critical", value: 99 };
|
||||
kernel.bus.emit({ id: 1, senseId: "cpu-usage", payload, ts: Date.now() });
|
||||
|
||||
// Find the start-thread call and verify triggerPayload
|
||||
const startThreadCall = mockChildren
|
||||
.flatMap((c) => (c.send as ReturnType<typeof vi.fn>).mock.calls as [unknown][])
|
||||
.find(
|
||||
([msg]) =>
|
||||
msg !== null &&
|
||||
typeof msg === "object" &&
|
||||
(msg as Record<string, unknown>).type === "start-thread",
|
||||
);
|
||||
|
||||
expect(startThreadCall).toBeDefined();
|
||||
expect(startThreadCall?.[0]).toMatchObject({
|
||||
type: "start-thread",
|
||||
workflow: "alert-workflow",
|
||||
triggerPayload: payload,
|
||||
});
|
||||
|
||||
const stopPromise = kernel.stop();
|
||||
await vi.runAllTimersAsync();
|
||||
await stopPromise;
|
||||
});
|
||||
|
||||
it("does not trigger workflow when signal senseId is not in 'on' list", async () => {
|
||||
const logStore = makeLogStore();
|
||||
const config = makeConfig({
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
"disk-io": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [{ kind: "workflow", workflow: "my-workflow", on: ["disk-io"] }],
|
||||
workflows: { "my-workflow": { concurrency: 1, overflow: "drop" } },
|
||||
});
|
||||
|
||||
const kernel = createKernel(config, "/tmp/nerve-test", {
|
||||
workerScript: "fake-worker.js",
|
||||
logStore,
|
||||
});
|
||||
|
||||
// Emit signal from cpu-usage — NOT in the workflow's "on" list
|
||||
kernel.bus.emit({ id: 1, senseId: "cpu-usage", payload: 50, ts: Date.now() });
|
||||
|
||||
// No workflow worker should have been spawned (only the sense group worker)
|
||||
const workflowWorkerSpawned = mockChildren.some((c) =>
|
||||
(c.send as ReturnType<typeof vi.fn>).mock.calls.some(
|
||||
([msg]: [unknown]) =>
|
||||
msg !== null &&
|
||||
typeof msg === "object" &&
|
||||
(msg as Record<string, unknown>).type === "start-thread",
|
||||
),
|
||||
);
|
||||
expect(workflowWorkerSpawned).toBe(false);
|
||||
|
||||
const stopPromise = kernel.stop();
|
||||
await vi.runAllTimersAsync();
|
||||
await stopPromise;
|
||||
});
|
||||
});
|
||||
|
||||
describe("workflow events are logged", () => {
|
||||
it("logs a 'started' event when workflow thread is triggered", async () => {
|
||||
const logStore = makeLogStore();
|
||||
const config = makeConfig({
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [{ kind: "workflow", workflow: "log-test-workflow", on: ["cpu-usage"] }],
|
||||
workflows: { "log-test-workflow": { concurrency: 2, overflow: "drop" } },
|
||||
});
|
||||
|
||||
const kernel = createKernel(config, "/tmp/nerve-test", {
|
||||
workerScript: "fake-worker.js",
|
||||
logStore,
|
||||
});
|
||||
|
||||
kernel.bus.emit({ id: 1, senseId: "cpu-usage", payload: null, ts: Date.now() });
|
||||
|
||||
expect(logStore.upsertWorkflowRun).toHaveBeenCalledWith(
|
||||
expect.objectContaining({ source: "workflow", type: "started" }),
|
||||
expect.objectContaining({ workflow: "log-test-workflow", status: "started" }),
|
||||
);
|
||||
|
||||
const stopPromise = kernel.stop();
|
||||
await vi.runAllTimersAsync();
|
||||
await stopPromise;
|
||||
});
|
||||
});
|
||||
|
||||
describe("reloadConfig handles workflow changes", () => {
|
||||
it("new workflow reflexes are active after reloadConfig", async () => {
|
||||
const logStore = makeLogStore();
|
||||
const initialConfig = makeConfig({
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [],
|
||||
workflows: null,
|
||||
});
|
||||
|
||||
const kernel = createKernel(initialConfig, "/tmp/nerve-test", {
|
||||
workerScript: "fake-worker.js",
|
||||
logStore,
|
||||
});
|
||||
|
||||
// Reload with a workflow reflex added
|
||||
const newConfig: NerveConfig = {
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [{ kind: "workflow", workflow: "new-workflow", on: ["cpu-usage"] }],
|
||||
workflows: { "new-workflow": { concurrency: 1, overflow: "drop" } },
|
||||
};
|
||||
kernel.reloadConfig(newConfig);
|
||||
|
||||
// Now emit a signal — should trigger the new workflow
|
||||
kernel.bus.emit({ id: 2, senseId: "cpu-usage", payload: "reload-test", ts: Date.now() });
|
||||
|
||||
const startThreadCall = mockChildren
|
||||
.flatMap((c) => (c.send as ReturnType<typeof vi.fn>).mock.calls as [unknown][])
|
||||
.find(
|
||||
([msg]) =>
|
||||
msg !== null &&
|
||||
typeof msg === "object" &&
|
||||
(msg as Record<string, unknown>).type === "start-thread" &&
|
||||
(msg as Record<string, unknown>).workflow === "new-workflow",
|
||||
);
|
||||
|
||||
expect(startThreadCall).toBeDefined();
|
||||
|
||||
const stopPromise = kernel.stop();
|
||||
await vi.runAllTimersAsync();
|
||||
await stopPromise;
|
||||
});
|
||||
|
||||
it("old workflow reflexes are removed after reloadConfig", async () => {
|
||||
const logStore = makeLogStore();
|
||||
const initialConfig = makeConfig({
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [{ kind: "workflow", workflow: "old-workflow", on: ["cpu-usage"] }],
|
||||
workflows: { "old-workflow": { concurrency: 1, overflow: "drop" } },
|
||||
});
|
||||
|
||||
const kernel = createKernel(initialConfig, "/tmp/nerve-test", {
|
||||
workerScript: "fake-worker.js",
|
||||
logStore,
|
||||
});
|
||||
|
||||
// Reload with the workflow reflex removed
|
||||
const newConfig: NerveConfig = {
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [],
|
||||
workflows: null,
|
||||
};
|
||||
kernel.reloadConfig(newConfig);
|
||||
|
||||
// Clear send history
|
||||
for (const c of mockChildren) {
|
||||
(c.send as ReturnType<typeof vi.fn>).mockClear();
|
||||
}
|
||||
|
||||
// Emit a signal — old-workflow should NOT be triggered
|
||||
kernel.bus.emit({ id: 3, senseId: "cpu-usage", payload: "after-reload", ts: Date.now() });
|
||||
|
||||
const startThreadCall = mockChildren
|
||||
.flatMap((c) => (c.send as ReturnType<typeof vi.fn>).mock.calls as [unknown][])
|
||||
.find(
|
||||
([msg]) =>
|
||||
msg !== null &&
|
||||
typeof msg === "object" &&
|
||||
(msg as Record<string, unknown>).type === "start-thread",
|
||||
);
|
||||
|
||||
expect(startThreadCall).toBeUndefined();
|
||||
|
||||
const stopPromise = kernel.stop();
|
||||
await vi.runAllTimersAsync();
|
||||
await stopPromise;
|
||||
});
|
||||
});
|
||||
|
||||
describe("graceful shutdown stops workflow workers", () => {
|
||||
it("stop() resolves after workflow workers exit", async () => {
|
||||
const logStore = makeLogStore();
|
||||
const config = makeConfig({
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [{ kind: "workflow", workflow: "shutdown-test", on: ["cpu-usage"] }],
|
||||
workflows: { "shutdown-test": { concurrency: 1, overflow: "drop" } },
|
||||
});
|
||||
|
||||
const kernel = createKernel(config, "/tmp/nerve-test", {
|
||||
workerScript: "fake-worker.js",
|
||||
logStore,
|
||||
});
|
||||
|
||||
// Trigger a workflow so a worker is spawned
|
||||
kernel.bus.emit({ id: 1, senseId: "cpu-usage", payload: null, ts: Date.now() });
|
||||
|
||||
const stopPromise = kernel.stop();
|
||||
await vi.runAllTimersAsync();
|
||||
await expect(stopPromise).resolves.toBeUndefined();
|
||||
});
|
||||
|
||||
it("workflowManager is exposed on kernel", () => {
|
||||
const logStore = makeLogStore();
|
||||
const config = makeConfig({
|
||||
workflows: { "my-wf": { concurrency: 1, overflow: "drop" } },
|
||||
});
|
||||
|
||||
const kernel = createKernel(config, "/tmp/nerve-test", {
|
||||
workerScript: "fake-worker.js",
|
||||
logStore,
|
||||
});
|
||||
|
||||
expect(kernel.workflowManager).toBeDefined();
|
||||
expect(typeof kernel.workflowManager.startWorkflow).toBe("function");
|
||||
expect(typeof kernel.workflowManager.activeCount).toBe("function");
|
||||
expect(typeof kernel.workflowManager.stop).toBe("function");
|
||||
|
||||
kernel.stop().catch(() => {});
|
||||
});
|
||||
|
||||
it("getHealth includes activeWorkflows count", async () => {
|
||||
const logStore = makeLogStore();
|
||||
const config = makeConfig({
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [{ kind: "workflow", workflow: "health-wf", on: ["cpu-usage"] }],
|
||||
workflows: { "health-wf": { concurrency: 2, overflow: "drop" } },
|
||||
});
|
||||
|
||||
const kernel = createKernel(config, "/tmp/nerve-test", {
|
||||
workerScript: "fake-worker.js",
|
||||
logStore,
|
||||
});
|
||||
|
||||
const health = kernel.getHealth();
|
||||
expect(health).toHaveProperty("activeWorkflows");
|
||||
expect(typeof health.activeWorkflows).toBe("number");
|
||||
|
||||
const stopPromise = kernel.stop();
|
||||
await vi.runAllTimersAsync();
|
||||
await stopPromise;
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -31,12 +31,15 @@ import { createReflexScheduler } from "./reflex-scheduler.js";
|
||||
import type { ReflexScheduler } from "./reflex-scheduler.js";
|
||||
import { createSignalBus } from "./signal-bus.js";
|
||||
import type { SignalBus } from "./signal-bus.js";
|
||||
import { createWorkflowManager } from "./workflow-manager.js";
|
||||
import type { WorkflowManager } from "./workflow-manager.js";
|
||||
|
||||
export type KernelHealth = {
|
||||
uptime: number;
|
||||
activeSenses: number;
|
||||
activeGroups: number;
|
||||
pendingComputes: number;
|
||||
activeWorkflows: number;
|
||||
memoryUsage: NodeJS.MemoryUsage;
|
||||
};
|
||||
|
||||
@@ -46,6 +49,7 @@ export type Kernel = {
|
||||
senseCount: number;
|
||||
bus: SignalBus;
|
||||
logStore: LogStore;
|
||||
workflowManager: WorkflowManager;
|
||||
/** Resolves when all workers have sent their initial "ready" message. */
|
||||
ready: Promise<void>;
|
||||
/** Returns the PID of the worker process for a given group, or null if not found. */
|
||||
@@ -143,6 +147,8 @@ export function createKernel(
|
||||
return _signalIdCounter;
|
||||
}
|
||||
|
||||
const workflowManager = createWorkflowManager(nerveRoot, config, logStore);
|
||||
|
||||
const groups = new Set<string>();
|
||||
for (const senseConfig of Object.values(config.senses)) {
|
||||
groups.add(senseConfig.group);
|
||||
@@ -267,7 +273,12 @@ export function createKernel(
|
||||
sendCompute(entry.process, senseName);
|
||||
}
|
||||
|
||||
scheduler = createReflexScheduler(config, bus, triggerFn, { logStore });
|
||||
scheduler = createReflexScheduler(config, bus, triggerFn, {
|
||||
logStore,
|
||||
workflowTriggerFn: (workflowName, payload) => {
|
||||
workflowManager.startWorkflow(workflowName, payload);
|
||||
},
|
||||
});
|
||||
|
||||
if (groups.size === 0) {
|
||||
readyResolve?.();
|
||||
@@ -350,7 +361,13 @@ export function createKernel(
|
||||
// Note: pending/throttled computes in the old scheduler are silently dropped here.
|
||||
// In-flight state is not preserved across reloadConfig.
|
||||
scheduler.stop();
|
||||
scheduler = createReflexScheduler(config, bus, triggerFn, { logStore });
|
||||
scheduler = createReflexScheduler(config, bus, triggerFn, {
|
||||
logStore,
|
||||
workflowTriggerFn: (workflowName, payload) => {
|
||||
workflowManager.startWorkflow(workflowName, payload);
|
||||
},
|
||||
});
|
||||
workflowManager.updateConfig(newConfig);
|
||||
const newGroups = collectGroups(newConfig);
|
||||
removeStaleGroups(oldGroups, newGroups);
|
||||
addNewGroups(oldGroups, newGroups);
|
||||
@@ -378,6 +395,7 @@ export function createKernel(
|
||||
activeSenses: Object.keys(config.senses).length,
|
||||
activeGroups: workers.size,
|
||||
pendingComputes: 0,
|
||||
activeWorkflows: workflowManager.totalActiveCount(),
|
||||
memoryUsage: process.memoryUsage(),
|
||||
};
|
||||
}
|
||||
@@ -441,6 +459,7 @@ export function createKernel(
|
||||
fileWatcher = null;
|
||||
}
|
||||
scheduler.stop();
|
||||
await workflowManager.stop();
|
||||
const exitPromises: Promise<void>[] = [];
|
||||
for (const entry of workers.values()) {
|
||||
sendShutdown(entry.process);
|
||||
@@ -469,6 +488,7 @@ export function createKernel(
|
||||
senseCount,
|
||||
bus,
|
||||
logStore,
|
||||
workflowManager,
|
||||
ready,
|
||||
getWorkerPid,
|
||||
triggerCompute: triggerFn,
|
||||
|
||||
@@ -16,6 +16,9 @@ import type { SignalBus, Unsubscribe } from "./signal-bus.js";
|
||||
/** Sends a compute message to the worker responsible for the given sense. */
|
||||
export type TriggerFn = (senseName: string) => void;
|
||||
|
||||
/** Triggers a workflow run in response to a signal. */
|
||||
export type WorkflowTriggerFn = (workflowName: string, payload: unknown) => void;
|
||||
|
||||
/** Per-sense mutable state tracked by the scheduler. */
|
||||
type SenseState = {
|
||||
lastComputeAt: number;
|
||||
@@ -37,6 +40,7 @@ function makeSenseState(): SenseState {
|
||||
|
||||
export type ReflexSchedulerOptions = {
|
||||
logStore?: LogStore;
|
||||
workflowTriggerFn?: WorkflowTriggerFn;
|
||||
};
|
||||
|
||||
/**
|
||||
@@ -153,6 +157,21 @@ export function createReflexScheduler(
|
||||
}
|
||||
|
||||
for (const reflex of config.reflexes) {
|
||||
if (reflex.kind === "workflow") {
|
||||
if (opts?.workflowTriggerFn !== undefined && reflex.on !== null && reflex.on.length > 0) {
|
||||
const workflowTriggerFn = opts.workflowTriggerFn;
|
||||
const workflowName = reflex.workflow;
|
||||
const watchedSenses = new Set(reflex.on);
|
||||
const unsub = bus.subscribe((signal) => {
|
||||
if (watchedSenses.has(signal.senseId)) {
|
||||
workflowTriggerFn(workflowName, signal.payload);
|
||||
}
|
||||
});
|
||||
unsubscribers.push(unsub);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
if (reflex.kind !== "sense") continue;
|
||||
const senseReflex = reflex;
|
||||
const senseName = senseReflex.sense;
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import { readFileSync, readdirSync } from "node:fs";
|
||||
import { join } from "node:path";
|
||||
import { mkdirSync, readFileSync, readdirSync } from "node:fs";
|
||||
import { dirname, join } from "node:path";
|
||||
|
||||
import Database from "better-sqlite3";
|
||||
import { drizzle } from "drizzle-orm/better-sqlite3";
|
||||
@@ -128,6 +128,7 @@ export function openSenseDb(
|
||||
let sqlite: Database.Database;
|
||||
|
||||
try {
|
||||
mkdirSync(dirname(dbPath), { recursive: true });
|
||||
sqlite = new Database(dbPath);
|
||||
// WAL mode for better concurrent read performance
|
||||
sqlite.pragma("journal_mode = WAL");
|
||||
|
||||
@@ -24,6 +24,10 @@ export type WorkflowManager = {
|
||||
activeCount: (workflowName: string) => number;
|
||||
/** Number of pending queued threads waiting to run for a workflow. */
|
||||
queueLength: (workflowName: string) => number;
|
||||
/** Total active workflow threads across all workflows. */
|
||||
totalActiveCount: () => number;
|
||||
/** Update the config reference (e.g. after hot reload). Active workers are unaffected. */
|
||||
updateConfig: (newConfig: NerveConfig) => void;
|
||||
/** Gracefully shut down all workflow workers. */
|
||||
stop: () => Promise<void>;
|
||||
};
|
||||
@@ -97,7 +101,7 @@ function waitForExit(child: ChildProcess, timeoutMs: number): Promise<void> {
|
||||
|
||||
export function createWorkflowManager(
|
||||
nerveRoot: string,
|
||||
config: NerveConfig,
|
||||
initialConfig: NerveConfig,
|
||||
logStore: LogStore,
|
||||
): WorkflowManager {
|
||||
const workerScript = resolveWorkerScript();
|
||||
@@ -105,6 +109,7 @@ export function createWorkflowManager(
|
||||
const states = new Map<string, WorkflowState>();
|
||||
const workers = new Map<string, WorkerEntry>();
|
||||
let stopped = false;
|
||||
let config = initialConfig;
|
||||
|
||||
function getOrCreateState(workflowName: string): WorkflowState {
|
||||
let state = states.get(workflowName);
|
||||
@@ -348,6 +353,18 @@ export function createWorkflowManager(
|
||||
return states.get(workflowName)?.queue.length ?? 0;
|
||||
}
|
||||
|
||||
function totalActiveCount(): number {
|
||||
let total = 0;
|
||||
for (const state of states.values()) {
|
||||
total += state.active.size;
|
||||
}
|
||||
return total;
|
||||
}
|
||||
|
||||
function updateConfig(newConfig: NerveConfig): void {
|
||||
config = newConfig;
|
||||
}
|
||||
|
||||
async function stop(): Promise<void> {
|
||||
stopped = true;
|
||||
const exitPromises: Promise<void>[] = [];
|
||||
@@ -359,5 +376,5 @@ export function createWorkflowManager(
|
||||
workers.clear();
|
||||
}
|
||||
|
||||
return { startWorkflow, activeCount, queueLength, stop };
|
||||
return { startWorkflow, activeCount, queueLength, totalActiveCount, updateConfig, stop };
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user