diff --git a/examples/cpu-usage/index.ts b/examples/cpu-usage/index.ts index e063f54..690781c 100644 --- a/examples/cpu-usage/index.ts +++ b/examples/cpu-usage/index.ts @@ -8,10 +8,9 @@ import { samples } from "./schema.js"; * Read the 1-minute CPU load average, persist it, and emit a Signal. * * Returns `null` only if `loadavg` is unavailable (non-POSIX platforms). - * On every successful read a row is inserted and the value is returned, - * which causes the engine to emit a Signal. + * On every successful read a row is inserted and a Signal is emitted with the load value. */ -export async function compute(db: DrizzleDB, _peers: PeerMap): Promise { +export async function compute(db: DrizzleDB, _peers: PeerMap) { const [oneMin] = loadavg(); if (typeof oneMin !== "number" || Number.isNaN(oneMin)) { @@ -19,5 +18,5 @@ export async function compute(db: DrizzleDB, _peers: PeerMap): Promise { +export async function compute() { const health = await requestHealthFromKernel(); - return health; + return { signal: health, workflow: null }; } function requestHealthFromKernel(): Promise { diff --git a/packages/cli/src/__tests__/e2e-harness.ts b/packages/cli/src/__tests__/e2e-harness.ts index 87f3b26..2425ca6 100644 --- a/packages/cli/src/__tests__/e2e-harness.ts +++ b/packages/cli/src/__tests__/e2e-harness.ts @@ -134,7 +134,7 @@ SELECT 1; const counterIndexJs = `let _count = 0; export async function compute(_db, _peers, _options) { _count += 1; - return { count: _count }; + return { signal: { count: _count }, workflow: null }; } `; @@ -143,9 +143,17 @@ const counterIndexJsWithNoopWorkflow = `let _launched = false; export async function compute(_db, _peers, _options) { if (!_launched) { _launched = true; - return { workflow: "noop|3|e2e-archive" }; + return { + signal: { launched: true }, + workflow: { + name: "noop", + maxRounds: 3, + prompt: "e2e-archive", + dryRun: false, + }, + }; } - return { idle: true }; + return { signal: { idle: true }, workflow: null }; } `; diff --git a/packages/cli/src/__tests__/e2e-workflow.test.ts b/packages/cli/src/__tests__/e2e-workflow.test.ts index 9d44d4a..f28fe53 100644 --- a/packages/cli/src/__tests__/e2e-workflow.test.ts +++ b/packages/cli/src/__tests__/e2e-workflow.test.ts @@ -10,9 +10,6 @@ import { afterEach, describe, expect, it } from "vitest"; import { type TestDaemonHandle, runCli, startTestDaemon, stopTestDaemon } from "./e2e-harness.js"; -const PAYLOAD_A = JSON.stringify({ prompt: "alpha-e2e", maxRounds: 10 }); -const PAYLOAD_B = JSON.stringify({ prompt: "beta-e2e", maxRounds: 10 }); - async function waitForCompletedEchoRuns( logsDbPath: string, minCount: number, @@ -56,11 +53,27 @@ describe("e2e workflow CLI (real daemon)", () => { daemon = await startTestDaemon(); const logsDb = join(daemon.nerveRoot, "data", "logs.db"); - const t1 = await runCli(daemon, ["workflow", "trigger", "echo", "--payload", PAYLOAD_A]); + const t1 = await runCli(daemon, [ + "workflow", + "trigger", + "echo", + "--prompt", + "alpha-e2e", + "--max-rounds", + "10", + ]); expect(t1.exitCode).toBe(0); expect(t1.stdout).toContain("Triggered"); - const t2 = await runCli(daemon, ["workflow", "trigger", "echo", "--payload", PAYLOAD_B]); + const t2 = await runCli(daemon, [ + "workflow", + "trigger", + "echo", + "--prompt", + "beta-e2e", + "--max-rounds", + "10", + ]); expect(t2.exitCode).toBe(0); const activeRightAfter = await runCli(daemon, ["thread", "list"]); diff --git a/packages/cli/src/commands/create.ts b/packages/cli/src/commands/create.ts index 86f4021..36b1c1d 100644 --- a/packages/cli/src/commands/create.ts +++ b/packages/cli/src/commands/create.ts @@ -74,8 +74,11 @@ export function buildSenseIndexJs(senseId: string): string { */ export async function compute(db, peers, options) { return { - label: "${senseId}", - ts: Date.now(), + signal: { + label: "${senseId}", + ts: Date.now(), + }, + workflow: null, }; } `; diff --git a/packages/cli/src/commands/init.ts b/packages/cli/src/commands/init.ts index 7049da6..11471c3 100644 --- a/packages/cli/src/commands/init.ts +++ b/packages/cli/src/commands/init.ts @@ -97,9 +97,12 @@ export async function compute() { const loadPercent = totalTick === 0 ? 0 : ((totalTick - totalIdle) / totalTick) * 100; return { - model: cpuList[0]?.model ?? "unknown", - loadPercent: Math.round(loadPercent * 100) / 100, - ts: Date.now(), + signal: { + model: cpuList[0]?.model ?? "unknown", + loadPercent: Math.round(loadPercent * 100) / 100, + ts: Date.now(), + }, + workflow: null, }; } `; diff --git a/packages/cli/src/commands/workflow.ts b/packages/cli/src/commands/workflow.ts index 166340a..5613a36 100644 --- a/packages/cli/src/commands/workflow.ts +++ b/packages/cli/src/commands/workflow.ts @@ -224,12 +224,12 @@ export function partitionWorkflowMessage(msg: { }): PartitionedMessage { const roleStr = msg.role; const contentBody = msg.content; - const meta: Record = + const meta = msg.meta !== null && msg.meta !== undefined && typeof msg.meta === "object" ? isPlainRecord(msg.meta) ? msg.meta : (msg.meta as Record) - : {}; + : ({} as Record); return { roleStr, contentBody, meta }; } @@ -408,6 +408,21 @@ const workflowStatusCommand = defineCommand({ // nerve workflow trigger // --------------------------------------------------------------------------- +function readWorkspaceDefaultMaxRounds(): number { + const configPath = join(getNerveRoot(), "nerve.yaml"); + let raw: string; + try { + raw = readFileSync(configPath, "utf8"); + } catch { + return DEFAULT_ENGINE_MAX_ROUNDS; + } + const result = parseNerveConfig(raw); + if (!result.ok) { + return DEFAULT_ENGINE_MAX_ROUNDS; + } + return result.value.maxRounds; +} + const workflowTriggerCommand = defineCommand({ meta: { name: "trigger", @@ -418,31 +433,28 @@ const workflowTriggerCommand = defineCommand({ type: "positional", description: "The workflow name to trigger", }, - payload: { + maxRounds: { type: "string", - description: - 'JSON with optional "prompt" (string), "maxRounds" (number), and "dryRun" (boolean) for the workflow run (default: {})', - default: "{}", + description: "Max moderator rounds (default: nerve.yaml maxRounds)", + default: "", + }, + prompt: { + type: "string", + description: "Initial prompt for the workflow run", + default: "", + }, + dryRun: { + type: "boolean", + description: "Run the workflow in dry-run mode", + default: false, }, }, async run({ args }) { - let triggerPayload: unknown = {}; - try { - triggerPayload = JSON.parse(args.payload) as unknown; - } catch { - process.stderr.write(`❌ --payload must be valid JSON. Got: ${args.payload}\n`); - process.exit(1); - } - - let prompt = ""; - let maxRounds = DEFAULT_ENGINE_MAX_ROUNDS; - let dryRun = false; - if (isPlainRecord(triggerPayload)) { - const p = triggerPayload; - if (typeof p.prompt === "string") prompt = p.prompt; - if (typeof p.maxRounds === "number") maxRounds = p.maxRounds; - if (typeof p.dryRun === "boolean") dryRun = p.dryRun; - } + const prompt = args.prompt; + const defaultMax = readWorkspaceDefaultMaxRounds(); + const maxRounds = + args.maxRounds.length > 0 ? parseIntArg(args.maxRounds, defaultMax) : defaultMax; + const dryRun = args.dryRun; if (!isRemoteDaemonCli() && !isRunning()) { process.stderr.write("❌ Nerve daemon is not running. Start it with `nerve daemon start`.\n"); diff --git a/packages/core/src/__tests__/sense-workflow-directive.test.ts b/packages/core/src/__tests__/sense-workflow-directive.test.ts new file mode 100644 index 0000000..f54fab4 --- /dev/null +++ b/packages/core/src/__tests__/sense-workflow-directive.test.ts @@ -0,0 +1,116 @@ +import { describe, expect, it } from "vitest"; + +import { parseWorkflowTrigger, routeSenseComputeOutput } from "../sense-workflow-directive.js"; + +describe("parseWorkflowTrigger", () => { + it("accepts a valid trigger object", () => { + const r = parseWorkflowTrigger({ + name: "my-wf", + maxRounds: 3, + prompt: "go", + dryRun: true, + }); + expect(r.ok).toBe(true); + if (!r.ok) return; + expect(r.value).toEqual({ name: "my-wf", maxRounds: 3, prompt: "go", dryRun: true }); + }); + + it("trims workflow name", () => { + const r = parseWorkflowTrigger({ + name: " spaced ", + maxRounds: 1, + prompt: "", + dryRun: false, + }); + expect(r.ok).toBe(true); + if (!r.ok) return; + expect(r.value.name).toBe("spaced"); + }); + + it("rejects empty name", () => { + const r = parseWorkflowTrigger({ name: "", maxRounds: 1, prompt: "x", dryRun: false }); + expect(r.ok).toBe(false); + }); + + it("rejects non-integer maxRounds", () => { + const r = parseWorkflowTrigger({ + name: "w", + maxRounds: 1.5, + prompt: "", + dryRun: false, + }); + expect(r.ok).toBe(false); + }); + + it("rejects maxRounds < 1", () => { + const r = parseWorkflowTrigger({ name: "w", maxRounds: 0, prompt: "", dryRun: false }); + expect(r.ok).toBe(false); + }); + + it("rejects non-boolean dryRun", () => { + const r = parseWorkflowTrigger({ + name: "w", + maxRounds: 1, + prompt: "", + dryRun: "no" as unknown as boolean, + }); + expect(r.ok).toBe(false); + }); +}); + +describe("routeSenseComputeOutput", () => { + it("wraps non-record values as signal-only", () => { + const r = routeSenseComputeOutput(99); + expect(r.ok).toBe(true); + if (!r.ok) return; + expect(r.value).toEqual({ signal: 99, workflow: null }); + }); + + it("wraps plain objects without signal key as signal-only", () => { + const r = routeSenseComputeOutput({ count: 2 }); + expect(r.ok).toBe(true); + if (!r.ok) return; + expect(r.value).toEqual({ signal: { count: 2 }, workflow: null }); + }); + + it("parses explicit signal with null workflow", () => { + const r = routeSenseComputeOutput({ signal: { a: 1 }, workflow: null }); + expect(r.ok).toBe(true); + if (!r.ok) return; + expect(r.value).toEqual({ signal: { a: 1 }, workflow: null }); + }); + + it("parses explicit signal with workflow trigger", () => { + const r = routeSenseComputeOutput({ + signal: { x: true }, + workflow: { name: "wf", maxRounds: 2, prompt: "p", dryRun: false }, + }); + expect(r.ok).toBe(true); + if (!r.ok) return; + expect(r.value.signal).toEqual({ x: true }); + expect(r.value.workflow).toEqual({ + name: "wf", + maxRounds: 2, + prompt: "p", + dryRun: false, + }); + }); + + it("defaults missing workflow key to null", () => { + const r = routeSenseComputeOutput({ signal: 7 }); + expect(r.ok).toBe(true); + if (!r.ok) return; + expect(r.value).toEqual({ signal: 7, workflow: null }); + }); + + it("degrades to signal-only when workflow object is invalid", () => { + const r = routeSenseComputeOutput({ + signal: { v: 1 }, + workflow: { name: "w", maxRounds: 0, prompt: "", dryRun: false }, + }); + expect(r.ok).toBe(true); + if (!r.ok) return; + expect(r.value.signal).toEqual({ v: 1 }); + expect(r.value.workflow).toBeNull(); + }); +}); diff --git a/packages/core/src/config.ts b/packages/core/src/config.ts index eac2560..3f93ef9 100644 --- a/packages/core/src/config.ts +++ b/packages/core/src/config.ts @@ -36,6 +36,20 @@ export type NerveApiConfig = { host: string; }; +/** Parameters for starting a workflow from a Sense compute result (or CLI trigger). */ +export type WorkflowTrigger = { + name: string; + maxRounds: number; + prompt: string; + dryRun: boolean; +}; + +/** + * Sense `compute()` return: silence, or a signal payload with an optional workflow to start. + * `workflow: null` means signal only; signal is always emitted first when non-null. + */ +export type ComputeResult = null | { signal: T; workflow: WorkflowTrigger | null }; + export type NerveConfig = { /** Engine-wide default max moderator rounds (e.g. CLI workflow trigger when omitted). */ maxRounds: number; diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index f3b4aaf..acf951f 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -6,8 +6,10 @@ export type { WorkflowConfig, NerveApiConfig, NerveConfig, + WorkflowTrigger, + ComputeResult, } from "./config.js"; -export type { Signal, SenseInfo, SenseResult } from "./sense.js"; +export type { Signal, SenseInfo } from "./sense.js"; export { labelSenseTrigger, senseTriggerLabels } from "./sense-trigger-labels.js"; export type { WorkflowMessage, @@ -26,14 +28,8 @@ export { ok, err } from "./result.js"; export { parseNerveConfig } from "./parse-nerve-config.js"; export { isPlainRecord } from "./is-plain-record.js"; -export type { - ParsedSenseWorkflowDirective, - SenseComputeRoute, -} from "./sense-workflow-directive.js"; -export { - parseSenseWorkflowDirective, - routeSenseComputeOutput, -} from "./sense-workflow-directive.js"; +export type { RoutedSenseOutput } from "./sense-workflow-directive.js"; +export { parseWorkflowTrigger, routeSenseComputeOutput } from "./sense-workflow-directive.js"; export { isSenseInfo, isWorkflowStatus } from "./daemon-payload-guards.js"; export type { diff --git a/packages/core/src/sense-workflow-directive.ts b/packages/core/src/sense-workflow-directive.ts index cb01bc0..bb0ecdf 100644 --- a/packages/core/src/sense-workflow-directive.ts +++ b/packages/core/src/sense-workflow-directive.ts @@ -1,77 +1,56 @@ +import type { WorkflowTrigger } from "./config.js"; import { isPlainRecord } from "./is-plain-record.js"; import type { Result } from "./result.js"; import { err, ok } from "./result.js"; -/** Parsed `workflow-name|maxRounds|prompt` from a Sense compute return value. */ -export type ParsedSenseWorkflowDirective = { - workflowName: string; - maxRounds: number; - prompt: string; +/** Normalized non-null compute output for the kernel (unknown signal payload). */ +export type RoutedSenseOutput = { + signal: unknown; + workflow: WorkflowTrigger | null; }; /** - * Parses the pipe-separated `workflow` field from a Sense compute result. - * `prompt` may contain `|` — only the first two pipes delimit name and rounds. + * Validates a structured workflow trigger object from Sense compute or IPC. */ -export function parseSenseWorkflowDirective(field: string): Result { - const trimmed = field.trim(); - if (trimmed.length === 0) { - return err(new Error("workflow directive is empty")); +export function parseWorkflowTrigger(value: unknown): Result { + if (!isPlainRecord(value)) { + return err(new Error("workflow trigger must be a plain object")); } - const parts = trimmed.split("|"); - if (parts.length < 3) { - return err( - new Error( - `workflow directive must be "name|maxRounds|prompt" (got ${String(parts.length)} segment(s))`, - ), - ); + const nameRaw = value.name; + if (typeof nameRaw !== "string" || nameRaw.trim().length === 0) { + return err(new Error('workflow trigger: "name" must be a non-empty string')); } - const workflowName = (parts[0] ?? "").trim(); - if (workflowName.length === 0) { - return err(new Error("workflow directive: empty workflow name")); + const maxRounds = value.maxRounds; + if (typeof maxRounds !== "number" || !Number.isInteger(maxRounds) || maxRounds < 1) { + return err(new Error('workflow trigger: "maxRounds" must be an integer >= 1')); } - const roundsRaw = (parts[1] ?? "").trim(); - const maxRounds = Number.parseInt(roundsRaw, 10); - if (!Number.isInteger(maxRounds) || maxRounds < 1) { - return err(new Error(`workflow directive: invalid maxRounds "${roundsRaw}"`)); + const prompt = value.prompt; + if (typeof prompt !== "string") { + return err(new Error('workflow trigger: "prompt" must be a string')); } - const prompt = parts.slice(2).join("|"); - return ok({ workflowName, maxRounds, prompt }); -} - -export type SenseComputeRoute = - | { kind: "launch"; launch: ParsedSenseWorkflowDirective } - | { kind: "signal"; payload: unknown }; - -function stripWorkflowKey(payload: Record): Record { - const { workflow: _drop, ...rest } = payload; - return rest; + const dryRun = value.dryRun; + if (typeof dryRun !== "boolean") { + return err(new Error('workflow trigger: "dryRun" must be a boolean')); + } + return ok({ name: nameRaw.trim(), maxRounds, prompt, dryRun }); } /** - * Interprets a Sense compute non-null return value for the engine: - * - `workflow` missing → normal signal with full payload - * - `workflow: null` or `""` → normal signal; `workflow` key stripped from emitted payload - * - `workflow: "name|n|prompt"` → launch workflow; no Signal is emitted to the bus + * Interprets a Sense compute non-null return value for the engine. + * - Explicit `{ signal, workflow }` (workflow may be null): validates `workflow` when non-null. + * - Any other value: treated as `{ signal: payload, workflow: null }` (shorthand). */ -export function routeSenseComputeOutput(payload: unknown): SenseComputeRoute { - if (!isPlainRecord(payload)) { - return { kind: "signal", payload }; +export function routeSenseComputeOutput(payload: unknown): Result { + if (isPlainRecord(payload) && Object.hasOwn(payload, "signal")) { + const wfRaw = Object.hasOwn(payload, "workflow") ? payload.workflow : null; + if (wfRaw === null) { + return ok({ signal: payload.signal, workflow: null }); + } + const parsed = parseWorkflowTrigger(wfRaw); + if (!parsed.ok) { + return ok({ signal: payload.signal, workflow: null }); + } + return ok({ signal: payload.signal, workflow: parsed.value }); } - const obj = payload; - if (!Object.hasOwn(obj, "workflow")) { - return { kind: "signal", payload }; - } - const w = obj.workflow; - if (w === null || w === "") { - return { kind: "signal", payload: stripWorkflowKey(obj) }; - } - if (typeof w !== "string") { - return { kind: "signal", payload }; - } - const parsed = parseSenseWorkflowDirective(w); - if (!parsed.ok) { - return { kind: "signal", payload: stripWorkflowKey(obj) }; - } - return { kind: "launch", launch: parsed.value }; + return ok({ signal: payload, workflow: null }); } diff --git a/packages/core/src/sense.ts b/packages/core/src/sense.ts index d863733..2d2709f 100644 --- a/packages/core/src/sense.ts +++ b/packages/core/src/sense.ts @@ -15,9 +15,3 @@ export type SenseInfo = { triggers: string[]; lastSignalTimestamp: number | null; }; - -/** The result of a Sense compute — payload plus optional workflow directive. */ -export type SenseResult = { - payload: T; - workflow: string | null; -}; diff --git a/packages/daemon/src/__tests__/file-watcher.test.ts b/packages/daemon/src/__tests__/file-watcher.test.ts index cbbc79b..1da98a6 100644 --- a/packages/daemon/src/__tests__/file-watcher.test.ts +++ b/packages/daemon/src/__tests__/file-watcher.test.ts @@ -79,7 +79,7 @@ describe("createFileWatcher", () => { await new Promise((r) => setTimeout(r, 100)); writeFileSync( join(root, "senses", "cpu-usage", "index.ts"), - "export async function compute() { return 42; }", + "export async function compute() { return { signal: 42, workflow: null }; }", ); await waitFor(() => changes.length > 0, 3000); diff --git a/packages/daemon/src/__tests__/kernel-workflow-integration.test.ts b/packages/daemon/src/__tests__/kernel-workflow-integration.test.ts index 39f4695..adc0f66 100644 --- a/packages/daemon/src/__tests__/kernel-workflow-integration.test.ts +++ b/packages/daemon/src/__tests__/kernel-workflow-integration.test.ts @@ -159,11 +159,18 @@ describe("kernel + workflowManager integration", () => { // and uses routeSenseComputeOutput to detect workflow launches const workerPool = mockChildren[0]; if (workerPool) { - // Simulate the worker sending a signal message with workflow field workerPool.emit("message", { type: "signal", sense: "cpu-usage", - payload: { workflow: "my-workflow|10|run this workflow" }, + payload: { + signal: { reason: "test" }, + workflow: { + name: "my-workflow", + maxRounds: 10, + prompt: "run this workflow", + dryRun: false, + }, + }, }); } @@ -205,13 +212,21 @@ describe("kernel + workflowManager integration", () => { logStore, }); - // Simulate sense worker returning a workflow launch + // Simulate sense worker returning a signal plus workflow launch const workerPool = mockChildren[0]; if (workerPool) { workerPool.emit("message", { type: "signal", sense: "cpu-usage", - payload: { workflow: "alert-workflow|5|handle critical alert" }, + payload: { + signal: { level: "critical" }, + workflow: { + name: "alert-workflow", + maxRounds: 5, + prompt: "handle critical alert", + dryRun: false, + }, + }, }); } @@ -239,6 +254,48 @@ describe("kernel + workflowManager integration", () => { await stopPromise; }); + it("logs sense signal before workflow-launch when both are present", async () => { + const logStore = makeLogStore(); + const config = makeConfig({ + workflows: { "order-wf": { concurrency: 1, overflow: "drop" } }, + }); + + const kernel = createKernel(config, nerveRoot, { + workerScript: "fake-worker.js", + logStore, + }); + + const workerPool = mockChildren[0]; + if (workerPool) { + workerPool.emit("message", { + type: "signal", + sense: "cpu-usage", + payload: { + signal: { seq: 1 }, + workflow: { + name: "order-wf", + maxRounds: 2, + prompt: "p", + dryRun: true, + }, + }, + }); + } + + const senseEntries = logStore.append.mock.calls + .map((c) => c[0] as { source: string; type: string; refId: string | null }) + .filter((e) => e.source === "sense" && e.refId === "cpu-usage"); + const typeOrder = senseEntries.map((e) => e.type); + const sigAt = typeOrder.indexOf("signal"); + const launchAt = typeOrder.indexOf("workflow-launch"); + expect(sigAt).toBeGreaterThanOrEqual(0); + expect(launchAt).toBeGreaterThan(sigAt); + + const stopPromise = kernel.stop(); + await vi.runAllTimersAsync(); + await stopPromise; + }); + it("does not trigger workflow when signal senseId is not in 'on' list", async () => { const logStore = makeLogStore(); const config = makeConfig({ @@ -270,7 +327,7 @@ describe("kernel + workflowManager integration", () => { logStore, }); - // Emit a regular signal (no workflow field) — should NOT trigger any workflow + // Emit a regular signal (shorthand payload) — should NOT trigger any workflow const workerPool = mockChildren[0]; if (workerPool) { workerPool.emit("message", { @@ -320,13 +377,21 @@ describe("kernel + workflowManager integration", () => { logStore, }); - // Simulate sense compute returning a workflow launch + // Simulate sense compute returning a signal plus workflow launch const workerPool = mockChildren[0]; if (workerPool) { workerPool.emit("message", { type: "signal", sense: "cpu-usage", - payload: { workflow: "log-test-workflow|10|test prompt" }, + payload: { + signal: { note: "log" }, + workflow: { + name: "log-test-workflow", + maxRounds: 10, + prompt: "test prompt", + dryRun: false, + }, + }, }); } @@ -384,13 +449,21 @@ describe("kernel + workflowManager integration", () => { }; kernel.reloadConfig(newConfig); - // Simulate sense compute returning a workflow launch for the new workflow + // Simulate sense compute returning a signal plus workflow for the new workflow const workerPool = mockChildren[0]; if (workerPool) { workerPool.emit("message", { type: "signal", sense: "cpu-usage", - payload: { workflow: "new-workflow|10|reload test" }, + payload: { + signal: { phase: "reload" }, + workflow: { + name: "new-workflow", + maxRounds: 10, + prompt: "reload test", + dryRun: false, + }, + }, }); } @@ -457,13 +530,21 @@ describe("kernel + workflowManager integration", () => { (c.send as ReturnType).mockClear(); } - // Simulate sense compute trying to launch the old workflow — it should still not start + // Simulate sense compute trying to launch the removed workflow — it should not start const workerPool = mockChildren[0]; if (workerPool) { workerPool.emit("message", { type: "signal", sense: "cpu-usage", - payload: { workflow: "old-workflow|10|should not work" }, + payload: { + signal: { stale: true }, + workflow: { + name: "old-workflow", + maxRounds: 10, + prompt: "should not work", + dryRun: false, + }, + }, }); } @@ -513,7 +594,15 @@ describe("kernel + workflowManager integration", () => { workerPool.emit("message", { type: "signal", sense: "cpu-usage", - payload: { workflow: "shutdown-test|10|test" }, + payload: { + signal: { shutdownCase: true }, + workflow: { + name: "shutdown-test", + maxRounds: 10, + prompt: "test", + dryRun: false, + }, + }, }); } diff --git a/packages/daemon/src/__tests__/sense-runtime.test.ts b/packages/daemon/src/__tests__/sense-runtime.test.ts index b95d750..e5749a4 100644 --- a/packages/daemon/src/__tests__/sense-runtime.test.ts +++ b/packages/daemon/src/__tests__/sense-runtime.test.ts @@ -201,16 +201,16 @@ describe("executeCompute", () => { const emptyPeers: PeerMap = {}; - it("returns the payload when compute returns a non-null value", async () => { + it("returns the compute result when compute returns a non-null value", async () => { const { runtime, sqlite } = makeRuntime(async (db) => { await db.insert(samples).values({ ts: Date.now(), value: 0.5 }); - return 0.5; + return { signal: 0.5, workflow: null }; }); const result = await executeCompute(runtime, emptyPeers); expect(result.ok).toBe(true); if (!result.ok) return; - expect(result.value).toBe(0.5); + expect(result.value).toEqual({ signal: 0.5, workflow: null }); const rows = sqlite.prepare("SELECT * FROM samples").all(); expect(rows).toHaveLength(1); @@ -253,13 +253,13 @@ describe("executeCompute", () => { const { runtime, sqlite } = makeRuntime(async (_db, p) => { const rows = await p["other-sense"].select().from(samples).all(); - return rows.length > 0 ? rows[0].value : null; + return rows.length > 0 ? { signal: rows[0].value, workflow: null } : null; }); const result = await executeCompute(runtime, peers); expect(result.ok).toBe(true); if (!result.ok) return; - expect(result.value).toBe(3.14); + expect(result.value).toEqual({ signal: 3.14, workflow: null }); peerSqlite.close(); sqlite.close(); @@ -273,7 +273,7 @@ describe("executeCompute", () => { const { runtime, sqlite } = makeRuntime(async (db) => { await db.insert(samples).values({ ts: 999, value: 9.9 }); - return 9.9; + return { signal: 9.9, workflow: null }; }); await executeCompute(runtime, peers); @@ -303,7 +303,7 @@ describe("executeCompute", () => { db, compute: async (d) => { await d.insert(samples).values({ ts: 1000, value: 1.23 }); - return 1.23; + return { signal: 1.23, workflow: null }; }, persistSignal: () => {}, }; @@ -341,11 +341,11 @@ describe("executeCompute", () => { }); it("completes within timeout when compute is fast", async () => { - const { runtime, sqlite } = makeRuntime(async () => 42); + const { runtime, sqlite } = makeRuntime(async () => ({ signal: 42, workflow: null })); const result = await executeCompute(runtime, emptyPeers, 5_000); expect(result.ok).toBe(true); if (!result.ok) return; - expect(result.value).toBe(42); + expect(result.value).toEqual({ signal: 42, workflow: null }); sqlite.close(); }); diff --git a/packages/daemon/src/kernel.ts b/packages/daemon/src/kernel.ts index a2d64f3..d48f2fd 100644 --- a/packages/daemon/src/kernel.ts +++ b/packages/daemon/src/kernel.ts @@ -156,6 +156,56 @@ export function createKernel( } } + function handleSenseWorkerSignal(senseName: string, payload: unknown): void { + const routeResult = routeSenseComputeOutput(payload); + if (!routeResult.ok) { + process.stderr.write( + `[kernel] sense "${senseName}" invalid compute payload: ${routeResult.error.message}\n`, + ); + logStore.append({ + source: "sense", + type: "error", + refId: senseName, + payload: JSON.stringify({ error: routeResult.error.message }), + timestamp: Date.now(), + }); + scheduler.onComputeComplete(senseName); + return; + } + const { signal: signalPayload, workflow } = routeResult.value; + + const signal: Signal = { + id: nextSignalId(), + senseId: senseName, + payload: signalPayload, + timestamp: Date.now(), + }; + logStore.append({ + source: "sense", + type: "signal", + refId: senseName, + payload: JSON.stringify(signalPayload), + timestamp: signal.timestamp, + }); + bus.emit(signal); + + if (workflow !== null) { + workflowManager.startWorkflow(workflow.name, { + prompt: workflow.prompt, + maxRounds: workflow.maxRounds, + dryRun: workflow.dryRun, + }); + logStore.append({ + source: "sense", + type: "workflow-launch", + refId: senseName, + payload: JSON.stringify(workflow), + timestamp: Date.now(), + }); + } + scheduler.onComputeComplete(senseName); + } + function handleWorkerMessage(raw: unknown): void { const result = parseWorkerMessage(raw); if (!result.ok) { @@ -186,34 +236,7 @@ export function createKernel( } if (msg.type === "signal") { - const route = routeSenseComputeOutput(msg.payload); - if (route.kind === "launch") { - const { workflowName, maxRounds, prompt } = route.launch; - workflowManager.startWorkflow(workflowName, { prompt, maxRounds, dryRun: false }); - logStore.append({ - source: "sense", - type: "workflow-launch", - refId: msg.sense, - payload: JSON.stringify(route.launch), - timestamp: Date.now(), - }); - } else { - const signal: Signal = { - id: nextSignalId(), - senseId: msg.sense, - payload: route.payload, - timestamp: Date.now(), - }; - logStore.append({ - source: "sense", - type: "signal", - refId: msg.sense, - payload: JSON.stringify(route.payload), - timestamp: signal.timestamp, - }); - bus.emit(signal); - } - scheduler.onComputeComplete(msg.sense); + handleSenseWorkerSignal(msg.sense, msg.payload); } } diff --git a/packages/daemon/src/sense-runtime.ts b/packages/daemon/src/sense-runtime.ts index d65b237..66258dc 100644 --- a/packages/daemon/src/sense-runtime.ts +++ b/packages/daemon/src/sense-runtime.ts @@ -5,7 +5,7 @@ import { DatabaseSync } from "node:sqlite"; import { drizzle } from "drizzle-orm/node-sqlite"; import type { NodeSQLiteDatabase } from "drizzle-orm/node-sqlite"; -import type { Result } from "@uncaged/nerve-core"; +import type { ComputeResult, Result } from "@uncaged/nerve-core"; import { DEFAULT_SENSE_SIGNAL_RETENTION, err, isPlainRecord, ok } from "@uncaged/nerve-core"; import type { BlobStore } from "@uncaged/nerve-store"; @@ -27,13 +27,14 @@ export type ComputeOptions = { * The shape every sense's index.ts must export. * Engine injects `db` (read-write), `peers` (read-only), and `options` * (`signal`, and `blobs` when running in the sense worker — RFC-001 §8 CAS). - * Returns T when a signal should be emitted, null for silence. + * Returns a structured result when a signal should be emitted (and optionally a workflow), + * or null for silence. */ export type ComputeFn = ( db: DrizzleDB, peers: PeerMap, options?: ComputeOptions, -) => Promise; +) => Promise>; /** All state held for one sense inside a worker */ export type SenseRuntime = { @@ -234,7 +235,7 @@ export async function executeCompute( peers: PeerMap, timeoutMs?: number, blobStore?: BlobStore, -): Promise> { +): Promise>> { const controller = new AbortController(); const options: ComputeOptions = blobStore !== undefined diff --git a/packages/daemon/src/sense-worker.ts b/packages/daemon/src/sense-worker.ts index 25665a3..8fd7ab1 100644 --- a/packages/daemon/src/sense-worker.ts +++ b/packages/daemon/src/sense-worker.ts @@ -19,7 +19,7 @@ import "./experimental-warning-suppression.js"; import { readFileSync } from "node:fs"; import { join, resolve } from "node:path"; -import { parseNerveConfig } from "@uncaged/nerve-core"; +import { parseNerveConfig, routeSenseComputeOutput } from "@uncaged/nerve-core"; import type { NerveConfig } from "@uncaged/nerve-core"; import { createBlobStore } from "@uncaged/nerve-store"; @@ -189,7 +189,12 @@ async function runCompute( } clearGracePeriodTimer(senseName); if (result.value != null) { - runtime.persistSignal(result.value); + const routeResult = routeSenseComputeOutput(result.value); + if (!routeResult.ok) { + sendError(senseName, routeResult.error.message); + return; + } + runtime.persistSignal(routeResult.value.signal); sendSignal(senseName, result.value); } } catch (e: unknown) {