feat(core): sense trigger supports arbitrary shell commands #316

Merged
xiaomo merged 1 commits from feat/315-shell-trigger into main 2026-05-02 10:20:03 +00:00
11 changed files with 233 additions and 66 deletions
+4 -2
View File
@@ -104,10 +104,12 @@ type SenseConfig = {
For mutually exclusive fields, use discriminated unions:
```typescript
// ✅ Good — sense modules return explicit next state + optional workflow trigger
import type { SenseTrigger } from "@uncaged/nerve-core";
// ✅ Good — sense modules return explicit next state + optional trigger (workflow or shell)
type SenseComputeReturn<S> = {
state: S;
workflow: WorkflowTrigger | null;
workflow: SenseTrigger | null;
};
```
@@ -133,6 +133,7 @@ export async function compute(state) {
return {
state: { launched: true, idleTicks: state.idleTicks },
workflow: {
kind: "workflow",
name: "noop",
maxRounds: 3,
prompt: "e2e-archive",
@@ -1,10 +1,11 @@
import { describe, expect, it } from "vitest";
import { parseWorkflowTrigger } from "../sense.js";
import { parseSenseTrigger } from "../sense.js";
describe("parseWorkflowTrigger", () => {
it("accepts a valid trigger object", () => {
const r = parseWorkflowTrigger({
describe("parseSenseTrigger", () => {
it("accepts a valid workflow trigger", () => {
const r = parseSenseTrigger({
kind: "workflow",
name: "my-wf",
maxRounds: 3,
prompt: "go",
@@ -12,11 +13,18 @@ describe("parseWorkflowTrigger", () => {
});
expect(r.ok).toBe(true);
if (!r.ok) return;
expect(r.value).toEqual({ name: "my-wf", maxRounds: 3, prompt: "go", dryRun: true });
expect(r.value).toEqual({
kind: "workflow",
name: "my-wf",
maxRounds: 3,
prompt: "go",
dryRun: true,
});
});
it("trims workflow name", () => {
const r = parseWorkflowTrigger({
const r = parseSenseTrigger({
kind: "workflow",
name: " spaced ",
maxRounds: 1,
prompt: "",
@@ -24,16 +32,45 @@ describe("parseWorkflowTrigger", () => {
});
expect(r.ok).toBe(true);
if (!r.ok) return;
expect(r.value.kind).toBe("workflow");
if (r.value.kind !== "workflow") return;
expect(r.value.name).toBe("spaced");
});
it("rejects empty name", () => {
const r = parseWorkflowTrigger({ name: "", maxRounds: 1, prompt: "x", dryRun: false });
it("accepts a valid shell trigger", () => {
const r = parseSenseTrigger({
kind: "shell",
command: " echo hi ",
});
expect(r.ok).toBe(true);
if (!r.ok) return;
expect(r.value).toEqual({ kind: "shell", command: "echo hi" });
});
it("rejects workflow without kind", () => {
const r = parseSenseTrigger({
name: "my-wf",
maxRounds: 1,
prompt: "x",
dryRun: false,
});
expect(r.ok).toBe(false);
});
it("rejects empty workflow name", () => {
const r = parseSenseTrigger({
kind: "workflow",
name: "",
maxRounds: 1,
prompt: "x",
dryRun: false,
});
expect(r.ok).toBe(false);
});
it("rejects non-integer maxRounds", () => {
const r = parseWorkflowTrigger({
const r = parseSenseTrigger({
kind: "workflow",
name: "w",
maxRounds: 1.5,
prompt: "",
@@ -43,12 +80,19 @@ describe("parseWorkflowTrigger", () => {
});
it("rejects maxRounds < 1", () => {
const r = parseWorkflowTrigger({ name: "w", maxRounds: 0, prompt: "", dryRun: false });
const r = parseSenseTrigger({
kind: "workflow",
name: "w",
maxRounds: 0,
prompt: "",
dryRun: false,
});
expect(r.ok).toBe(false);
});
it("rejects non-boolean dryRun", () => {
const r = parseWorkflowTrigger({
const r = parseSenseTrigger({
kind: "workflow",
name: "w",
maxRounds: 1,
prompt: "",
@@ -56,4 +100,14 @@ describe("parseWorkflowTrigger", () => {
});
expect(r.ok).toBe(false);
});
it("rejects empty shell command", () => {
const r = parseSenseTrigger({ kind: "shell", command: "" });
expect(r.ok).toBe(false);
});
it("rejects unknown kind", () => {
const r = parseSenseTrigger({ kind: "other", x: 1 });
expect(r.ok).toBe(false);
});
});
+10
View File
@@ -54,12 +54,22 @@ export type ExtractConfig = {
/** Parameters for starting a workflow from a Sense compute result (or CLI trigger). */
export type WorkflowTrigger = {
kind: "workflow";
name: string;
maxRounds: number;
prompt: string;
dryRun: boolean;
};
/** Run a shell command from a Sense compute result (daemon executes in the sense worker). */
export type ShellTrigger = {
kind: "shell";
command: string;
};
/** Optional side effect requested by `compute()` — workflow launch or shell command. */
export type SenseTrigger = WorkflowTrigger | ShellTrigger;
export type NerveConfig = {
/** Engine-wide default max moderator rounds (e.g. CLI workflow trigger when omitted). */
maxRounds: number;
+3 -1
View File
@@ -8,6 +8,8 @@ export type {
ExtractConfig,
NerveConfig,
WorkflowTrigger,
ShellTrigger,
SenseTrigger,
} from "./config.js";
export type { SenseInfo } from "./sense.js";
export type { SenseComputeFn, SenseModule } from "./sense.js";
@@ -44,7 +46,7 @@ export type { KnowledgeConfig } from "./config.js";
export { parseKnowledgeYaml } from "./config.js";
export { isPlainRecord } from "./util.js";
export { parseWorkflowTrigger } from "./sense.js";
export { parseSenseTrigger } from "./sense.js";
export { isSenseInfo, isWorkflowStatus } from "./daemon.js";
export type {
+36 -11
View File
@@ -1,4 +1,4 @@
import type { SenseConfig, WorkflowTrigger } from "./config.js";
import type { SenseConfig, SenseTrigger, ShellTrigger, WorkflowTrigger } from "./config.js";
import { type Result, err, isPlainRecord, ok } from "./util.js";
/** Runtime metadata for a sense (e.g. daemon list-senses IPC). */
@@ -16,11 +16,11 @@ export type SenseInfo = {
* `compute` export.
*
* Pure: no DB, no peers.
* Returns the next sense state and an optional workflow to start (`workflow: null` means no workflow).
* Returns the next sense state and an optional trigger (`workflow: null` means no side effect).
*/
export type SenseComputeFn<S = unknown> = (
state: S,
) => Promise<{ state: S; workflow: WorkflowTrigger | null }>;
) => Promise<{ state: S; workflow: SenseTrigger | null }>;
/**
* The full shape a sense module (`src/index.ts`) must export.
@@ -69,13 +69,7 @@ export function senseTriggerLabels(
return [labelSenseTrigger({ interval: sc.interval, on: sc.on })];
}
/**
* Validates a structured workflow trigger object from Sense compute or IPC.
*/
export function parseWorkflowTrigger(value: unknown): Result<WorkflowTrigger> {
if (!isPlainRecord(value)) {
return err(new Error("workflow trigger must be a plain object"));
}
function parseWorkflowTriggerBranch(value: Record<string, unknown>): Result<WorkflowTrigger> {
const nameRaw = value.name;
if (typeof nameRaw !== "string" || nameRaw.trim().length === 0) {
return err(new Error('workflow trigger: "name" must be a non-empty string'));
@@ -92,5 +86,36 @@ export function parseWorkflowTrigger(value: unknown): Result<WorkflowTrigger> {
if (typeof dryRun !== "boolean") {
return err(new Error('workflow trigger: "dryRun" must be a boolean'));
}
return ok({ name: nameRaw.trim(), maxRounds, prompt, dryRun });
return ok({
kind: "workflow",
name: nameRaw.trim(),
maxRounds,
prompt,
dryRun,
});
}
function parseShellTriggerBranch(value: Record<string, unknown>): Result<ShellTrigger> {
const command = value.command;
if (typeof command !== "string" || command.trim().length === 0) {
return err(new Error('shell trigger: "command" must be a non-empty string'));
}
return ok({ kind: "shell", command: command.trim() });
}
/**
* Validates a structured sense trigger from Sense compute or IPC (`workflow` field).
*/
export function parseSenseTrigger(value: unknown): Result<SenseTrigger> {
if (!isPlainRecord(value)) {
return err(new Error("sense trigger must be a plain object"));
}
const kind = value.kind;
if (kind === "workflow") {
return parseWorkflowTriggerBranch(value);
}
if (kind === "shell") {
return parseShellTriggerBranch(value);
}
return err(new Error('sense trigger: "kind" must be "workflow" or "shell"'));
}
@@ -184,6 +184,7 @@ describe("kernel + workflowManager integration", () => {
sense: "cpu-usage",
state: { reason: "test" },
workflow: {
kind: "workflow",
name: "my-workflow",
maxRounds: 10,
prompt: "run this workflow",
@@ -240,6 +241,7 @@ describe("kernel + workflowManager integration", () => {
sense: "cpu-usage",
state: { level: "critical" },
workflow: {
kind: "workflow",
name: "alert-workflow",
maxRounds: 5,
prompt: "handle critical alert",
@@ -294,6 +296,7 @@ describe("kernel + workflowManager integration", () => {
sense: "cpu-usage",
state: { seq: 1 },
workflow: {
kind: "workflow",
name: "order-wf",
maxRounds: 2,
prompt: "p",
@@ -374,6 +377,51 @@ describe("kernel + workflowManager integration", () => {
await vi.runAllTimersAsync();
await stopPromise;
});
it("logs shell-launch and does not start a workflow for shell triggers", async () => {
const logStore = makeLogStore();
const config = makeConfig({ workflows: {} });
const kernel = createKernel(config, nerveRoot, {
workerScript: "fake-worker.js",
logStore,
});
await flushSenseWorkerForkMicrotasks(kernel);
await vi.runAllTimersAsync();
const workerPool = mockChildren[0];
if (workerPool) {
workerPool.emit("message", {
type: "compute-result",
sense: "cpu-usage",
state: {},
workflow: {
kind: "shell",
command: "echo nerve-shell-test",
},
});
}
await vi.runAllTimersAsync();
const shellLaunch = logStore.append.mock.calls
.map((c) => c[0] as { source: string; type: string })
.find((e) => e.type === "shell-launch");
expect(shellLaunch).toBeDefined();
const startThread = mockChildren
.flatMap((c) => (c.send as ReturnType<typeof vi.fn>).mock.calls as [unknown][])
.some(
([msg]) =>
msg !== null &&
typeof msg === "object" &&
(msg as Record<string, unknown>).type === "start-thread",
);
expect(startThread).toBe(false);
const stopPromise = kernel.stop();
await vi.runAllTimersAsync();
await stopPromise;
});
});
describe("workflow events are logged", () => {
@@ -407,6 +455,7 @@ describe("kernel + workflowManager integration", () => {
sense: "cpu-usage",
state: { note: "log" },
workflow: {
kind: "workflow",
name: "log-test-workflow",
maxRounds: 10,
prompt: "test prompt",
@@ -479,6 +528,7 @@ describe("kernel + workflowManager integration", () => {
sense: "cpu-usage",
state: { phase: "reload" },
workflow: {
kind: "workflow",
name: "new-workflow",
maxRounds: 10,
prompt: "reload test",
@@ -560,6 +610,7 @@ describe("kernel + workflowManager integration", () => {
sense: "cpu-usage",
state: { stale: true },
workflow: {
kind: "workflow",
name: "old-workflow",
maxRounds: 10,
prompt: "should not work",
@@ -618,6 +669,7 @@ describe("kernel + workflowManager integration", () => {
sense: "cpu-usage",
state: { shutdownCase: true },
workflow: {
kind: "workflow",
name: "shutdown-test",
maxRounds: 10,
prompt: "test",
+11 -20
View File
@@ -3,8 +3,8 @@
* Protocol per RFC §5.2: hub-and-spoke, all messages through engine.
*/
import type { Result, WorkflowTrigger } from "@uncaged/nerve-core";
import { err, isPlainRecord, ok, parseWorkflowTrigger } from "@uncaged/nerve-core";
import type { Result, SenseTrigger, WorkflowTrigger } from "@uncaged/nerve-core";
import { err, isPlainRecord, ok, parseSenseTrigger } from "@uncaged/nerve-core";
/** Parent → Worker: trigger one compute cycle for a sense */
export type ComputeMessage = {
@@ -70,7 +70,7 @@ export type ComputeResultMessage = {
type: "compute-result";
sense: string;
state: unknown;
workflow: WorkflowTrigger | null;
workflow: SenseTrigger | null;
};
/** Worker → Parent: sense compute result includes a workflow to start */
@@ -262,11 +262,11 @@ function parseComputeResultMsg(obj: Record<string, unknown>): Result<WorkerToPar
if (wfRaw !== null && !isPlainRecord(wfRaw)) {
return err(new Error("Worker 'compute-result' workflow must be an object or null"));
}
let workflow: WorkflowTrigger | null;
let workflow: SenseTrigger | null;
if (wfRaw === null) {
workflow = null;
} else {
const parsed = parseWorkflowTrigger(wfRaw);
const parsed = parseSenseTrigger(wfRaw);
if (!parsed.ok) return err(parsed.error);
workflow = parsed.value;
}
@@ -412,24 +412,15 @@ function parseSenseWorkflowTriggerMsg(obj: Record<string, unknown>): Result<Work
new Error("Worker 'sense-workflow-trigger' message missing object 'workflow' field"),
);
}
const wf = obj.workflow;
if (typeof wf.name !== "string")
return err(new Error("Worker 'sense-workflow-trigger' workflow missing string 'name'"));
if (typeof wf.maxRounds !== "number")
return err(new Error("Worker 'sense-workflow-trigger' workflow missing number 'maxRounds'"));
if (typeof wf.prompt !== "string")
return err(new Error("Worker 'sense-workflow-trigger' workflow missing string 'prompt'"));
if (typeof wf.dryRun !== "boolean")
return err(new Error("Worker 'sense-workflow-trigger' workflow missing boolean 'dryRun'"));
const parsed = parseSenseTrigger(obj.workflow);
if (!parsed.ok) return err(parsed.error);
if (parsed.value.kind !== "workflow") {
return err(new Error("Worker 'sense-workflow-trigger' expects kind \"workflow\""));
}
return ok({
type: "sense-workflow-trigger",
sense: obj.sense,
workflow: {
name: wf.name,
maxRounds: wf.maxRounds,
prompt: wf.prompt,
dryRun: wf.dryRun,
},
workflow: parsed.value,
});
}
+24 -14
View File
@@ -12,7 +12,7 @@ import {
type HealthInfo,
type NerveConfig,
type SenseInfo,
type WorkflowTrigger,
type SenseTrigger,
senseTriggerLabels,
} from "@uncaged/nerve-core";
@@ -145,7 +145,7 @@ export function createKernel(
}
}
function handleComputeResult(senseName: string, workflow: WorkflowTrigger | null): void {
function handleComputeResult(senseName: string, workflow: SenseTrigger | null): void {
logStore.append({
source: "sense",
type: "compute-complete",
@@ -155,18 +155,28 @@ export function createKernel(
});
if (workflow !== null) {
workflowManager.startWorkflow(workflow.name, {
prompt: workflow.prompt,
maxRounds: workflow.maxRounds,
dryRun: workflow.dryRun,
});
logStore.append({
source: "sense",
type: "workflow-launch",
refId: senseName,
payload: JSON.stringify(workflow),
timestamp: Date.now(),
});
if (workflow.kind === "workflow") {
workflowManager.startWorkflow(workflow.name, {
prompt: workflow.prompt,
maxRounds: workflow.maxRounds,
dryRun: workflow.dryRun,
});
logStore.append({
source: "sense",
type: "workflow-launch",
refId: senseName,
payload: JSON.stringify(workflow),
timestamp: Date.now(),
});
} else {
logStore.append({
source: "sense",
type: "shell-launch",
refId: senseName,
payload: JSON.stringify(workflow),
timestamp: Date.now(),
});
}
}
scheduler.onComputeComplete(senseName);
scheduler.onSenseCompleted(senseName);
+5 -3
View File
@@ -1,7 +1,7 @@
import { existsSync, mkdirSync, readFileSync, renameSync, writeFileSync } from "node:fs";
import { dirname, join } from "node:path";
import type { Result, SenseComputeFn, WorkflowTrigger } from "@uncaged/nerve-core";
import type { Result, SenseComputeFn, SenseTrigger } from "@uncaged/nerve-core";
import { err, isPlainRecord, ok } from "@uncaged/nerve-core";
/** All state held for one sense inside a worker */
@@ -19,7 +19,9 @@ export function readState(statePath: string, initialState: unknown): unknown {
return JSON.parse(raw);
} catch (e) {
const msg = e instanceof Error ? e.message : String(e);
process.stderr.write(`[sense-runtime] warning: failed to read state from "${statePath}": ${msg} — using initialState\n`);
process.stderr.write(
`[sense-runtime] warning: failed to read state from "${statePath}": ${msg} — using initialState\n`,
);
return initialState;
}
}
@@ -72,7 +74,7 @@ export async function loadSenseModule(
export async function executeCompute(
runtime: SenseRuntime,
timeoutMs?: number,
): Promise<Result<{ state: unknown; workflow: WorkflowTrigger | null }>> {
): Promise<Result<{ state: unknown; workflow: SenseTrigger | null }>> {
const controller = new AbortController();
let timer: ReturnType<typeof setTimeout> | undefined;
+22 -4
View File
@@ -14,11 +14,12 @@
import "./experimental-warning-suppression.js";
import { spawn } from "node:child_process";
import { readFileSync } from "node:fs";
import { join, resolve } from "node:path";
import { parseNerveConfig } from "@uncaged/nerve-core";
import type { NerveConfig, WorkflowTrigger } from "@uncaged/nerve-core";
import type { NerveConfig, SenseTrigger } from "@uncaged/nerve-core";
import type { WorkerToParentMessage } from "./ipc.js";
import { parseParentMessage } from "./ipc.js";
@@ -42,11 +43,25 @@ function sendReady(): void {
function sendComputeResult(
sense: string,
value: { state: unknown; workflow: WorkflowTrigger | null },
value: { state: unknown; workflow: SenseTrigger | null },
): void {
send({ type: "compute-result", sense, state: value.state, workflow: value.workflow });
}
function executeShellTriggerIfNeeded(nerveRoot: string, trigger: SenseTrigger | null): void {
if (trigger === null || trigger.kind !== "shell") return;
const child = spawn(trigger.command, {
shell: true,
cwd: nerveRoot,
detached: true,
stdio: "ignore",
});
child.on("error", (err) => {
process.stderr.write(`[sense-worker] shell trigger failed: ${err.message}\n`);
});
child.unref();
}
function sendError(sense: string, error: string): void {
send({ type: "error", sense, error });
}
@@ -132,6 +147,7 @@ async function runCompute(
runtime: SenseRuntime,
timeoutMs: number,
gracePeriodMs: number | null,
nerveRoot: string,
): Promise<void> {
try {
const result = await executeCompute(runtime, timeoutMs);
@@ -143,6 +159,7 @@ async function runCompute(
return;
}
clearGracePeriodTimer(senseName);
executeShellTriggerIfNeeded(nerveRoot, result.value.workflow);
sendComputeResult(senseName, result.value);
} catch (e: unknown) {
const errMsg = e instanceof Error ? e.message : String(e);
@@ -160,6 +177,7 @@ function handleMessage(
group: string,
senseConfigs: Map<string, { timeout: number | null; gracePeriod: number | null }>,
inFlight: Map<string, Promise<void>>,
nerveRoot: string,
): void {
const parseResult = parseParentMessage(raw);
if (!parseResult.ok) {
@@ -196,7 +214,7 @@ function handleMessage(
const previous = inFlight.get(msg.sense) ?? Promise.resolve();
const next = previous
.then(() => runCompute(msg.sense, runtime, timeoutMs, gracePeriodMs))
.then(() => runCompute(msg.sense, runtime, timeoutMs, gracePeriodMs, nerveRoot))
.catch((e: unknown) => {
const errMsg = e instanceof Error ? e.message : String(e);
sendError(msg.sense, errMsg);
@@ -257,7 +275,7 @@ async function bootstrap(nerveRoot: string, group: string): Promise<void> {
sendReady();
process.on("message", (raw: unknown) => {
handleMessage(raw, runtimes, group, senseConfigs, inFlight);
handleMessage(raw, runtimes, group, senseConfigs, inFlight, nerveRoot);
});
}