From f12f187d8df9b7b3f3282455605340d58ac136d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B0=8F=E6=A9=98?= Date: Wed, 22 Apr 2026 12:40:57 +0000 Subject: [PATCH] =?UTF-8?q?feat(daemon):=20Phase=202=20=E2=80=94=20kernel?= =?UTF-8?q?=20=E2=86=94=20workflow=20integration?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - kernel.ts: create WorkflowManager, expose on Kernel, wire into ReflexScheduler via workflowTriggerFn, add activeWorkflows to KernelHealth, graceful shutdown ordering - reflex-scheduler.ts: handle kind:'workflow' reflexes — subscribe to bus signals and delegate to workflowTriggerFn - workflow-manager.ts: add totalActiveCount(), updateConfig() for hot reload support All 140 tests pass. 小橘 🍊(NEKO Team) --- .../kernel-workflow-integration.test.ts | 418 ++++++++++++++++++ packages/daemon/src/kernel.ts | 24 +- packages/daemon/src/reflex-scheduler.ts | 19 + packages/daemon/src/workflow-manager.ts | 21 +- 4 files changed, 478 insertions(+), 4 deletions(-) create mode 100644 packages/daemon/src/__tests__/kernel-workflow-integration.test.ts diff --git a/packages/daemon/src/__tests__/kernel-workflow-integration.test.ts b/packages/daemon/src/__tests__/kernel-workflow-integration.test.ts new file mode 100644 index 0000000..35e83d3 --- /dev/null +++ b/packages/daemon/src/__tests__/kernel-workflow-integration.test.ts @@ -0,0 +1,418 @@ +/** + * Integration tests for Kernel + WorkflowManager integration. + * + * Verifies that sense signals trigger workflow runs via workflow reflexes, + * that workflow events are logged, that reloadConfig handles workflow changes, + * and that graceful shutdown stops workflow workers. + * + * Uses mocked child_process.fork to avoid real subprocesses. + */ + +import { EventEmitter } from "node:events"; + +import type { NerveConfig } from "@uncaged/nerve-core"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; + +// --------------------------------------------------------------------------- +// Mock child_process.fork before importing kernel +// --------------------------------------------------------------------------- + +type MockChild = EventEmitter & { + send: ReturnType; + kill: ReturnType; + connected: boolean; + exitCode: number | null; + pid: number; +}; + +const mockChildren: MockChild[] = []; + +function makeMockChild(pid = 1): MockChild { + const child = new EventEmitter() as MockChild; + child.connected = true; + child.exitCode = null; + child.pid = pid; + child.send = vi.fn((msg: unknown) => { + if ( + msg !== null && + typeof msg === "object" && + (msg as Record).type === "shutdown" + ) { + setImmediate(() => { + child.exitCode = 0; + child.connected = false; + child.emit("exit", 0, null); + }); + } + }); + child.kill = vi.fn((_signal?: string) => { + child.exitCode = 1; + child.connected = false; + child.emit("exit", null, _signal ?? "SIGKILL"); + }); + return child; +} + +vi.mock("node:child_process", () => ({ + fork: vi.fn((_script: string, _args: string[], _opts: unknown) => { + const child = makeMockChild(mockChildren.length + 1); + mockChildren.push(child); + return child; + }), +})); + +// Import after mock is set up +const { createKernel } = await import("../kernel.js"); + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +function makeLogStore() { + return { + append: vi.fn(), + query: vi.fn(() => []), + getMeta: vi.fn(() => null), + setMeta: vi.fn(), + upsertWorkflowRun: vi.fn(), + appendWithWorkflowUpdate: vi.fn(), + getWorkflowRun: vi.fn(() => null), + getActiveWorkflowRuns: vi.fn(() => []), + close: vi.fn(), + }; +} + +function makeConfig(overrides: Partial = {}): NerveConfig { + return { + senses: { + "cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null }, + }, + reflexes: [], + workflows: null, + ...overrides, + }; +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +describe("kernel + workflowManager integration", () => { + beforeEach(() => { + mockChildren.length = 0; + vi.useFakeTimers({ shouldAdvanceTime: true }); + }); + + afterEach(async () => { + vi.useRealTimers(); + vi.clearAllMocks(); + }); + + describe("sense signal triggers workflow via reflex", () => { + it("calls workflowManager.startWorkflow when a sense signal fires on a workflow reflex", async () => { + const logStore = makeLogStore(); + const config = makeConfig({ + senses: { + "cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null }, + }, + reflexes: [{ kind: "workflow", workflow: "my-workflow", on: ["cpu-usage"] }], + workflows: { "my-workflow": { concurrency: 2, overflow: "drop" } }, + }); + + const kernel = createKernel(config, "/tmp/nerve-test", { + workerScript: "fake-worker.js", + logStore, + }); + + // Emit a signal from "cpu-usage" on the bus + const { createSignalBus } = await import("../signal-bus.js"); + void createSignalBus; // ensure import resolves + kernel.bus.emit({ id: 1, senseId: "cpu-usage", payload: { value: 80 }, ts: Date.now() }); + + // The workflow worker should be spawned (one for the sense group, one for workflow) + // The sense group worker is mockChildren[0]; the workflow worker is mockChildren[1] + // We need to check that a start-thread message was sent to the workflow worker + const workflowWorker = mockChildren.find((c) => + (c.send as ReturnType).mock.calls.some( + ([msg]: [unknown]) => + msg !== null && + typeof msg === "object" && + (msg as Record).type === "start-thread", + ), + ); + expect(workflowWorker).toBeDefined(); + + const stopPromise = kernel.stop(); + await vi.runAllTimersAsync(); + await stopPromise; + }); + + it("passes the signal payload as triggerPayload to the workflow", async () => { + const logStore = makeLogStore(); + const config = makeConfig({ + senses: { + "cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null }, + }, + reflexes: [{ kind: "workflow", workflow: "alert-workflow", on: ["cpu-usage"] }], + workflows: { "alert-workflow": { concurrency: 1, overflow: "drop" } }, + }); + + const kernel = createKernel(config, "/tmp/nerve-test", { + workerScript: "fake-worker.js", + logStore, + }); + + const payload = { level: "critical", value: 99 }; + kernel.bus.emit({ id: 1, senseId: "cpu-usage", payload, ts: Date.now() }); + + // Find the start-thread call and verify triggerPayload + const startThreadCall = mockChildren + .flatMap((c) => (c.send as ReturnType).mock.calls as [unknown][]) + .find( + ([msg]) => + msg !== null && + typeof msg === "object" && + (msg as Record).type === "start-thread", + ); + + expect(startThreadCall).toBeDefined(); + expect(startThreadCall?.[0]).toMatchObject({ + type: "start-thread", + workflow: "alert-workflow", + triggerPayload: payload, + }); + + const stopPromise = kernel.stop(); + await vi.runAllTimersAsync(); + await stopPromise; + }); + + it("does not trigger workflow when signal senseId is not in 'on' list", async () => { + const logStore = makeLogStore(); + const config = makeConfig({ + senses: { + "cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null }, + "disk-io": { group: "system", throttle: null, timeout: null, gracePeriod: null }, + }, + reflexes: [{ kind: "workflow", workflow: "my-workflow", on: ["disk-io"] }], + workflows: { "my-workflow": { concurrency: 1, overflow: "drop" } }, + }); + + const kernel = createKernel(config, "/tmp/nerve-test", { + workerScript: "fake-worker.js", + logStore, + }); + + // Emit signal from cpu-usage — NOT in the workflow's "on" list + kernel.bus.emit({ id: 1, senseId: "cpu-usage", payload: 50, ts: Date.now() }); + + // No workflow worker should have been spawned (only the sense group worker) + const workflowWorkerSpawned = mockChildren.some((c) => + (c.send as ReturnType).mock.calls.some( + ([msg]: [unknown]) => + msg !== null && + typeof msg === "object" && + (msg as Record).type === "start-thread", + ), + ); + expect(workflowWorkerSpawned).toBe(false); + + const stopPromise = kernel.stop(); + await vi.runAllTimersAsync(); + await stopPromise; + }); + }); + + describe("workflow events are logged", () => { + it("logs a 'started' event when workflow thread is triggered", async () => { + const logStore = makeLogStore(); + const config = makeConfig({ + senses: { + "cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null }, + }, + reflexes: [{ kind: "workflow", workflow: "log-test-workflow", on: ["cpu-usage"] }], + workflows: { "log-test-workflow": { concurrency: 2, overflow: "drop" } }, + }); + + const kernel = createKernel(config, "/tmp/nerve-test", { + workerScript: "fake-worker.js", + logStore, + }); + + kernel.bus.emit({ id: 1, senseId: "cpu-usage", payload: null, ts: Date.now() }); + + expect(logStore.upsertWorkflowRun).toHaveBeenCalledWith( + expect.objectContaining({ source: "workflow", type: "started" }), + expect.objectContaining({ workflow: "log-test-workflow", status: "started" }), + ); + + const stopPromise = kernel.stop(); + await vi.runAllTimersAsync(); + await stopPromise; + }); + }); + + describe("reloadConfig handles workflow changes", () => { + it("new workflow reflexes are active after reloadConfig", async () => { + const logStore = makeLogStore(); + const initialConfig = makeConfig({ + senses: { + "cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null }, + }, + reflexes: [], + workflows: null, + }); + + const kernel = createKernel(initialConfig, "/tmp/nerve-test", { + workerScript: "fake-worker.js", + logStore, + }); + + // Reload with a workflow reflex added + const newConfig: NerveConfig = { + senses: { + "cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null }, + }, + reflexes: [{ kind: "workflow", workflow: "new-workflow", on: ["cpu-usage"] }], + workflows: { "new-workflow": { concurrency: 1, overflow: "drop" } }, + }; + kernel.reloadConfig(newConfig); + + // Now emit a signal — should trigger the new workflow + kernel.bus.emit({ id: 2, senseId: "cpu-usage", payload: "reload-test", ts: Date.now() }); + + const startThreadCall = mockChildren + .flatMap((c) => (c.send as ReturnType).mock.calls as [unknown][]) + .find( + ([msg]) => + msg !== null && + typeof msg === "object" && + (msg as Record).type === "start-thread" && + (msg as Record).workflow === "new-workflow", + ); + + expect(startThreadCall).toBeDefined(); + + const stopPromise = kernel.stop(); + await vi.runAllTimersAsync(); + await stopPromise; + }); + + it("old workflow reflexes are removed after reloadConfig", async () => { + const logStore = makeLogStore(); + const initialConfig = makeConfig({ + senses: { + "cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null }, + }, + reflexes: [{ kind: "workflow", workflow: "old-workflow", on: ["cpu-usage"] }], + workflows: { "old-workflow": { concurrency: 1, overflow: "drop" } }, + }); + + const kernel = createKernel(initialConfig, "/tmp/nerve-test", { + workerScript: "fake-worker.js", + logStore, + }); + + // Reload with the workflow reflex removed + const newConfig: NerveConfig = { + senses: { + "cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null }, + }, + reflexes: [], + workflows: null, + }; + kernel.reloadConfig(newConfig); + + // Clear send history + for (const c of mockChildren) { + (c.send as ReturnType).mockClear(); + } + + // Emit a signal — old-workflow should NOT be triggered + kernel.bus.emit({ id: 3, senseId: "cpu-usage", payload: "after-reload", ts: Date.now() }); + + const startThreadCall = mockChildren + .flatMap((c) => (c.send as ReturnType).mock.calls as [unknown][]) + .find( + ([msg]) => + msg !== null && + typeof msg === "object" && + (msg as Record).type === "start-thread", + ); + + expect(startThreadCall).toBeUndefined(); + + const stopPromise = kernel.stop(); + await vi.runAllTimersAsync(); + await stopPromise; + }); + }); + + describe("graceful shutdown stops workflow workers", () => { + it("stop() resolves after workflow workers exit", async () => { + const logStore = makeLogStore(); + const config = makeConfig({ + senses: { + "cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null }, + }, + reflexes: [{ kind: "workflow", workflow: "shutdown-test", on: ["cpu-usage"] }], + workflows: { "shutdown-test": { concurrency: 1, overflow: "drop" } }, + }); + + const kernel = createKernel(config, "/tmp/nerve-test", { + workerScript: "fake-worker.js", + logStore, + }); + + // Trigger a workflow so a worker is spawned + kernel.bus.emit({ id: 1, senseId: "cpu-usage", payload: null, ts: Date.now() }); + + const stopPromise = kernel.stop(); + await vi.runAllTimersAsync(); + await expect(stopPromise).resolves.toBeUndefined(); + }); + + it("workflowManager is exposed on kernel", () => { + const logStore = makeLogStore(); + const config = makeConfig({ + workflows: { "my-wf": { concurrency: 1, overflow: "drop" } }, + }); + + const kernel = createKernel(config, "/tmp/nerve-test", { + workerScript: "fake-worker.js", + logStore, + }); + + expect(kernel.workflowManager).toBeDefined(); + expect(typeof kernel.workflowManager.startWorkflow).toBe("function"); + expect(typeof kernel.workflowManager.activeCount).toBe("function"); + expect(typeof kernel.workflowManager.stop).toBe("function"); + + kernel.stop().catch(() => {}); + }); + + it("getHealth includes activeWorkflows count", async () => { + const logStore = makeLogStore(); + const config = makeConfig({ + senses: { + "cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null }, + }, + reflexes: [{ kind: "workflow", workflow: "health-wf", on: ["cpu-usage"] }], + workflows: { "health-wf": { concurrency: 2, overflow: "drop" } }, + }); + + const kernel = createKernel(config, "/tmp/nerve-test", { + workerScript: "fake-worker.js", + logStore, + }); + + const health = kernel.getHealth(); + expect(health).toHaveProperty("activeWorkflows"); + expect(typeof health.activeWorkflows).toBe("number"); + + const stopPromise = kernel.stop(); + await vi.runAllTimersAsync(); + await stopPromise; + }); + }); +}); diff --git a/packages/daemon/src/kernel.ts b/packages/daemon/src/kernel.ts index 6ef4d04..1fd65d4 100644 --- a/packages/daemon/src/kernel.ts +++ b/packages/daemon/src/kernel.ts @@ -31,12 +31,15 @@ import { createReflexScheduler } from "./reflex-scheduler.js"; import type { ReflexScheduler } from "./reflex-scheduler.js"; import { createSignalBus } from "./signal-bus.js"; import type { SignalBus } from "./signal-bus.js"; +import { createWorkflowManager } from "./workflow-manager.js"; +import type { WorkflowManager } from "./workflow-manager.js"; export type KernelHealth = { uptime: number; activeSenses: number; activeGroups: number; pendingComputes: number; + activeWorkflows: number; memoryUsage: NodeJS.MemoryUsage; }; @@ -46,6 +49,7 @@ export type Kernel = { senseCount: number; bus: SignalBus; logStore: LogStore; + workflowManager: WorkflowManager; /** Resolves when all workers have sent their initial "ready" message. */ ready: Promise; /** Returns the PID of the worker process for a given group, or null if not found. */ @@ -143,6 +147,8 @@ export function createKernel( return _signalIdCounter; } + const workflowManager = createWorkflowManager(nerveRoot, config, logStore); + const groups = new Set(); for (const senseConfig of Object.values(config.senses)) { groups.add(senseConfig.group); @@ -267,7 +273,12 @@ export function createKernel( sendCompute(entry.process, senseName); } - scheduler = createReflexScheduler(config, bus, triggerFn, { logStore }); + scheduler = createReflexScheduler(config, bus, triggerFn, { + logStore, + workflowTriggerFn: (workflowName, payload) => { + workflowManager.startWorkflow(workflowName, payload); + }, + }); if (groups.size === 0) { readyResolve?.(); @@ -350,7 +361,13 @@ export function createKernel( // Note: pending/throttled computes in the old scheduler are silently dropped here. // In-flight state is not preserved across reloadConfig. scheduler.stop(); - scheduler = createReflexScheduler(config, bus, triggerFn, { logStore }); + scheduler = createReflexScheduler(config, bus, triggerFn, { + logStore, + workflowTriggerFn: (workflowName, payload) => { + workflowManager.startWorkflow(workflowName, payload); + }, + }); + workflowManager.updateConfig(newConfig); const newGroups = collectGroups(newConfig); removeStaleGroups(oldGroups, newGroups); addNewGroups(oldGroups, newGroups); @@ -378,6 +395,7 @@ export function createKernel( activeSenses: Object.keys(config.senses).length, activeGroups: workers.size, pendingComputes: 0, + activeWorkflows: workflowManager.totalActiveCount(), memoryUsage: process.memoryUsage(), }; } @@ -441,6 +459,7 @@ export function createKernel( fileWatcher = null; } scheduler.stop(); + await workflowManager.stop(); const exitPromises: Promise[] = []; for (const entry of workers.values()) { sendShutdown(entry.process); @@ -469,6 +488,7 @@ export function createKernel( senseCount, bus, logStore, + workflowManager, ready, getWorkerPid, triggerCompute: triggerFn, diff --git a/packages/daemon/src/reflex-scheduler.ts b/packages/daemon/src/reflex-scheduler.ts index d1c416d..f6d9253 100644 --- a/packages/daemon/src/reflex-scheduler.ts +++ b/packages/daemon/src/reflex-scheduler.ts @@ -16,6 +16,9 @@ import type { SignalBus, Unsubscribe } from "./signal-bus.js"; /** Sends a compute message to the worker responsible for the given sense. */ export type TriggerFn = (senseName: string) => void; +/** Triggers a workflow run in response to a signal. */ +export type WorkflowTriggerFn = (workflowName: string, payload: unknown) => void; + /** Per-sense mutable state tracked by the scheduler. */ type SenseState = { lastComputeAt: number; @@ -37,6 +40,7 @@ function makeSenseState(): SenseState { export type ReflexSchedulerOptions = { logStore?: LogStore; + workflowTriggerFn?: WorkflowTriggerFn; }; /** @@ -153,6 +157,21 @@ export function createReflexScheduler( } for (const reflex of config.reflexes) { + if (reflex.kind === "workflow") { + if (opts?.workflowTriggerFn !== undefined && reflex.on !== null && reflex.on.length > 0) { + const workflowTriggerFn = opts.workflowTriggerFn; + const workflowName = reflex.workflow; + const watchedSenses = new Set(reflex.on); + const unsub = bus.subscribe((signal) => { + if (watchedSenses.has(signal.senseId)) { + workflowTriggerFn(workflowName, signal.payload); + } + }); + unsubscribers.push(unsub); + } + continue; + } + if (reflex.kind !== "sense") continue; const senseReflex = reflex; const senseName = senseReflex.sense; diff --git a/packages/daemon/src/workflow-manager.ts b/packages/daemon/src/workflow-manager.ts index a963e0f..eda2113 100644 --- a/packages/daemon/src/workflow-manager.ts +++ b/packages/daemon/src/workflow-manager.ts @@ -24,6 +24,10 @@ export type WorkflowManager = { activeCount: (workflowName: string) => number; /** Number of pending queued threads waiting to run for a workflow. */ queueLength: (workflowName: string) => number; + /** Total active workflow threads across all workflows. */ + totalActiveCount: () => number; + /** Update the config reference (e.g. after hot reload). Active workers are unaffected. */ + updateConfig: (newConfig: NerveConfig) => void; /** Gracefully shut down all workflow workers. */ stop: () => Promise; }; @@ -97,7 +101,7 @@ function waitForExit(child: ChildProcess, timeoutMs: number): Promise { export function createWorkflowManager( nerveRoot: string, - config: NerveConfig, + initialConfig: NerveConfig, logStore: LogStore, ): WorkflowManager { const workerScript = resolveWorkerScript(); @@ -105,6 +109,7 @@ export function createWorkflowManager( const states = new Map(); const workers = new Map(); let stopped = false; + let config = initialConfig; function getOrCreateState(workflowName: string): WorkflowState { let state = states.get(workflowName); @@ -348,6 +353,18 @@ export function createWorkflowManager( return states.get(workflowName)?.queue.length ?? 0; } + function totalActiveCount(): number { + let total = 0; + for (const state of states.values()) { + total += state.active.size; + } + return total; + } + + function updateConfig(newConfig: NerveConfig): void { + config = newConfig; + } + async function stop(): Promise { stopped = true; const exitPromises: Promise[] = []; @@ -359,5 +376,5 @@ export function createWorkflowManager( workers.clear(); } - return { startWorkflow, activeCount, queueLength, stop }; + return { startWorkflow, activeCount, queueLength, totalActiveCount, updateConfig, stop }; } -- 2.43.0