feat(core): sense trigger supports arbitrary shell commands

Extend SenseComputeReturn to support shell triggers in addition to workflow
triggers via a discriminated union (kind: 'shell' | 'workflow').

Shell triggers execute a command string in the sense worker subprocess
(spawned detached). The kernel logs 'shell-launch' events without involving
the workflow manager.

Breaking change: WorkflowTrigger now requires kind: 'workflow'.
New ShellTrigger type: { kind: 'shell', command: string }.
SenseTrigger = WorkflowTrigger | ShellTrigger.

Closes #315
This commit is contained in:
2026-05-02 09:58:24 +00:00
parent 6b8c917358
commit b9b804eac5
11 changed files with 233 additions and 66 deletions
@@ -184,6 +184,7 @@ describe("kernel + workflowManager integration", () => {
sense: "cpu-usage",
state: { reason: "test" },
workflow: {
kind: "workflow",
name: "my-workflow",
maxRounds: 10,
prompt: "run this workflow",
@@ -240,6 +241,7 @@ describe("kernel + workflowManager integration", () => {
sense: "cpu-usage",
state: { level: "critical" },
workflow: {
kind: "workflow",
name: "alert-workflow",
maxRounds: 5,
prompt: "handle critical alert",
@@ -294,6 +296,7 @@ describe("kernel + workflowManager integration", () => {
sense: "cpu-usage",
state: { seq: 1 },
workflow: {
kind: "workflow",
name: "order-wf",
maxRounds: 2,
prompt: "p",
@@ -374,6 +377,51 @@ describe("kernel + workflowManager integration", () => {
await vi.runAllTimersAsync();
await stopPromise;
});
it("logs shell-launch and does not start a workflow for shell triggers", async () => {
const logStore = makeLogStore();
const config = makeConfig({ workflows: {} });
const kernel = createKernel(config, nerveRoot, {
workerScript: "fake-worker.js",
logStore,
});
await flushSenseWorkerForkMicrotasks(kernel);
await vi.runAllTimersAsync();
const workerPool = mockChildren[0];
if (workerPool) {
workerPool.emit("message", {
type: "compute-result",
sense: "cpu-usage",
state: {},
workflow: {
kind: "shell",
command: "echo nerve-shell-test",
},
});
}
await vi.runAllTimersAsync();
const shellLaunch = logStore.append.mock.calls
.map((c) => c[0] as { source: string; type: string })
.find((e) => e.type === "shell-launch");
expect(shellLaunch).toBeDefined();
const startThread = mockChildren
.flatMap((c) => (c.send as ReturnType<typeof vi.fn>).mock.calls as [unknown][])
.some(
([msg]) =>
msg !== null &&
typeof msg === "object" &&
(msg as Record<string, unknown>).type === "start-thread",
);
expect(startThread).toBe(false);
const stopPromise = kernel.stop();
await vi.runAllTimersAsync();
await stopPromise;
});
});
describe("workflow events are logged", () => {
@@ -407,6 +455,7 @@ describe("kernel + workflowManager integration", () => {
sense: "cpu-usage",
state: { note: "log" },
workflow: {
kind: "workflow",
name: "log-test-workflow",
maxRounds: 10,
prompt: "test prompt",
@@ -479,6 +528,7 @@ describe("kernel + workflowManager integration", () => {
sense: "cpu-usage",
state: { phase: "reload" },
workflow: {
kind: "workflow",
name: "new-workflow",
maxRounds: 10,
prompt: "reload test",
@@ -560,6 +610,7 @@ describe("kernel + workflowManager integration", () => {
sense: "cpu-usage",
state: { stale: true },
workflow: {
kind: "workflow",
name: "old-workflow",
maxRounds: 10,
prompt: "should not work",
@@ -618,6 +669,7 @@ describe("kernel + workflowManager integration", () => {
sense: "cpu-usage",
state: { shutdownCase: true },
workflow: {
kind: "workflow",
name: "shutdown-test",
maxRounds: 10,
prompt: "test",
+11 -20
View File
@@ -3,8 +3,8 @@
* Protocol per RFC §5.2: hub-and-spoke, all messages through engine.
*/
import type { Result, WorkflowTrigger } from "@uncaged/nerve-core";
import { err, isPlainRecord, ok, parseWorkflowTrigger } from "@uncaged/nerve-core";
import type { Result, SenseTrigger, WorkflowTrigger } from "@uncaged/nerve-core";
import { err, isPlainRecord, ok, parseSenseTrigger } from "@uncaged/nerve-core";
/** Parent → Worker: trigger one compute cycle for a sense */
export type ComputeMessage = {
@@ -70,7 +70,7 @@ export type ComputeResultMessage = {
type: "compute-result";
sense: string;
state: unknown;
workflow: WorkflowTrigger | null;
workflow: SenseTrigger | null;
};
/** Worker → Parent: sense compute result includes a workflow to start */
@@ -262,11 +262,11 @@ function parseComputeResultMsg(obj: Record<string, unknown>): Result<WorkerToPar
if (wfRaw !== null && !isPlainRecord(wfRaw)) {
return err(new Error("Worker 'compute-result' workflow must be an object or null"));
}
let workflow: WorkflowTrigger | null;
let workflow: SenseTrigger | null;
if (wfRaw === null) {
workflow = null;
} else {
const parsed = parseWorkflowTrigger(wfRaw);
const parsed = parseSenseTrigger(wfRaw);
if (!parsed.ok) return err(parsed.error);
workflow = parsed.value;
}
@@ -412,24 +412,15 @@ function parseSenseWorkflowTriggerMsg(obj: Record<string, unknown>): Result<Work
new Error("Worker 'sense-workflow-trigger' message missing object 'workflow' field"),
);
}
const wf = obj.workflow;
if (typeof wf.name !== "string")
return err(new Error("Worker 'sense-workflow-trigger' workflow missing string 'name'"));
if (typeof wf.maxRounds !== "number")
return err(new Error("Worker 'sense-workflow-trigger' workflow missing number 'maxRounds'"));
if (typeof wf.prompt !== "string")
return err(new Error("Worker 'sense-workflow-trigger' workflow missing string 'prompt'"));
if (typeof wf.dryRun !== "boolean")
return err(new Error("Worker 'sense-workflow-trigger' workflow missing boolean 'dryRun'"));
const parsed = parseSenseTrigger(obj.workflow);
if (!parsed.ok) return err(parsed.error);
if (parsed.value.kind !== "workflow") {
return err(new Error("Worker 'sense-workflow-trigger' expects kind \"workflow\""));
}
return ok({
type: "sense-workflow-trigger",
sense: obj.sense,
workflow: {
name: wf.name,
maxRounds: wf.maxRounds,
prompt: wf.prompt,
dryRun: wf.dryRun,
},
workflow: parsed.value,
});
}
+24 -14
View File
@@ -12,7 +12,7 @@ import {
type HealthInfo,
type NerveConfig,
type SenseInfo,
type WorkflowTrigger,
type SenseTrigger,
senseTriggerLabels,
} from "@uncaged/nerve-core";
@@ -145,7 +145,7 @@ export function createKernel(
}
}
function handleComputeResult(senseName: string, workflow: WorkflowTrigger | null): void {
function handleComputeResult(senseName: string, workflow: SenseTrigger | null): void {
logStore.append({
source: "sense",
type: "compute-complete",
@@ -155,18 +155,28 @@ export function createKernel(
});
if (workflow !== null) {
workflowManager.startWorkflow(workflow.name, {
prompt: workflow.prompt,
maxRounds: workflow.maxRounds,
dryRun: workflow.dryRun,
});
logStore.append({
source: "sense",
type: "workflow-launch",
refId: senseName,
payload: JSON.stringify(workflow),
timestamp: Date.now(),
});
if (workflow.kind === "workflow") {
workflowManager.startWorkflow(workflow.name, {
prompt: workflow.prompt,
maxRounds: workflow.maxRounds,
dryRun: workflow.dryRun,
});
logStore.append({
source: "sense",
type: "workflow-launch",
refId: senseName,
payload: JSON.stringify(workflow),
timestamp: Date.now(),
});
} else {
logStore.append({
source: "sense",
type: "shell-launch",
refId: senseName,
payload: JSON.stringify(workflow),
timestamp: Date.now(),
});
}
}
scheduler.onComputeComplete(senseName);
scheduler.onSenseCompleted(senseName);
+5 -3
View File
@@ -1,7 +1,7 @@
import { existsSync, mkdirSync, readFileSync, renameSync, writeFileSync } from "node:fs";
import { dirname, join } from "node:path";
import type { Result, SenseComputeFn, WorkflowTrigger } from "@uncaged/nerve-core";
import type { Result, SenseComputeFn, SenseTrigger } from "@uncaged/nerve-core";
import { err, isPlainRecord, ok } from "@uncaged/nerve-core";
/** All state held for one sense inside a worker */
@@ -19,7 +19,9 @@ export function readState(statePath: string, initialState: unknown): unknown {
return JSON.parse(raw);
} catch (e) {
const msg = e instanceof Error ? e.message : String(e);
process.stderr.write(`[sense-runtime] warning: failed to read state from "${statePath}": ${msg} — using initialState\n`);
process.stderr.write(
`[sense-runtime] warning: failed to read state from "${statePath}": ${msg} — using initialState\n`,
);
return initialState;
}
}
@@ -72,7 +74,7 @@ export async function loadSenseModule(
export async function executeCompute(
runtime: SenseRuntime,
timeoutMs?: number,
): Promise<Result<{ state: unknown; workflow: WorkflowTrigger | null }>> {
): Promise<Result<{ state: unknown; workflow: SenseTrigger | null }>> {
const controller = new AbortController();
let timer: ReturnType<typeof setTimeout> | undefined;
+22 -4
View File
@@ -14,11 +14,12 @@
import "./experimental-warning-suppression.js";
import { spawn } from "node:child_process";
import { readFileSync } from "node:fs";
import { join, resolve } from "node:path";
import { parseNerveConfig } from "@uncaged/nerve-core";
import type { NerveConfig, WorkflowTrigger } from "@uncaged/nerve-core";
import type { NerveConfig, SenseTrigger } from "@uncaged/nerve-core";
import type { WorkerToParentMessage } from "./ipc.js";
import { parseParentMessage } from "./ipc.js";
@@ -42,11 +43,25 @@ function sendReady(): void {
function sendComputeResult(
sense: string,
value: { state: unknown; workflow: WorkflowTrigger | null },
value: { state: unknown; workflow: SenseTrigger | null },
): void {
send({ type: "compute-result", sense, state: value.state, workflow: value.workflow });
}
function executeShellTriggerIfNeeded(nerveRoot: string, trigger: SenseTrigger | null): void {
if (trigger === null || trigger.kind !== "shell") return;
const child = spawn(trigger.command, {
shell: true,
cwd: nerveRoot,
detached: true,
stdio: "ignore",
});
child.on("error", (err) => {
process.stderr.write(`[sense-worker] shell trigger failed: ${err.message}\n`);
});
child.unref();
}
function sendError(sense: string, error: string): void {
send({ type: "error", sense, error });
}
@@ -132,6 +147,7 @@ async function runCompute(
runtime: SenseRuntime,
timeoutMs: number,
gracePeriodMs: number | null,
nerveRoot: string,
): Promise<void> {
try {
const result = await executeCompute(runtime, timeoutMs);
@@ -143,6 +159,7 @@ async function runCompute(
return;
}
clearGracePeriodTimer(senseName);
executeShellTriggerIfNeeded(nerveRoot, result.value.workflow);
sendComputeResult(senseName, result.value);
} catch (e: unknown) {
const errMsg = e instanceof Error ? e.message : String(e);
@@ -160,6 +177,7 @@ function handleMessage(
group: string,
senseConfigs: Map<string, { timeout: number | null; gracePeriod: number | null }>,
inFlight: Map<string, Promise<void>>,
nerveRoot: string,
): void {
const parseResult = parseParentMessage(raw);
if (!parseResult.ok) {
@@ -196,7 +214,7 @@ function handleMessage(
const previous = inFlight.get(msg.sense) ?? Promise.resolve();
const next = previous
.then(() => runCompute(msg.sense, runtime, timeoutMs, gracePeriodMs))
.then(() => runCompute(msg.sense, runtime, timeoutMs, gracePeriodMs, nerveRoot))
.catch((e: unknown) => {
const errMsg = e instanceof Error ? e.message : String(e);
sendError(msg.sense, errMsg);
@@ -257,7 +275,7 @@ async function bootstrap(nerveRoot: string, group: string): Promise<void> {
sendReady();
process.on("message", (raw: unknown) => {
handleMessage(raw, runtimes, group, senseConfigs, inFlight);
handleMessage(raw, runtimes, group, senseConfigs, inFlight, nerveRoot);
});
}