feat: sense compute returns ComputeResult<T> with workflow trigger support
- SenseComputeFn returns ComputeResult<T> = null | { signal, workflow }
- sense-runtime persists result.signal, not result itself
- sense-worker sends workflow trigger message when workflow is non-null
- New SenseWorkflowTriggerMessage in IPC protocol
- Knowledge card updated to match
— 小橘 🍊(NEKO Team)
This commit is contained in:
@@ -176,14 +176,14 @@ describe("executeCompute", () => {
|
||||
|
||||
it("returns non-null and inserts into table when compute returns data", async () => {
|
||||
const { runtime, sqlite } = makeRuntime(async () => ({
|
||||
ts: 1000,
|
||||
value: 0.5,
|
||||
signal: { ts: 1000, value: 0.5 },
|
||||
workflow: null,
|
||||
}));
|
||||
|
||||
const result = await executeCompute(runtime);
|
||||
expect(result.ok).toBe(true);
|
||||
if (!result.ok) return;
|
||||
expect(result.value).toEqual({ ts: 1000, value: 0.5 });
|
||||
expect(result.value).toEqual({ signal: { ts: 1000, value: 0.5 }, workflow: null });
|
||||
|
||||
const rows = sqlite.prepare("SELECT * FROM samples").all();
|
||||
expect(rows).toHaveLength(1);
|
||||
@@ -228,7 +228,7 @@ describe("executeCompute", () => {
|
||||
const runtime: SenseRuntime = {
|
||||
name: "cpu-usage",
|
||||
db,
|
||||
compute: async () => ({ ts: 1000, value: 1.23 }),
|
||||
compute: async () => ({ signal: { ts: 1000, value: 1.23 }, workflow: null }),
|
||||
table: samples,
|
||||
persistSignal: () => {},
|
||||
};
|
||||
@@ -259,11 +259,14 @@ describe("executeCompute", () => {
|
||||
});
|
||||
|
||||
it("completes within timeout when compute is fast", async () => {
|
||||
const { runtime, sqlite } = makeRuntime(async () => ({ ts: 1, value: 42 }));
|
||||
const { runtime, sqlite } = makeRuntime(async () => ({
|
||||
signal: { ts: 1, value: 42 },
|
||||
workflow: null,
|
||||
}));
|
||||
const result = await executeCompute(runtime, 5_000);
|
||||
expect(result.ok).toBe(true);
|
||||
if (!result.ok) return;
|
||||
expect(result.value).toEqual({ ts: 1, value: 42 });
|
||||
expect(result.value).toEqual({ signal: { ts: 1, value: 42 }, workflow: null });
|
||||
sqlite.close();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
* Protocol per RFC §5.2: hub-and-spoke, all messages through engine.
|
||||
*/
|
||||
|
||||
import type { Result } from "@uncaged/nerve-core";
|
||||
import type { Result, WorkflowTrigger } from "@uncaged/nerve-core";
|
||||
import { err, isPlainRecord, ok } from "@uncaged/nerve-core";
|
||||
|
||||
/** Parent → Worker: trigger one compute cycle for a sense */
|
||||
@@ -72,6 +72,13 @@ export type SignalMessage = {
|
||||
payload: unknown;
|
||||
};
|
||||
|
||||
/** Worker → Parent: sense compute result includes a workflow to start */
|
||||
export type SenseWorkflowTriggerMessage = {
|
||||
type: "sense-workflow-trigger";
|
||||
sense: string;
|
||||
workflow: WorkflowTrigger;
|
||||
};
|
||||
|
||||
/** Worker → Parent: compute threw or returned an unexpected error */
|
||||
export type ErrorMessage = {
|
||||
type: "error";
|
||||
@@ -139,7 +146,8 @@ export type WorkerToParentMessage =
|
||||
| HealthResponseMessage
|
||||
| ThreadEventMessage
|
||||
| WorkflowErrorMessage
|
||||
| ThreadWorkflowMessageMessage;
|
||||
| ThreadWorkflowMessageMessage
|
||||
| SenseWorkflowTriggerMessage;
|
||||
|
||||
const PARENT_MSG_TYPES = new Set([
|
||||
"compute",
|
||||
@@ -340,6 +348,7 @@ const WORKER_MSG_TYPES = new Set([
|
||||
"thread-event",
|
||||
"workflow-error",
|
||||
"thread-workflow-message",
|
||||
"sense-workflow-trigger",
|
||||
]);
|
||||
|
||||
function parseThreadWorkflowMessageMsg(
|
||||
@@ -377,6 +386,36 @@ function parseThreadWorkflowMessageMsg(
|
||||
});
|
||||
}
|
||||
|
||||
function parseSenseWorkflowTriggerMsg(obj: Record<string, unknown>): Result<WorkerToParentMessage> {
|
||||
if (typeof obj.sense !== "string") {
|
||||
return err(new Error("Worker 'sense-workflow-trigger' message missing string 'sense' field"));
|
||||
}
|
||||
if (!isPlainRecord(obj.workflow)) {
|
||||
return err(
|
||||
new Error("Worker 'sense-workflow-trigger' message missing object 'workflow' field"),
|
||||
);
|
||||
}
|
||||
const wf = obj.workflow;
|
||||
if (typeof wf.name !== "string")
|
||||
return err(new Error("Worker 'sense-workflow-trigger' workflow missing string 'name'"));
|
||||
if (typeof wf.maxRounds !== "number")
|
||||
return err(new Error("Worker 'sense-workflow-trigger' workflow missing number 'maxRounds'"));
|
||||
if (typeof wf.prompt !== "string")
|
||||
return err(new Error("Worker 'sense-workflow-trigger' workflow missing string 'prompt'"));
|
||||
if (typeof wf.dryRun !== "boolean")
|
||||
return err(new Error("Worker 'sense-workflow-trigger' workflow missing boolean 'dryRun'"));
|
||||
return ok({
|
||||
type: "sense-workflow-trigger",
|
||||
sense: obj.sense,
|
||||
workflow: {
|
||||
name: wf.name,
|
||||
maxRounds: wf.maxRounds,
|
||||
prompt: wf.prompt,
|
||||
dryRun: wf.dryRun,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
/** Validate and parse an unknown IPC message received from a worker process. */
|
||||
export function parseWorkerMessage(raw: unknown): Result<WorkerToParentMessage> {
|
||||
if (!isPlainRecord(raw)) {
|
||||
@@ -395,5 +434,6 @@ export function parseWorkerMessage(raw: unknown): Result<WorkerToParentMessage>
|
||||
if (obj.type === "thread-event") return parseThreadEventMsg(obj);
|
||||
if (obj.type === "workflow-error") return parseWorkflowErrorMsg(obj);
|
||||
if (obj.type === "thread-workflow-message") return parseThreadWorkflowMessageMsg(obj);
|
||||
if (obj.type === "sense-workflow-trigger") return parseSenseWorkflowTriggerMsg(obj);
|
||||
return ok({ type: "ready" });
|
||||
}
|
||||
|
||||
@@ -6,7 +6,7 @@ import { drizzle } from "drizzle-orm/node-sqlite";
|
||||
import type { NodeSQLiteDatabase } from "drizzle-orm/node-sqlite";
|
||||
import type { SQLiteTable } from "drizzle-orm/sqlite-core";
|
||||
|
||||
import type { Result, SenseComputeFn } from "@uncaged/nerve-core";
|
||||
import type { ComputeResult, Result, SenseComputeFn } from "@uncaged/nerve-core";
|
||||
import { DEFAULT_SENSE_SIGNAL_RETENTION, err, isPlainRecord, ok } from "@uncaged/nerve-core";
|
||||
|
||||
/** A Drizzle DB instance (schema-generic) */
|
||||
@@ -201,13 +201,15 @@ export async function loadSenseModule(
|
||||
* Execute a sense's compute function with an optional soft timeout.
|
||||
* If timeoutMs is provided and compute takes longer, the AbortSignal is
|
||||
* triggered and an error Result is returned.
|
||||
* When compute returns non-null, the result is persisted to the sense's table
|
||||
* via `db.insert(table).values(result)`.
|
||||
* When compute returns non-null, `result.signal` is persisted to the sense's
|
||||
* table via `db.insert(table).values(result.signal)` and `persistSignal` is
|
||||
* called with `result.signal`. Returns the full `ComputeResult` so callers
|
||||
* can inspect the `workflow` field.
|
||||
*/
|
||||
export async function executeCompute(
|
||||
runtime: SenseRuntime,
|
||||
timeoutMs?: number,
|
||||
): Promise<Result<unknown | null>> {
|
||||
): Promise<Result<ComputeResult<unknown>>> {
|
||||
const controller = new AbortController();
|
||||
|
||||
let timer: ReturnType<typeof setTimeout> | undefined;
|
||||
@@ -229,7 +231,8 @@ export async function executeCompute(
|
||||
|
||||
if (result !== null) {
|
||||
// Cast required: DrizzleDB is schema-agnostic; the sense module guarantees shape compatibility.
|
||||
await runtime.db.insert(runtime.table).values(result as Record<string, unknown>);
|
||||
await runtime.db.insert(runtime.table).values(result.signal as Record<string, unknown>);
|
||||
runtime.persistSignal(result.signal);
|
||||
}
|
||||
|
||||
return ok(result);
|
||||
|
||||
@@ -18,8 +18,8 @@ import "./experimental-warning-suppression.js";
|
||||
import { readFileSync } from "node:fs";
|
||||
import { join, resolve } from "node:path";
|
||||
|
||||
import { parseNerveConfig, routeSenseComputeOutput } from "@uncaged/nerve-core";
|
||||
import type { NerveConfig } from "@uncaged/nerve-core";
|
||||
import { parseNerveConfig } from "@uncaged/nerve-core";
|
||||
import type { NerveConfig, WorkflowTrigger } from "@uncaged/nerve-core";
|
||||
|
||||
import type { WorkerToParentMessage } from "./ipc.js";
|
||||
import { parseParentMessage } from "./ipc.js";
|
||||
@@ -49,6 +49,10 @@ function sendError(sense: string, error: string): void {
|
||||
send({ type: "error", sense, error });
|
||||
}
|
||||
|
||||
function sendWorkflowTrigger(sense: string, workflow: WorkflowTrigger): void {
|
||||
send({ type: "sense-workflow-trigger", sense, workflow });
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Initialisation helpers
|
||||
// ---------------------------------------------------------------------------
|
||||
@@ -150,13 +154,10 @@ async function runCompute(
|
||||
}
|
||||
clearGracePeriodTimer(senseName);
|
||||
if (result.value != null) {
|
||||
const routeResult = routeSenseComputeOutput(result.value);
|
||||
if (!routeResult.ok) {
|
||||
sendError(senseName, routeResult.error.message);
|
||||
return;
|
||||
sendSignal(senseName, result.value.signal);
|
||||
if (result.value.workflow !== null) {
|
||||
sendWorkflowTrigger(senseName, result.value.workflow);
|
||||
}
|
||||
runtime.persistSignal(routeResult.value.signal);
|
||||
sendSignal(senseName, result.value);
|
||||
}
|
||||
} catch (e: unknown) {
|
||||
const errMsg = e instanceof Error ? e.message : String(e);
|
||||
|
||||
Reference in New Issue
Block a user