feat: Workflow Engine Phase 2 — Kernel Integration #21

Merged
xiaomo merged 1 commits from feat/workflow-engine-phase2 into main 2026-04-22 12:45:44 +00:00
4 changed files with 478 additions and 4 deletions
@@ -0,0 +1,418 @@
/**
* Integration tests for Kernel + WorkflowManager integration.
*
* Verifies that sense signals trigger workflow runs via workflow reflexes,
* that workflow events are logged, that reloadConfig handles workflow changes,
* and that graceful shutdown stops workflow workers.
*
* Uses mocked child_process.fork to avoid real subprocesses.
*/
import { EventEmitter } from "node:events";
import type { NerveConfig } from "@uncaged/nerve-core";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
// ---------------------------------------------------------------------------
// Mock child_process.fork before importing kernel
// ---------------------------------------------------------------------------
type MockChild = EventEmitter & {
send: ReturnType<typeof vi.fn>;
kill: ReturnType<typeof vi.fn>;
connected: boolean;
exitCode: number | null;
pid: number;
};
const mockChildren: MockChild[] = [];
function makeMockChild(pid = 1): MockChild {
const child = new EventEmitter() as MockChild;
child.connected = true;
child.exitCode = null;
child.pid = pid;
child.send = vi.fn((msg: unknown) => {
if (
msg !== null &&
typeof msg === "object" &&
(msg as Record<string, unknown>).type === "shutdown"
) {
setImmediate(() => {
child.exitCode = 0;
child.connected = false;
child.emit("exit", 0, null);
});
}
});
child.kill = vi.fn((_signal?: string) => {
child.exitCode = 1;
child.connected = false;
child.emit("exit", null, _signal ?? "SIGKILL");
});
return child;
}
vi.mock("node:child_process", () => ({
fork: vi.fn((_script: string, _args: string[], _opts: unknown) => {
const child = makeMockChild(mockChildren.length + 1);
mockChildren.push(child);
return child;
}),
}));
// Import after mock is set up
const { createKernel } = await import("../kernel.js");
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
function makeLogStore() {
return {
append: vi.fn(),
query: vi.fn(() => []),
getMeta: vi.fn(() => null),
setMeta: vi.fn(),
upsertWorkflowRun: vi.fn(),
appendWithWorkflowUpdate: vi.fn(),
getWorkflowRun: vi.fn(() => null),
getActiveWorkflowRuns: vi.fn(() => []),
close: vi.fn(),
};
}
function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
return {
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [],
workflows: null,
...overrides,
};
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
describe("kernel + workflowManager integration", () => {
beforeEach(() => {
mockChildren.length = 0;
vi.useFakeTimers({ shouldAdvanceTime: true });
});
afterEach(async () => {
vi.useRealTimers();
vi.clearAllMocks();
});
describe("sense signal triggers workflow via reflex", () => {
it("calls workflowManager.startWorkflow when a sense signal fires on a workflow reflex", async () => {
const logStore = makeLogStore();
const config = makeConfig({
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [{ kind: "workflow", workflow: "my-workflow", on: ["cpu-usage"] }],
workflows: { "my-workflow": { concurrency: 2, overflow: "drop" } },
});
const kernel = createKernel(config, "/tmp/nerve-test", {
workerScript: "fake-worker.js",
logStore,
});
// Emit a signal from "cpu-usage" on the bus
const { createSignalBus } = await import("../signal-bus.js");
void createSignalBus; // ensure import resolves
kernel.bus.emit({ id: 1, senseId: "cpu-usage", payload: { value: 80 }, ts: Date.now() });
// The workflow worker should be spawned (one for the sense group, one for workflow)
// The sense group worker is mockChildren[0]; the workflow worker is mockChildren[1]
// We need to check that a start-thread message was sent to the workflow worker
const workflowWorker = mockChildren.find((c) =>
(c.send as ReturnType<typeof vi.fn>).mock.calls.some(
([msg]: [unknown]) =>
msg !== null &&
typeof msg === "object" &&
(msg as Record<string, unknown>).type === "start-thread",
),
);
expect(workflowWorker).toBeDefined();
const stopPromise = kernel.stop();
await vi.runAllTimersAsync();
await stopPromise;
});
it("passes the signal payload as triggerPayload to the workflow", async () => {
const logStore = makeLogStore();
const config = makeConfig({
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [{ kind: "workflow", workflow: "alert-workflow", on: ["cpu-usage"] }],
workflows: { "alert-workflow": { concurrency: 1, overflow: "drop" } },
});
const kernel = createKernel(config, "/tmp/nerve-test", {
workerScript: "fake-worker.js",
logStore,
});
const payload = { level: "critical", value: 99 };
kernel.bus.emit({ id: 1, senseId: "cpu-usage", payload, ts: Date.now() });
// Find the start-thread call and verify triggerPayload
const startThreadCall = mockChildren
.flatMap((c) => (c.send as ReturnType<typeof vi.fn>).mock.calls as [unknown][])
.find(
([msg]) =>
msg !== null &&
typeof msg === "object" &&
(msg as Record<string, unknown>).type === "start-thread",
);
expect(startThreadCall).toBeDefined();
expect(startThreadCall?.[0]).toMatchObject({
type: "start-thread",
workflow: "alert-workflow",
triggerPayload: payload,
});
const stopPromise = kernel.stop();
await vi.runAllTimersAsync();
await stopPromise;
});
it("does not trigger workflow when signal senseId is not in 'on' list", async () => {
const logStore = makeLogStore();
const config = makeConfig({
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
"disk-io": { group: "system", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [{ kind: "workflow", workflow: "my-workflow", on: ["disk-io"] }],
workflows: { "my-workflow": { concurrency: 1, overflow: "drop" } },
});
const kernel = createKernel(config, "/tmp/nerve-test", {
workerScript: "fake-worker.js",
logStore,
});
// Emit signal from cpu-usage — NOT in the workflow's "on" list
kernel.bus.emit({ id: 1, senseId: "cpu-usage", payload: 50, ts: Date.now() });
// No workflow worker should have been spawned (only the sense group worker)
const workflowWorkerSpawned = mockChildren.some((c) =>
(c.send as ReturnType<typeof vi.fn>).mock.calls.some(
([msg]: [unknown]) =>
msg !== null &&
typeof msg === "object" &&
(msg as Record<string, unknown>).type === "start-thread",
),
);
expect(workflowWorkerSpawned).toBe(false);
const stopPromise = kernel.stop();
await vi.runAllTimersAsync();
await stopPromise;
});
});
describe("workflow events are logged", () => {
it("logs a 'started' event when workflow thread is triggered", async () => {
const logStore = makeLogStore();
const config = makeConfig({
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [{ kind: "workflow", workflow: "log-test-workflow", on: ["cpu-usage"] }],
workflows: { "log-test-workflow": { concurrency: 2, overflow: "drop" } },
});
const kernel = createKernel(config, "/tmp/nerve-test", {
workerScript: "fake-worker.js",
logStore,
});
kernel.bus.emit({ id: 1, senseId: "cpu-usage", payload: null, ts: Date.now() });
expect(logStore.upsertWorkflowRun).toHaveBeenCalledWith(
expect.objectContaining({ source: "workflow", type: "started" }),
expect.objectContaining({ workflow: "log-test-workflow", status: "started" }),
);
const stopPromise = kernel.stop();
await vi.runAllTimersAsync();
await stopPromise;
});
});
describe("reloadConfig handles workflow changes", () => {
it("new workflow reflexes are active after reloadConfig", async () => {
const logStore = makeLogStore();
const initialConfig = makeConfig({
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [],
workflows: null,
});
const kernel = createKernel(initialConfig, "/tmp/nerve-test", {
workerScript: "fake-worker.js",
logStore,
});
// Reload with a workflow reflex added
const newConfig: NerveConfig = {
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [{ kind: "workflow", workflow: "new-workflow", on: ["cpu-usage"] }],
workflows: { "new-workflow": { concurrency: 1, overflow: "drop" } },
};
kernel.reloadConfig(newConfig);
// Now emit a signal — should trigger the new workflow
kernel.bus.emit({ id: 2, senseId: "cpu-usage", payload: "reload-test", ts: Date.now() });
const startThreadCall = mockChildren
.flatMap((c) => (c.send as ReturnType<typeof vi.fn>).mock.calls as [unknown][])
.find(
([msg]) =>
msg !== null &&
typeof msg === "object" &&
(msg as Record<string, unknown>).type === "start-thread" &&
(msg as Record<string, unknown>).workflow === "new-workflow",
);
expect(startThreadCall).toBeDefined();
const stopPromise = kernel.stop();
await vi.runAllTimersAsync();
await stopPromise;
});
it("old workflow reflexes are removed after reloadConfig", async () => {
const logStore = makeLogStore();
const initialConfig = makeConfig({
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [{ kind: "workflow", workflow: "old-workflow", on: ["cpu-usage"] }],
workflows: { "old-workflow": { concurrency: 1, overflow: "drop" } },
});
const kernel = createKernel(initialConfig, "/tmp/nerve-test", {
workerScript: "fake-worker.js",
logStore,
});
// Reload with the workflow reflex removed
const newConfig: NerveConfig = {
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [],
workflows: null,
};
kernel.reloadConfig(newConfig);
// Clear send history
for (const c of mockChildren) {
(c.send as ReturnType<typeof vi.fn>).mockClear();
}
// Emit a signal — old-workflow should NOT be triggered
kernel.bus.emit({ id: 3, senseId: "cpu-usage", payload: "after-reload", ts: Date.now() });
const startThreadCall = mockChildren
.flatMap((c) => (c.send as ReturnType<typeof vi.fn>).mock.calls as [unknown][])
.find(
([msg]) =>
msg !== null &&
typeof msg === "object" &&
(msg as Record<string, unknown>).type === "start-thread",
);
expect(startThreadCall).toBeUndefined();
const stopPromise = kernel.stop();
await vi.runAllTimersAsync();
await stopPromise;
});
});
describe("graceful shutdown stops workflow workers", () => {
it("stop() resolves after workflow workers exit", async () => {
const logStore = makeLogStore();
const config = makeConfig({
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [{ kind: "workflow", workflow: "shutdown-test", on: ["cpu-usage"] }],
workflows: { "shutdown-test": { concurrency: 1, overflow: "drop" } },
});
const kernel = createKernel(config, "/tmp/nerve-test", {
workerScript: "fake-worker.js",
logStore,
});
// Trigger a workflow so a worker is spawned
kernel.bus.emit({ id: 1, senseId: "cpu-usage", payload: null, ts: Date.now() });
const stopPromise = kernel.stop();
await vi.runAllTimersAsync();
await expect(stopPromise).resolves.toBeUndefined();
});
it("workflowManager is exposed on kernel", () => {
const logStore = makeLogStore();
const config = makeConfig({
workflows: { "my-wf": { concurrency: 1, overflow: "drop" } },
});
const kernel = createKernel(config, "/tmp/nerve-test", {
workerScript: "fake-worker.js",
logStore,
});
expect(kernel.workflowManager).toBeDefined();
expect(typeof kernel.workflowManager.startWorkflow).toBe("function");
expect(typeof kernel.workflowManager.activeCount).toBe("function");
expect(typeof kernel.workflowManager.stop).toBe("function");
kernel.stop().catch(() => {});
});
it("getHealth includes activeWorkflows count", async () => {
const logStore = makeLogStore();
const config = makeConfig({
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [{ kind: "workflow", workflow: "health-wf", on: ["cpu-usage"] }],
workflows: { "health-wf": { concurrency: 2, overflow: "drop" } },
});
const kernel = createKernel(config, "/tmp/nerve-test", {
workerScript: "fake-worker.js",
logStore,
});
const health = kernel.getHealth();
expect(health).toHaveProperty("activeWorkflows");
expect(typeof health.activeWorkflows).toBe("number");
const stopPromise = kernel.stop();
await vi.runAllTimersAsync();
await stopPromise;
});
});
});
+22 -2
View File
@@ -31,12 +31,15 @@ import { createReflexScheduler } from "./reflex-scheduler.js";
import type { ReflexScheduler } from "./reflex-scheduler.js";
import { createSignalBus } from "./signal-bus.js";
import type { SignalBus } from "./signal-bus.js";
import { createWorkflowManager } from "./workflow-manager.js";
import type { WorkflowManager } from "./workflow-manager.js";
export type KernelHealth = {
uptime: number;
activeSenses: number;
activeGroups: number;
pendingComputes: number;
activeWorkflows: number;
memoryUsage: NodeJS.MemoryUsage;
};
@@ -46,6 +49,7 @@ export type Kernel = {
senseCount: number;
bus: SignalBus;
logStore: LogStore;
workflowManager: WorkflowManager;
/** Resolves when all workers have sent their initial "ready" message. */
ready: Promise<void>;
/** Returns the PID of the worker process for a given group, or null if not found. */
@@ -143,6 +147,8 @@ export function createKernel(
return _signalIdCounter;
}
const workflowManager = createWorkflowManager(nerveRoot, config, logStore);
const groups = new Set<string>();
for (const senseConfig of Object.values(config.senses)) {
groups.add(senseConfig.group);
@@ -267,7 +273,12 @@ export function createKernel(
sendCompute(entry.process, senseName);
}
scheduler = createReflexScheduler(config, bus, triggerFn, { logStore });
scheduler = createReflexScheduler(config, bus, triggerFn, {
logStore,
workflowTriggerFn: (workflowName, payload) => {
workflowManager.startWorkflow(workflowName, payload);
},
});
if (groups.size === 0) {
readyResolve?.();
@@ -350,7 +361,13 @@ export function createKernel(
// Note: pending/throttled computes in the old scheduler are silently dropped here.
// In-flight state is not preserved across reloadConfig.
scheduler.stop();
scheduler = createReflexScheduler(config, bus, triggerFn, { logStore });
scheduler = createReflexScheduler(config, bus, triggerFn, {
logStore,
workflowTriggerFn: (workflowName, payload) => {
workflowManager.startWorkflow(workflowName, payload);
},
});
workflowManager.updateConfig(newConfig);
const newGroups = collectGroups(newConfig);
removeStaleGroups(oldGroups, newGroups);
addNewGroups(oldGroups, newGroups);
@@ -378,6 +395,7 @@ export function createKernel(
activeSenses: Object.keys(config.senses).length,
activeGroups: workers.size,
pendingComputes: 0,
activeWorkflows: workflowManager.totalActiveCount(),
memoryUsage: process.memoryUsage(),
};
}
@@ -441,6 +459,7 @@ export function createKernel(
fileWatcher = null;
}
scheduler.stop();
await workflowManager.stop();
const exitPromises: Promise<void>[] = [];
for (const entry of workers.values()) {
sendShutdown(entry.process);
@@ -469,6 +488,7 @@ export function createKernel(
senseCount,
bus,
logStore,
workflowManager,
ready,
getWorkerPid,
triggerCompute: triggerFn,
+19
View File
@@ -16,6 +16,9 @@ import type { SignalBus, Unsubscribe } from "./signal-bus.js";
/** Sends a compute message to the worker responsible for the given sense. */
export type TriggerFn = (senseName: string) => void;
/** Triggers a workflow run in response to a signal. */
export type WorkflowTriggerFn = (workflowName: string, payload: unknown) => void;
/** Per-sense mutable state tracked by the scheduler. */
type SenseState = {
lastComputeAt: number;
@@ -37,6 +40,7 @@ function makeSenseState(): SenseState {
export type ReflexSchedulerOptions = {
logStore?: LogStore;
workflowTriggerFn?: WorkflowTriggerFn;
};
/**
@@ -153,6 +157,21 @@ export function createReflexScheduler(
}
for (const reflex of config.reflexes) {
if (reflex.kind === "workflow") {
if (opts?.workflowTriggerFn !== undefined && reflex.on !== null && reflex.on.length > 0) {
const workflowTriggerFn = opts.workflowTriggerFn;
const workflowName = reflex.workflow;
const watchedSenses = new Set(reflex.on);
const unsub = bus.subscribe((signal) => {
if (watchedSenses.has(signal.senseId)) {
workflowTriggerFn(workflowName, signal.payload);
}
});
unsubscribers.push(unsub);
}
continue;
}
if (reflex.kind !== "sense") continue;
const senseReflex = reflex;
const senseName = senseReflex.sense;
+19 -2
View File
@@ -24,6 +24,10 @@ export type WorkflowManager = {
activeCount: (workflowName: string) => number;
/** Number of pending queued threads waiting to run for a workflow. */
queueLength: (workflowName: string) => number;
/** Total active workflow threads across all workflows. */
totalActiveCount: () => number;
/** Update the config reference (e.g. after hot reload). Active workers are unaffected. */
updateConfig: (newConfig: NerveConfig) => void;
/** Gracefully shut down all workflow workers. */
stop: () => Promise<void>;
};
@@ -97,7 +101,7 @@ function waitForExit(child: ChildProcess, timeoutMs: number): Promise<void> {
export function createWorkflowManager(
nerveRoot: string,
config: NerveConfig,
initialConfig: NerveConfig,
logStore: LogStore,
): WorkflowManager {
const workerScript = resolveWorkerScript();
@@ -105,6 +109,7 @@ export function createWorkflowManager(
const states = new Map<string, WorkflowState>();
const workers = new Map<string, WorkerEntry>();
let stopped = false;
let config = initialConfig;
function getOrCreateState(workflowName: string): WorkflowState {
let state = states.get(workflowName);
@@ -348,6 +353,18 @@ export function createWorkflowManager(
return states.get(workflowName)?.queue.length ?? 0;
}
function totalActiveCount(): number {
let total = 0;
for (const state of states.values()) {
total += state.active.size;
}
return total;
}
function updateConfig(newConfig: NerveConfig): void {
config = newConfig;
}
async function stop(): Promise<void> {
stopped = true;
const exitPromises: Promise<void>[] = [];
@@ -359,5 +376,5 @@ export function createWorkflowManager(
workers.clear();
}
return { startWorkflow, activeCount, queueLength, stop };
return { startWorkflow, activeCount, queueLength, totalActiveCount, updateConfig, stop };
}