refactor: redesign workflow trigger — signal entails workflow (#204)

Breaking change: compute() returns null | { signal: T; workflow: WorkflowTrigger | null }
- WorkflowTrigger is a structured type (name, maxRounds, prompt, dryRun)
- Signal is always emitted before workflow launch (causal chain)
- CLI: nerve workflow trigger <name> --max-rounds N --prompt '...' --dry-run
- Remove pipe-separated directive format

Fixes #204
This commit is contained in:
2026-04-27 13:04:35 +00:00
parent 6808228c07
commit e159a9b7ca
18 changed files with 427 additions and 172 deletions
+3 -4
View File
@@ -8,10 +8,9 @@ import { samples } from "./schema.js";
* Read the 1-minute CPU load average, persist it, and emit a Signal.
*
* Returns `null` only if `loadavg` is unavailable (non-POSIX platforms).
* On every successful read a row is inserted and the value is returned,
* which causes the engine to emit a Signal.
* On every successful read a row is inserted and a Signal is emitted with the load value.
*/
export async function compute(db: DrizzleDB, _peers: PeerMap): Promise<number | null> {
export async function compute(db: DrizzleDB, _peers: PeerMap) {
const [oneMin] = loadavg();
if (typeof oneMin !== "number" || Number.isNaN(oneMin)) {
@@ -19,5 +18,5 @@ export async function compute(db: DrizzleDB, _peers: PeerMap): Promise<number |
}
await db.insert(samples).values({ ts: Date.now(), value: oneMin });
return oneMin;
return { signal: oneMin, workflow: null };
}
+2 -2
View File
@@ -22,9 +22,9 @@ export type NerveHealth = {
workerUptime: number;
};
export async function compute(): Promise<NerveHealth | null> {
export async function compute() {
const health = await requestHealthFromKernel();
return health;
return { signal: health, workflow: null };
}
function requestHealthFromKernel(): Promise<NerveHealth> {
+11 -3
View File
@@ -134,7 +134,7 @@ SELECT 1;
const counterIndexJs = `let _count = 0;
export async function compute(_db, _peers, _options) {
_count += 1;
return { count: _count };
return { signal: { count: _count }, workflow: null };
}
`;
@@ -143,9 +143,17 @@ const counterIndexJsWithNoopWorkflow = `let _launched = false;
export async function compute(_db, _peers, _options) {
if (!_launched) {
_launched = true;
return { workflow: "noop|3|e2e-archive" };
return {
signal: { launched: true },
workflow: {
name: "noop",
maxRounds: 3,
prompt: "e2e-archive",
dryRun: false,
},
};
}
return { idle: true };
return { signal: { idle: true }, workflow: null };
}
`;
@@ -10,9 +10,6 @@ import { afterEach, describe, expect, it } from "vitest";
import { type TestDaemonHandle, runCli, startTestDaemon, stopTestDaemon } from "./e2e-harness.js";
const PAYLOAD_A = JSON.stringify({ prompt: "alpha-e2e", maxRounds: 10 });
const PAYLOAD_B = JSON.stringify({ prompt: "beta-e2e", maxRounds: 10 });
async function waitForCompletedEchoRuns(
logsDbPath: string,
minCount: number,
@@ -56,11 +53,27 @@ describe("e2e workflow CLI (real daemon)", () => {
daemon = await startTestDaemon();
const logsDb = join(daemon.nerveRoot, "data", "logs.db");
const t1 = await runCli(daemon, ["workflow", "trigger", "echo", "--payload", PAYLOAD_A]);
const t1 = await runCli(daemon, [
"workflow",
"trigger",
"echo",
"--prompt",
"alpha-e2e",
"--max-rounds",
"10",
]);
expect(t1.exitCode).toBe(0);
expect(t1.stdout).toContain("Triggered");
const t2 = await runCli(daemon, ["workflow", "trigger", "echo", "--payload", PAYLOAD_B]);
const t2 = await runCli(daemon, [
"workflow",
"trigger",
"echo",
"--prompt",
"beta-e2e",
"--max-rounds",
"10",
]);
expect(t2.exitCode).toBe(0);
const activeRightAfter = await runCli(daemon, ["thread", "list"]);
+5 -2
View File
@@ -74,8 +74,11 @@ export function buildSenseIndexJs(senseId: string): string {
*/
export async function compute(db, peers, options) {
return {
label: "${senseId}",
ts: Date.now(),
signal: {
label: "${senseId}",
ts: Date.now(),
},
workflow: null,
};
}
`;
+6 -3
View File
@@ -97,9 +97,12 @@ export async function compute() {
const loadPercent = totalTick === 0 ? 0 : ((totalTick - totalIdle) / totalTick) * 100;
return {
model: cpuList[0]?.model ?? "unknown",
loadPercent: Math.round(loadPercent * 100) / 100,
ts: Date.now(),
signal: {
model: cpuList[0]?.model ?? "unknown",
loadPercent: Math.round(loadPercent * 100) / 100,
ts: Date.now(),
},
workflow: null,
};
}
`;
+35 -23
View File
@@ -224,12 +224,12 @@ export function partitionWorkflowMessage(msg: {
}): PartitionedMessage {
const roleStr = msg.role;
const contentBody = msg.content;
const meta: Record<string, unknown> =
const meta =
msg.meta !== null && msg.meta !== undefined && typeof msg.meta === "object"
? isPlainRecord(msg.meta)
? msg.meta
: (msg.meta as Record<string, unknown>)
: {};
: ({} as Record<string, unknown>);
return { roleStr, contentBody, meta };
}
@@ -408,6 +408,21 @@ const workflowStatusCommand = defineCommand({
// nerve workflow trigger <name>
// ---------------------------------------------------------------------------
function readWorkspaceDefaultMaxRounds(): number {
const configPath = join(getNerveRoot(), "nerve.yaml");
let raw: string;
try {
raw = readFileSync(configPath, "utf8");
} catch {
return DEFAULT_ENGINE_MAX_ROUNDS;
}
const result = parseNerveConfig(raw);
if (!result.ok) {
return DEFAULT_ENGINE_MAX_ROUNDS;
}
return result.value.maxRounds;
}
const workflowTriggerCommand = defineCommand({
meta: {
name: "trigger",
@@ -418,31 +433,28 @@ const workflowTriggerCommand = defineCommand({
type: "positional",
description: "The workflow name to trigger",
},
payload: {
maxRounds: {
type: "string",
description:
'JSON with optional "prompt" (string), "maxRounds" (number), and "dryRun" (boolean) for the workflow run (default: {})',
default: "{}",
description: "Max moderator rounds (default: nerve.yaml maxRounds)",
default: "",
},
prompt: {
type: "string",
description: "Initial prompt for the workflow run",
default: "",
},
dryRun: {
type: "boolean",
description: "Run the workflow in dry-run mode",
default: false,
},
},
async run({ args }) {
let triggerPayload: unknown = {};
try {
triggerPayload = JSON.parse(args.payload) as unknown;
} catch {
process.stderr.write(`❌ --payload must be valid JSON. Got: ${args.payload}\n`);
process.exit(1);
}
let prompt = "";
let maxRounds = DEFAULT_ENGINE_MAX_ROUNDS;
let dryRun = false;
if (isPlainRecord(triggerPayload)) {
const p = triggerPayload;
if (typeof p.prompt === "string") prompt = p.prompt;
if (typeof p.maxRounds === "number") maxRounds = p.maxRounds;
if (typeof p.dryRun === "boolean") dryRun = p.dryRun;
}
const prompt = args.prompt;
const defaultMax = readWorkspaceDefaultMaxRounds();
const maxRounds =
args.maxRounds.length > 0 ? parseIntArg(args.maxRounds, defaultMax) : defaultMax;
const dryRun = args.dryRun;
if (!isRemoteDaemonCli() && !isRunning()) {
process.stderr.write("❌ Nerve daemon is not running. Start it with `nerve daemon start`.\n");
@@ -0,0 +1,116 @@
import { describe, expect, it } from "vitest";
import { parseWorkflowTrigger, routeSenseComputeOutput } from "../sense-workflow-directive.js";
describe("parseWorkflowTrigger", () => {
it("accepts a valid trigger object", () => {
const r = parseWorkflowTrigger({
name: "my-wf",
maxRounds: 3,
prompt: "go",
dryRun: true,
});
expect(r.ok).toBe(true);
if (!r.ok) return;
expect(r.value).toEqual({ name: "my-wf", maxRounds: 3, prompt: "go", dryRun: true });
});
it("trims workflow name", () => {
const r = parseWorkflowTrigger({
name: " spaced ",
maxRounds: 1,
prompt: "",
dryRun: false,
});
expect(r.ok).toBe(true);
if (!r.ok) return;
expect(r.value.name).toBe("spaced");
});
it("rejects empty name", () => {
const r = parseWorkflowTrigger({ name: "", maxRounds: 1, prompt: "x", dryRun: false });
expect(r.ok).toBe(false);
});
it("rejects non-integer maxRounds", () => {
const r = parseWorkflowTrigger({
name: "w",
maxRounds: 1.5,
prompt: "",
dryRun: false,
});
expect(r.ok).toBe(false);
});
it("rejects maxRounds < 1", () => {
const r = parseWorkflowTrigger({ name: "w", maxRounds: 0, prompt: "", dryRun: false });
expect(r.ok).toBe(false);
});
it("rejects non-boolean dryRun", () => {
const r = parseWorkflowTrigger({
name: "w",
maxRounds: 1,
prompt: "",
dryRun: "no" as unknown as boolean,
});
expect(r.ok).toBe(false);
});
});
describe("routeSenseComputeOutput", () => {
it("wraps non-record values as signal-only", () => {
const r = routeSenseComputeOutput(99);
expect(r.ok).toBe(true);
if (!r.ok) return;
expect(r.value).toEqual({ signal: 99, workflow: null });
});
it("wraps plain objects without signal key as signal-only", () => {
const r = routeSenseComputeOutput({ count: 2 });
expect(r.ok).toBe(true);
if (!r.ok) return;
expect(r.value).toEqual({ signal: { count: 2 }, workflow: null });
});
it("parses explicit signal with null workflow", () => {
const r = routeSenseComputeOutput({ signal: { a: 1 }, workflow: null });
expect(r.ok).toBe(true);
if (!r.ok) return;
expect(r.value).toEqual({ signal: { a: 1 }, workflow: null });
});
it("parses explicit signal with workflow trigger", () => {
const r = routeSenseComputeOutput({
signal: { x: true },
workflow: { name: "wf", maxRounds: 2, prompt: "p", dryRun: false },
});
expect(r.ok).toBe(true);
if (!r.ok) return;
expect(r.value.signal).toEqual({ x: true });
expect(r.value.workflow).toEqual({
name: "wf",
maxRounds: 2,
prompt: "p",
dryRun: false,
});
});
it("defaults missing workflow key to null", () => {
const r = routeSenseComputeOutput({ signal: 7 });
expect(r.ok).toBe(true);
if (!r.ok) return;
expect(r.value).toEqual({ signal: 7, workflow: null });
});
it("degrades to signal-only when workflow object is invalid", () => {
const r = routeSenseComputeOutput({
signal: { v: 1 },
workflow: { name: "w", maxRounds: 0, prompt: "", dryRun: false },
});
expect(r.ok).toBe(true);
if (!r.ok) return;
expect(r.value.signal).toEqual({ v: 1 });
expect(r.value.workflow).toBeNull();
});
});
+14
View File
@@ -36,6 +36,20 @@ export type NerveApiConfig = {
host: string;
};
/** Parameters for starting a workflow from a Sense compute result (or CLI trigger). */
export type WorkflowTrigger = {
name: string;
maxRounds: number;
prompt: string;
dryRun: boolean;
};
/**
* Sense `compute()` return: silence, or a signal payload with an optional workflow to start.
* `workflow: null` means signal only; signal is always emitted first when non-null.
*/
export type ComputeResult<T> = null | { signal: T; workflow: WorkflowTrigger | null };
export type NerveConfig = {
/** Engine-wide default max moderator rounds (e.g. CLI workflow trigger when omitted). */
maxRounds: number;
+5 -9
View File
@@ -6,8 +6,10 @@ export type {
WorkflowConfig,
NerveApiConfig,
NerveConfig,
WorkflowTrigger,
ComputeResult,
} from "./config.js";
export type { Signal, SenseInfo, SenseResult } from "./sense.js";
export type { Signal, SenseInfo } from "./sense.js";
export { labelSenseTrigger, senseTriggerLabels } from "./sense-trigger-labels.js";
export type {
WorkflowMessage,
@@ -26,14 +28,8 @@ export { ok, err } from "./result.js";
export { parseNerveConfig } from "./parse-nerve-config.js";
export { isPlainRecord } from "./is-plain-record.js";
export type {
ParsedSenseWorkflowDirective,
SenseComputeRoute,
} from "./sense-workflow-directive.js";
export {
parseSenseWorkflowDirective,
routeSenseComputeOutput,
} from "./sense-workflow-directive.js";
export type { RoutedSenseOutput } from "./sense-workflow-directive.js";
export { parseWorkflowTrigger, routeSenseComputeOutput } from "./sense-workflow-directive.js";
export { isSenseInfo, isWorkflowStatus } from "./daemon-payload-guards.js";
export type {
+38 -59
View File
@@ -1,77 +1,56 @@
import type { WorkflowTrigger } from "./config.js";
import { isPlainRecord } from "./is-plain-record.js";
import type { Result } from "./result.js";
import { err, ok } from "./result.js";
/** Parsed `workflow-name|maxRounds|prompt` from a Sense compute return value. */
export type ParsedSenseWorkflowDirective = {
workflowName: string;
maxRounds: number;
prompt: string;
/** Normalized non-null compute output for the kernel (unknown signal payload). */
export type RoutedSenseOutput = {
signal: unknown;
workflow: WorkflowTrigger | null;
};
/**
* Parses the pipe-separated `workflow` field from a Sense compute result.
* `prompt` may contain `|` — only the first two pipes delimit name and rounds.
* Validates a structured workflow trigger object from Sense compute or IPC.
*/
export function parseSenseWorkflowDirective(field: string): Result<ParsedSenseWorkflowDirective> {
const trimmed = field.trim();
if (trimmed.length === 0) {
return err(new Error("workflow directive is empty"));
export function parseWorkflowTrigger(value: unknown): Result<WorkflowTrigger> {
if (!isPlainRecord(value)) {
return err(new Error("workflow trigger must be a plain object"));
}
const parts = trimmed.split("|");
if (parts.length < 3) {
return err(
new Error(
`workflow directive must be "name|maxRounds|prompt" (got ${String(parts.length)} segment(s))`,
),
);
const nameRaw = value.name;
if (typeof nameRaw !== "string" || nameRaw.trim().length === 0) {
return err(new Error('workflow trigger: "name" must be a non-empty string'));
}
const workflowName = (parts[0] ?? "").trim();
if (workflowName.length === 0) {
return err(new Error("workflow directive: empty workflow name"));
const maxRounds = value.maxRounds;
if (typeof maxRounds !== "number" || !Number.isInteger(maxRounds) || maxRounds < 1) {
return err(new Error('workflow trigger: "maxRounds" must be an integer >= 1'));
}
const roundsRaw = (parts[1] ?? "").trim();
const maxRounds = Number.parseInt(roundsRaw, 10);
if (!Number.isInteger(maxRounds) || maxRounds < 1) {
return err(new Error(`workflow directive: invalid maxRounds "${roundsRaw}"`));
const prompt = value.prompt;
if (typeof prompt !== "string") {
return err(new Error('workflow trigger: "prompt" must be a string'));
}
const prompt = parts.slice(2).join("|");
return ok({ workflowName, maxRounds, prompt });
}
export type SenseComputeRoute =
| { kind: "launch"; launch: ParsedSenseWorkflowDirective }
| { kind: "signal"; payload: unknown };
function stripWorkflowKey(payload: Record<string, unknown>): Record<string, unknown> {
const { workflow: _drop, ...rest } = payload;
return rest;
const dryRun = value.dryRun;
if (typeof dryRun !== "boolean") {
return err(new Error('workflow trigger: "dryRun" must be a boolean'));
}
return ok({ name: nameRaw.trim(), maxRounds, prompt, dryRun });
}
/**
* Interprets a Sense compute non-null return value for the engine:
* - `workflow` missing → normal signal with full payload
* - `workflow: null` or `""` → normal signal; `workflow` key stripped from emitted payload
* - `workflow: "name|n|prompt"` → launch workflow; no Signal is emitted to the bus
* Interprets a Sense compute non-null return value for the engine.
* - Explicit `{ signal, workflow }` (workflow may be null): validates `workflow` when non-null.
* - Any other value: treated as `{ signal: payload, workflow: null }` (shorthand).
*/
export function routeSenseComputeOutput(payload: unknown): SenseComputeRoute {
if (!isPlainRecord(payload)) {
return { kind: "signal", payload };
export function routeSenseComputeOutput(payload: unknown): Result<RoutedSenseOutput> {
if (isPlainRecord(payload) && Object.hasOwn(payload, "signal")) {
const wfRaw = Object.hasOwn(payload, "workflow") ? payload.workflow : null;
if (wfRaw === null) {
return ok({ signal: payload.signal, workflow: null });
}
const parsed = parseWorkflowTrigger(wfRaw);
if (!parsed.ok) {
return ok({ signal: payload.signal, workflow: null });
}
return ok({ signal: payload.signal, workflow: parsed.value });
}
const obj = payload;
if (!Object.hasOwn(obj, "workflow")) {
return { kind: "signal", payload };
}
const w = obj.workflow;
if (w === null || w === "") {
return { kind: "signal", payload: stripWorkflowKey(obj) };
}
if (typeof w !== "string") {
return { kind: "signal", payload };
}
const parsed = parseSenseWorkflowDirective(w);
if (!parsed.ok) {
return { kind: "signal", payload: stripWorkflowKey(obj) };
}
return { kind: "launch", launch: parsed.value };
return ok({ signal: payload, workflow: null });
}
-6
View File
@@ -15,9 +15,3 @@ export type SenseInfo = {
triggers: string[];
lastSignalTimestamp: number | null;
};
/** The result of a Sense compute — payload plus optional workflow directive. */
export type SenseResult<T = unknown> = {
payload: T;
workflow: string | null;
};
@@ -79,7 +79,7 @@ describe("createFileWatcher", () => {
await new Promise((r) => setTimeout(r, 100));
writeFileSync(
join(root, "senses", "cpu-usage", "index.ts"),
"export async function compute() { return 42; }",
"export async function compute() { return { signal: 42, workflow: null }; }",
);
await waitFor(() => changes.length > 0, 3000);
@@ -159,11 +159,18 @@ describe("kernel + workflowManager integration", () => {
// and uses routeSenseComputeOutput to detect workflow launches
const workerPool = mockChildren[0];
if (workerPool) {
// Simulate the worker sending a signal message with workflow field
workerPool.emit("message", {
type: "signal",
sense: "cpu-usage",
payload: { workflow: "my-workflow|10|run this workflow" },
payload: {
signal: { reason: "test" },
workflow: {
name: "my-workflow",
maxRounds: 10,
prompt: "run this workflow",
dryRun: false,
},
},
});
}
@@ -205,13 +212,21 @@ describe("kernel + workflowManager integration", () => {
logStore,
});
// Simulate sense worker returning a workflow launch
// Simulate sense worker returning a signal plus workflow launch
const workerPool = mockChildren[0];
if (workerPool) {
workerPool.emit("message", {
type: "signal",
sense: "cpu-usage",
payload: { workflow: "alert-workflow|5|handle critical alert" },
payload: {
signal: { level: "critical" },
workflow: {
name: "alert-workflow",
maxRounds: 5,
prompt: "handle critical alert",
dryRun: false,
},
},
});
}
@@ -239,6 +254,48 @@ describe("kernel + workflowManager integration", () => {
await stopPromise;
});
it("logs sense signal before workflow-launch when both are present", async () => {
const logStore = makeLogStore();
const config = makeConfig({
workflows: { "order-wf": { concurrency: 1, overflow: "drop" } },
});
const kernel = createKernel(config, nerveRoot, {
workerScript: "fake-worker.js",
logStore,
});
const workerPool = mockChildren[0];
if (workerPool) {
workerPool.emit("message", {
type: "signal",
sense: "cpu-usage",
payload: {
signal: { seq: 1 },
workflow: {
name: "order-wf",
maxRounds: 2,
prompt: "p",
dryRun: true,
},
},
});
}
const senseEntries = logStore.append.mock.calls
.map((c) => c[0] as { source: string; type: string; refId: string | null })
.filter((e) => e.source === "sense" && e.refId === "cpu-usage");
const typeOrder = senseEntries.map((e) => e.type);
const sigAt = typeOrder.indexOf("signal");
const launchAt = typeOrder.indexOf("workflow-launch");
expect(sigAt).toBeGreaterThanOrEqual(0);
expect(launchAt).toBeGreaterThan(sigAt);
const stopPromise = kernel.stop();
await vi.runAllTimersAsync();
await stopPromise;
});
it("does not trigger workflow when signal senseId is not in 'on' list", async () => {
const logStore = makeLogStore();
const config = makeConfig({
@@ -270,7 +327,7 @@ describe("kernel + workflowManager integration", () => {
logStore,
});
// Emit a regular signal (no workflow field) — should NOT trigger any workflow
// Emit a regular signal (shorthand payload) — should NOT trigger any workflow
const workerPool = mockChildren[0];
if (workerPool) {
workerPool.emit("message", {
@@ -320,13 +377,21 @@ describe("kernel + workflowManager integration", () => {
logStore,
});
// Simulate sense compute returning a workflow launch
// Simulate sense compute returning a signal plus workflow launch
const workerPool = mockChildren[0];
if (workerPool) {
workerPool.emit("message", {
type: "signal",
sense: "cpu-usage",
payload: { workflow: "log-test-workflow|10|test prompt" },
payload: {
signal: { note: "log" },
workflow: {
name: "log-test-workflow",
maxRounds: 10,
prompt: "test prompt",
dryRun: false,
},
},
});
}
@@ -384,13 +449,21 @@ describe("kernel + workflowManager integration", () => {
};
kernel.reloadConfig(newConfig);
// Simulate sense compute returning a workflow launch for the new workflow
// Simulate sense compute returning a signal plus workflow for the new workflow
const workerPool = mockChildren[0];
if (workerPool) {
workerPool.emit("message", {
type: "signal",
sense: "cpu-usage",
payload: { workflow: "new-workflow|10|reload test" },
payload: {
signal: { phase: "reload" },
workflow: {
name: "new-workflow",
maxRounds: 10,
prompt: "reload test",
dryRun: false,
},
},
});
}
@@ -457,13 +530,21 @@ describe("kernel + workflowManager integration", () => {
(c.send as ReturnType<typeof vi.fn>).mockClear();
}
// Simulate sense compute trying to launch the old workflow — it should still not start
// Simulate sense compute trying to launch the removed workflow — it should not start
const workerPool = mockChildren[0];
if (workerPool) {
workerPool.emit("message", {
type: "signal",
sense: "cpu-usage",
payload: { workflow: "old-workflow|10|should not work" },
payload: {
signal: { stale: true },
workflow: {
name: "old-workflow",
maxRounds: 10,
prompt: "should not work",
dryRun: false,
},
},
});
}
@@ -513,7 +594,15 @@ describe("kernel + workflowManager integration", () => {
workerPool.emit("message", {
type: "signal",
sense: "cpu-usage",
payload: { workflow: "shutdown-test|10|test" },
payload: {
signal: { shutdownCase: true },
workflow: {
name: "shutdown-test",
maxRounds: 10,
prompt: "test",
dryRun: false,
},
},
});
}
@@ -201,16 +201,16 @@ describe("executeCompute", () => {
const emptyPeers: PeerMap = {};
it("returns the payload when compute returns a non-null value", async () => {
it("returns the compute result when compute returns a non-null value", async () => {
const { runtime, sqlite } = makeRuntime(async (db) => {
await db.insert(samples).values({ ts: Date.now(), value: 0.5 });
return 0.5;
return { signal: 0.5, workflow: null };
});
const result = await executeCompute(runtime, emptyPeers);
expect(result.ok).toBe(true);
if (!result.ok) return;
expect(result.value).toBe(0.5);
expect(result.value).toEqual({ signal: 0.5, workflow: null });
const rows = sqlite.prepare("SELECT * FROM samples").all();
expect(rows).toHaveLength(1);
@@ -253,13 +253,13 @@ describe("executeCompute", () => {
const { runtime, sqlite } = makeRuntime(async (_db, p) => {
const rows = await p["other-sense"].select().from(samples).all();
return rows.length > 0 ? rows[0].value : null;
return rows.length > 0 ? { signal: rows[0].value, workflow: null } : null;
});
const result = await executeCompute(runtime, peers);
expect(result.ok).toBe(true);
if (!result.ok) return;
expect(result.value).toBe(3.14);
expect(result.value).toEqual({ signal: 3.14, workflow: null });
peerSqlite.close();
sqlite.close();
@@ -273,7 +273,7 @@ describe("executeCompute", () => {
const { runtime, sqlite } = makeRuntime(async (db) => {
await db.insert(samples).values({ ts: 999, value: 9.9 });
return 9.9;
return { signal: 9.9, workflow: null };
});
await executeCompute(runtime, peers);
@@ -303,7 +303,7 @@ describe("executeCompute", () => {
db,
compute: async (d) => {
await d.insert(samples).values({ ts: 1000, value: 1.23 });
return 1.23;
return { signal: 1.23, workflow: null };
},
persistSignal: () => {},
};
@@ -341,11 +341,11 @@ describe("executeCompute", () => {
});
it("completes within timeout when compute is fast", async () => {
const { runtime, sqlite } = makeRuntime(async () => 42);
const { runtime, sqlite } = makeRuntime(async () => ({ signal: 42, workflow: null }));
const result = await executeCompute(runtime, emptyPeers, 5_000);
expect(result.ok).toBe(true);
if (!result.ok) return;
expect(result.value).toBe(42);
expect(result.value).toEqual({ signal: 42, workflow: null });
sqlite.close();
});
+51 -28
View File
@@ -156,6 +156,56 @@ export function createKernel(
}
}
function handleSenseWorkerSignal(senseName: string, payload: unknown): void {
const routeResult = routeSenseComputeOutput(payload);
if (!routeResult.ok) {
process.stderr.write(
`[kernel] sense "${senseName}" invalid compute payload: ${routeResult.error.message}\n`,
);
logStore.append({
source: "sense",
type: "error",
refId: senseName,
payload: JSON.stringify({ error: routeResult.error.message }),
timestamp: Date.now(),
});
scheduler.onComputeComplete(senseName);
return;
}
const { signal: signalPayload, workflow } = routeResult.value;
const signal: Signal = {
id: nextSignalId(),
senseId: senseName,
payload: signalPayload,
timestamp: Date.now(),
};
logStore.append({
source: "sense",
type: "signal",
refId: senseName,
payload: JSON.stringify(signalPayload),
timestamp: signal.timestamp,
});
bus.emit(signal);
if (workflow !== null) {
workflowManager.startWorkflow(workflow.name, {
prompt: workflow.prompt,
maxRounds: workflow.maxRounds,
dryRun: workflow.dryRun,
});
logStore.append({
source: "sense",
type: "workflow-launch",
refId: senseName,
payload: JSON.stringify(workflow),
timestamp: Date.now(),
});
}
scheduler.onComputeComplete(senseName);
}
function handleWorkerMessage(raw: unknown): void {
const result = parseWorkerMessage(raw);
if (!result.ok) {
@@ -186,34 +236,7 @@ export function createKernel(
}
if (msg.type === "signal") {
const route = routeSenseComputeOutput(msg.payload);
if (route.kind === "launch") {
const { workflowName, maxRounds, prompt } = route.launch;
workflowManager.startWorkflow(workflowName, { prompt, maxRounds, dryRun: false });
logStore.append({
source: "sense",
type: "workflow-launch",
refId: msg.sense,
payload: JSON.stringify(route.launch),
timestamp: Date.now(),
});
} else {
const signal: Signal = {
id: nextSignalId(),
senseId: msg.sense,
payload: route.payload,
timestamp: Date.now(),
};
logStore.append({
source: "sense",
type: "signal",
refId: msg.sense,
payload: JSON.stringify(route.payload),
timestamp: signal.timestamp,
});
bus.emit(signal);
}
scheduler.onComputeComplete(msg.sense);
handleSenseWorkerSignal(msg.sense, msg.payload);
}
}
+5 -4
View File
@@ -5,7 +5,7 @@ import { DatabaseSync } from "node:sqlite";
import { drizzle } from "drizzle-orm/node-sqlite";
import type { NodeSQLiteDatabase } from "drizzle-orm/node-sqlite";
import type { Result } from "@uncaged/nerve-core";
import type { ComputeResult, Result } from "@uncaged/nerve-core";
import { DEFAULT_SENSE_SIGNAL_RETENTION, err, isPlainRecord, ok } from "@uncaged/nerve-core";
import type { BlobStore } from "@uncaged/nerve-store";
@@ -27,13 +27,14 @@ export type ComputeOptions = {
* The shape every sense's index.ts must export.
* Engine injects `db` (read-write), `peers` (read-only), and `options`
* (`signal`, and `blobs` when running in the sense worker — RFC-001 §8 CAS).
* Returns T when a signal should be emitted, null for silence.
* Returns a structured result when a signal should be emitted (and optionally a workflow),
* or null for silence.
*/
export type ComputeFn<T = unknown> = (
db: DrizzleDB,
peers: PeerMap,
options?: ComputeOptions,
) => Promise<T | null>;
) => Promise<ComputeResult<T>>;
/** All state held for one sense inside a worker */
export type SenseRuntime = {
@@ -234,7 +235,7 @@ export async function executeCompute(
peers: PeerMap,
timeoutMs?: number,
blobStore?: BlobStore,
): Promise<Result<unknown | null>> {
): Promise<Result<ComputeResult<unknown>>> {
const controller = new AbortController();
const options: ComputeOptions =
blobStore !== undefined
+7 -2
View File
@@ -19,7 +19,7 @@ import "./experimental-warning-suppression.js";
import { readFileSync } from "node:fs";
import { join, resolve } from "node:path";
import { parseNerveConfig } from "@uncaged/nerve-core";
import { parseNerveConfig, routeSenseComputeOutput } from "@uncaged/nerve-core";
import type { NerveConfig } from "@uncaged/nerve-core";
import { createBlobStore } from "@uncaged/nerve-store";
@@ -189,7 +189,12 @@ async function runCompute(
}
clearGracePeriodTimer(senseName);
if (result.value != null) {
runtime.persistSignal(result.value);
const routeResult = routeSenseComputeOutput(result.value);
if (!routeResult.ok) {
sendError(senseName, routeResult.error.message);
return;
}
runtime.persistSignal(routeResult.value.signal);
sendSignal(senseName, result.value);
}
} catch (e: unknown) {