Merge pull request 'feat(core): sense trigger supports arbitrary shell commands' (#316) from feat/315-shell-trigger into main
This commit was merged in pull request #316.
This commit is contained in:
@@ -104,10 +104,12 @@ type SenseConfig = {
|
|||||||
For mutually exclusive fields, use discriminated unions:
|
For mutually exclusive fields, use discriminated unions:
|
||||||
|
|
||||||
```typescript
|
```typescript
|
||||||
// ✅ Good — sense modules return explicit next state + optional workflow trigger
|
import type { SenseTrigger } from "@uncaged/nerve-core";
|
||||||
|
|
||||||
|
// ✅ Good — sense modules return explicit next state + optional trigger (workflow or shell)
|
||||||
type SenseComputeReturn<S> = {
|
type SenseComputeReturn<S> = {
|
||||||
state: S;
|
state: S;
|
||||||
workflow: WorkflowTrigger | null;
|
workflow: SenseTrigger | null;
|
||||||
};
|
};
|
||||||
```
|
```
|
||||||
|
|
||||||
|
|||||||
@@ -133,6 +133,7 @@ export async function compute(state) {
|
|||||||
return {
|
return {
|
||||||
state: { launched: true, idleTicks: state.idleTicks },
|
state: { launched: true, idleTicks: state.idleTicks },
|
||||||
workflow: {
|
workflow: {
|
||||||
|
kind: "workflow",
|
||||||
name: "noop",
|
name: "noop",
|
||||||
maxRounds: 3,
|
maxRounds: 3,
|
||||||
prompt: "e2e-archive",
|
prompt: "e2e-archive",
|
||||||
|
|||||||
@@ -1,10 +1,11 @@
|
|||||||
import { describe, expect, it } from "vitest";
|
import { describe, expect, it } from "vitest";
|
||||||
|
|
||||||
import { parseWorkflowTrigger } from "../sense.js";
|
import { parseSenseTrigger } from "../sense.js";
|
||||||
|
|
||||||
describe("parseWorkflowTrigger", () => {
|
describe("parseSenseTrigger", () => {
|
||||||
it("accepts a valid trigger object", () => {
|
it("accepts a valid workflow trigger", () => {
|
||||||
const r = parseWorkflowTrigger({
|
const r = parseSenseTrigger({
|
||||||
|
kind: "workflow",
|
||||||
name: "my-wf",
|
name: "my-wf",
|
||||||
maxRounds: 3,
|
maxRounds: 3,
|
||||||
prompt: "go",
|
prompt: "go",
|
||||||
@@ -12,11 +13,18 @@ describe("parseWorkflowTrigger", () => {
|
|||||||
});
|
});
|
||||||
expect(r.ok).toBe(true);
|
expect(r.ok).toBe(true);
|
||||||
if (!r.ok) return;
|
if (!r.ok) return;
|
||||||
expect(r.value).toEqual({ name: "my-wf", maxRounds: 3, prompt: "go", dryRun: true });
|
expect(r.value).toEqual({
|
||||||
|
kind: "workflow",
|
||||||
|
name: "my-wf",
|
||||||
|
maxRounds: 3,
|
||||||
|
prompt: "go",
|
||||||
|
dryRun: true,
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
it("trims workflow name", () => {
|
it("trims workflow name", () => {
|
||||||
const r = parseWorkflowTrigger({
|
const r = parseSenseTrigger({
|
||||||
|
kind: "workflow",
|
||||||
name: " spaced ",
|
name: " spaced ",
|
||||||
maxRounds: 1,
|
maxRounds: 1,
|
||||||
prompt: "",
|
prompt: "",
|
||||||
@@ -24,16 +32,45 @@ describe("parseWorkflowTrigger", () => {
|
|||||||
});
|
});
|
||||||
expect(r.ok).toBe(true);
|
expect(r.ok).toBe(true);
|
||||||
if (!r.ok) return;
|
if (!r.ok) return;
|
||||||
|
expect(r.value.kind).toBe("workflow");
|
||||||
|
if (r.value.kind !== "workflow") return;
|
||||||
expect(r.value.name).toBe("spaced");
|
expect(r.value.name).toBe("spaced");
|
||||||
});
|
});
|
||||||
|
|
||||||
it("rejects empty name", () => {
|
it("accepts a valid shell trigger", () => {
|
||||||
const r = parseWorkflowTrigger({ name: "", maxRounds: 1, prompt: "x", dryRun: false });
|
const r = parseSenseTrigger({
|
||||||
|
kind: "shell",
|
||||||
|
command: " echo hi ",
|
||||||
|
});
|
||||||
|
expect(r.ok).toBe(true);
|
||||||
|
if (!r.ok) return;
|
||||||
|
expect(r.value).toEqual({ kind: "shell", command: "echo hi" });
|
||||||
|
});
|
||||||
|
|
||||||
|
it("rejects workflow without kind", () => {
|
||||||
|
const r = parseSenseTrigger({
|
||||||
|
name: "my-wf",
|
||||||
|
maxRounds: 1,
|
||||||
|
prompt: "x",
|
||||||
|
dryRun: false,
|
||||||
|
});
|
||||||
|
expect(r.ok).toBe(false);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("rejects empty workflow name", () => {
|
||||||
|
const r = parseSenseTrigger({
|
||||||
|
kind: "workflow",
|
||||||
|
name: "",
|
||||||
|
maxRounds: 1,
|
||||||
|
prompt: "x",
|
||||||
|
dryRun: false,
|
||||||
|
});
|
||||||
expect(r.ok).toBe(false);
|
expect(r.ok).toBe(false);
|
||||||
});
|
});
|
||||||
|
|
||||||
it("rejects non-integer maxRounds", () => {
|
it("rejects non-integer maxRounds", () => {
|
||||||
const r = parseWorkflowTrigger({
|
const r = parseSenseTrigger({
|
||||||
|
kind: "workflow",
|
||||||
name: "w",
|
name: "w",
|
||||||
maxRounds: 1.5,
|
maxRounds: 1.5,
|
||||||
prompt: "",
|
prompt: "",
|
||||||
@@ -43,12 +80,19 @@ describe("parseWorkflowTrigger", () => {
|
|||||||
});
|
});
|
||||||
|
|
||||||
it("rejects maxRounds < 1", () => {
|
it("rejects maxRounds < 1", () => {
|
||||||
const r = parseWorkflowTrigger({ name: "w", maxRounds: 0, prompt: "", dryRun: false });
|
const r = parseSenseTrigger({
|
||||||
|
kind: "workflow",
|
||||||
|
name: "w",
|
||||||
|
maxRounds: 0,
|
||||||
|
prompt: "",
|
||||||
|
dryRun: false,
|
||||||
|
});
|
||||||
expect(r.ok).toBe(false);
|
expect(r.ok).toBe(false);
|
||||||
});
|
});
|
||||||
|
|
||||||
it("rejects non-boolean dryRun", () => {
|
it("rejects non-boolean dryRun", () => {
|
||||||
const r = parseWorkflowTrigger({
|
const r = parseSenseTrigger({
|
||||||
|
kind: "workflow",
|
||||||
name: "w",
|
name: "w",
|
||||||
maxRounds: 1,
|
maxRounds: 1,
|
||||||
prompt: "",
|
prompt: "",
|
||||||
@@ -56,4 +100,14 @@ describe("parseWorkflowTrigger", () => {
|
|||||||
});
|
});
|
||||||
expect(r.ok).toBe(false);
|
expect(r.ok).toBe(false);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("rejects empty shell command", () => {
|
||||||
|
const r = parseSenseTrigger({ kind: "shell", command: "" });
|
||||||
|
expect(r.ok).toBe(false);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("rejects unknown kind", () => {
|
||||||
|
const r = parseSenseTrigger({ kind: "other", x: 1 });
|
||||||
|
expect(r.ok).toBe(false);
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -54,12 +54,22 @@ export type ExtractConfig = {
|
|||||||
|
|
||||||
/** Parameters for starting a workflow from a Sense compute result (or CLI trigger). */
|
/** Parameters for starting a workflow from a Sense compute result (or CLI trigger). */
|
||||||
export type WorkflowTrigger = {
|
export type WorkflowTrigger = {
|
||||||
|
kind: "workflow";
|
||||||
name: string;
|
name: string;
|
||||||
maxRounds: number;
|
maxRounds: number;
|
||||||
prompt: string;
|
prompt: string;
|
||||||
dryRun: boolean;
|
dryRun: boolean;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/** Run a shell command from a Sense compute result (daemon executes in the sense worker). */
|
||||||
|
export type ShellTrigger = {
|
||||||
|
kind: "shell";
|
||||||
|
command: string;
|
||||||
|
};
|
||||||
|
|
||||||
|
/** Optional side effect requested by `compute()` — workflow launch or shell command. */
|
||||||
|
export type SenseTrigger = WorkflowTrigger | ShellTrigger;
|
||||||
|
|
||||||
export type NerveConfig = {
|
export type NerveConfig = {
|
||||||
/** Engine-wide default max moderator rounds (e.g. CLI workflow trigger when omitted). */
|
/** Engine-wide default max moderator rounds (e.g. CLI workflow trigger when omitted). */
|
||||||
maxRounds: number;
|
maxRounds: number;
|
||||||
|
|||||||
@@ -8,6 +8,8 @@ export type {
|
|||||||
ExtractConfig,
|
ExtractConfig,
|
||||||
NerveConfig,
|
NerveConfig,
|
||||||
WorkflowTrigger,
|
WorkflowTrigger,
|
||||||
|
ShellTrigger,
|
||||||
|
SenseTrigger,
|
||||||
} from "./config.js";
|
} from "./config.js";
|
||||||
export type { SenseInfo } from "./sense.js";
|
export type { SenseInfo } from "./sense.js";
|
||||||
export type { SenseComputeFn, SenseModule } from "./sense.js";
|
export type { SenseComputeFn, SenseModule } from "./sense.js";
|
||||||
@@ -44,7 +46,7 @@ export type { KnowledgeConfig } from "./config.js";
|
|||||||
export { parseKnowledgeYaml } from "./config.js";
|
export { parseKnowledgeYaml } from "./config.js";
|
||||||
export { isPlainRecord } from "./util.js";
|
export { isPlainRecord } from "./util.js";
|
||||||
|
|
||||||
export { parseWorkflowTrigger } from "./sense.js";
|
export { parseSenseTrigger } from "./sense.js";
|
||||||
|
|
||||||
export { isSenseInfo, isWorkflowStatus } from "./daemon.js";
|
export { isSenseInfo, isWorkflowStatus } from "./daemon.js";
|
||||||
export type {
|
export type {
|
||||||
|
|||||||
+36
-11
@@ -1,4 +1,4 @@
|
|||||||
import type { SenseConfig, WorkflowTrigger } from "./config.js";
|
import type { SenseConfig, SenseTrigger, ShellTrigger, WorkflowTrigger } from "./config.js";
|
||||||
import { type Result, err, isPlainRecord, ok } from "./util.js";
|
import { type Result, err, isPlainRecord, ok } from "./util.js";
|
||||||
|
|
||||||
/** Runtime metadata for a sense (e.g. daemon list-senses IPC). */
|
/** Runtime metadata for a sense (e.g. daemon list-senses IPC). */
|
||||||
@@ -16,11 +16,11 @@ export type SenseInfo = {
|
|||||||
* `compute` export.
|
* `compute` export.
|
||||||
*
|
*
|
||||||
* Pure: no DB, no peers.
|
* Pure: no DB, no peers.
|
||||||
* Returns the next sense state and an optional workflow to start (`workflow: null` means no workflow).
|
* Returns the next sense state and an optional trigger (`workflow: null` means no side effect).
|
||||||
*/
|
*/
|
||||||
export type SenseComputeFn<S = unknown> = (
|
export type SenseComputeFn<S = unknown> = (
|
||||||
state: S,
|
state: S,
|
||||||
) => Promise<{ state: S; workflow: WorkflowTrigger | null }>;
|
) => Promise<{ state: S; workflow: SenseTrigger | null }>;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The full shape a sense module (`src/index.ts`) must export.
|
* The full shape a sense module (`src/index.ts`) must export.
|
||||||
@@ -69,13 +69,7 @@ export function senseTriggerLabels(
|
|||||||
return [labelSenseTrigger({ interval: sc.interval, on: sc.on })];
|
return [labelSenseTrigger({ interval: sc.interval, on: sc.on })];
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
function parseWorkflowTriggerBranch(value: Record<string, unknown>): Result<WorkflowTrigger> {
|
||||||
* Validates a structured workflow trigger object from Sense compute or IPC.
|
|
||||||
*/
|
|
||||||
export function parseWorkflowTrigger(value: unknown): Result<WorkflowTrigger> {
|
|
||||||
if (!isPlainRecord(value)) {
|
|
||||||
return err(new Error("workflow trigger must be a plain object"));
|
|
||||||
}
|
|
||||||
const nameRaw = value.name;
|
const nameRaw = value.name;
|
||||||
if (typeof nameRaw !== "string" || nameRaw.trim().length === 0) {
|
if (typeof nameRaw !== "string" || nameRaw.trim().length === 0) {
|
||||||
return err(new Error('workflow trigger: "name" must be a non-empty string'));
|
return err(new Error('workflow trigger: "name" must be a non-empty string'));
|
||||||
@@ -92,5 +86,36 @@ export function parseWorkflowTrigger(value: unknown): Result<WorkflowTrigger> {
|
|||||||
if (typeof dryRun !== "boolean") {
|
if (typeof dryRun !== "boolean") {
|
||||||
return err(new Error('workflow trigger: "dryRun" must be a boolean'));
|
return err(new Error('workflow trigger: "dryRun" must be a boolean'));
|
||||||
}
|
}
|
||||||
return ok({ name: nameRaw.trim(), maxRounds, prompt, dryRun });
|
return ok({
|
||||||
|
kind: "workflow",
|
||||||
|
name: nameRaw.trim(),
|
||||||
|
maxRounds,
|
||||||
|
prompt,
|
||||||
|
dryRun,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
function parseShellTriggerBranch(value: Record<string, unknown>): Result<ShellTrigger> {
|
||||||
|
const command = value.command;
|
||||||
|
if (typeof command !== "string" || command.trim().length === 0) {
|
||||||
|
return err(new Error('shell trigger: "command" must be a non-empty string'));
|
||||||
|
}
|
||||||
|
return ok({ kind: "shell", command: command.trim() });
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Validates a structured sense trigger from Sense compute or IPC (`workflow` field).
|
||||||
|
*/
|
||||||
|
export function parseSenseTrigger(value: unknown): Result<SenseTrigger> {
|
||||||
|
if (!isPlainRecord(value)) {
|
||||||
|
return err(new Error("sense trigger must be a plain object"));
|
||||||
|
}
|
||||||
|
const kind = value.kind;
|
||||||
|
if (kind === "workflow") {
|
||||||
|
return parseWorkflowTriggerBranch(value);
|
||||||
|
}
|
||||||
|
if (kind === "shell") {
|
||||||
|
return parseShellTriggerBranch(value);
|
||||||
|
}
|
||||||
|
return err(new Error('sense trigger: "kind" must be "workflow" or "shell"'));
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -184,6 +184,7 @@ describe("kernel + workflowManager integration", () => {
|
|||||||
sense: "cpu-usage",
|
sense: "cpu-usage",
|
||||||
state: { reason: "test" },
|
state: { reason: "test" },
|
||||||
workflow: {
|
workflow: {
|
||||||
|
kind: "workflow",
|
||||||
name: "my-workflow",
|
name: "my-workflow",
|
||||||
maxRounds: 10,
|
maxRounds: 10,
|
||||||
prompt: "run this workflow",
|
prompt: "run this workflow",
|
||||||
@@ -240,6 +241,7 @@ describe("kernel + workflowManager integration", () => {
|
|||||||
sense: "cpu-usage",
|
sense: "cpu-usage",
|
||||||
state: { level: "critical" },
|
state: { level: "critical" },
|
||||||
workflow: {
|
workflow: {
|
||||||
|
kind: "workflow",
|
||||||
name: "alert-workflow",
|
name: "alert-workflow",
|
||||||
maxRounds: 5,
|
maxRounds: 5,
|
||||||
prompt: "handle critical alert",
|
prompt: "handle critical alert",
|
||||||
@@ -294,6 +296,7 @@ describe("kernel + workflowManager integration", () => {
|
|||||||
sense: "cpu-usage",
|
sense: "cpu-usage",
|
||||||
state: { seq: 1 },
|
state: { seq: 1 },
|
||||||
workflow: {
|
workflow: {
|
||||||
|
kind: "workflow",
|
||||||
name: "order-wf",
|
name: "order-wf",
|
||||||
maxRounds: 2,
|
maxRounds: 2,
|
||||||
prompt: "p",
|
prompt: "p",
|
||||||
@@ -374,6 +377,51 @@ describe("kernel + workflowManager integration", () => {
|
|||||||
await vi.runAllTimersAsync();
|
await vi.runAllTimersAsync();
|
||||||
await stopPromise;
|
await stopPromise;
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("logs shell-launch and does not start a workflow for shell triggers", async () => {
|
||||||
|
const logStore = makeLogStore();
|
||||||
|
const config = makeConfig({ workflows: {} });
|
||||||
|
const kernel = createKernel(config, nerveRoot, {
|
||||||
|
workerScript: "fake-worker.js",
|
||||||
|
logStore,
|
||||||
|
});
|
||||||
|
await flushSenseWorkerForkMicrotasks(kernel);
|
||||||
|
await vi.runAllTimersAsync();
|
||||||
|
|
||||||
|
const workerPool = mockChildren[0];
|
||||||
|
if (workerPool) {
|
||||||
|
workerPool.emit("message", {
|
||||||
|
type: "compute-result",
|
||||||
|
sense: "cpu-usage",
|
||||||
|
state: {},
|
||||||
|
workflow: {
|
||||||
|
kind: "shell",
|
||||||
|
command: "echo nerve-shell-test",
|
||||||
|
},
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
await vi.runAllTimersAsync();
|
||||||
|
|
||||||
|
const shellLaunch = logStore.append.mock.calls
|
||||||
|
.map((c) => c[0] as { source: string; type: string })
|
||||||
|
.find((e) => e.type === "shell-launch");
|
||||||
|
expect(shellLaunch).toBeDefined();
|
||||||
|
|
||||||
|
const startThread = mockChildren
|
||||||
|
.flatMap((c) => (c.send as ReturnType<typeof vi.fn>).mock.calls as [unknown][])
|
||||||
|
.some(
|
||||||
|
([msg]) =>
|
||||||
|
msg !== null &&
|
||||||
|
typeof msg === "object" &&
|
||||||
|
(msg as Record<string, unknown>).type === "start-thread",
|
||||||
|
);
|
||||||
|
expect(startThread).toBe(false);
|
||||||
|
|
||||||
|
const stopPromise = kernel.stop();
|
||||||
|
await vi.runAllTimersAsync();
|
||||||
|
await stopPromise;
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
describe("workflow events are logged", () => {
|
describe("workflow events are logged", () => {
|
||||||
@@ -407,6 +455,7 @@ describe("kernel + workflowManager integration", () => {
|
|||||||
sense: "cpu-usage",
|
sense: "cpu-usage",
|
||||||
state: { note: "log" },
|
state: { note: "log" },
|
||||||
workflow: {
|
workflow: {
|
||||||
|
kind: "workflow",
|
||||||
name: "log-test-workflow",
|
name: "log-test-workflow",
|
||||||
maxRounds: 10,
|
maxRounds: 10,
|
||||||
prompt: "test prompt",
|
prompt: "test prompt",
|
||||||
@@ -479,6 +528,7 @@ describe("kernel + workflowManager integration", () => {
|
|||||||
sense: "cpu-usage",
|
sense: "cpu-usage",
|
||||||
state: { phase: "reload" },
|
state: { phase: "reload" },
|
||||||
workflow: {
|
workflow: {
|
||||||
|
kind: "workflow",
|
||||||
name: "new-workflow",
|
name: "new-workflow",
|
||||||
maxRounds: 10,
|
maxRounds: 10,
|
||||||
prompt: "reload test",
|
prompt: "reload test",
|
||||||
@@ -560,6 +610,7 @@ describe("kernel + workflowManager integration", () => {
|
|||||||
sense: "cpu-usage",
|
sense: "cpu-usage",
|
||||||
state: { stale: true },
|
state: { stale: true },
|
||||||
workflow: {
|
workflow: {
|
||||||
|
kind: "workflow",
|
||||||
name: "old-workflow",
|
name: "old-workflow",
|
||||||
maxRounds: 10,
|
maxRounds: 10,
|
||||||
prompt: "should not work",
|
prompt: "should not work",
|
||||||
@@ -618,6 +669,7 @@ describe("kernel + workflowManager integration", () => {
|
|||||||
sense: "cpu-usage",
|
sense: "cpu-usage",
|
||||||
state: { shutdownCase: true },
|
state: { shutdownCase: true },
|
||||||
workflow: {
|
workflow: {
|
||||||
|
kind: "workflow",
|
||||||
name: "shutdown-test",
|
name: "shutdown-test",
|
||||||
maxRounds: 10,
|
maxRounds: 10,
|
||||||
prompt: "test",
|
prompt: "test",
|
||||||
|
|||||||
+11
-20
@@ -3,8 +3,8 @@
|
|||||||
* Protocol per RFC §5.2: hub-and-spoke, all messages through engine.
|
* Protocol per RFC §5.2: hub-and-spoke, all messages through engine.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
import type { Result, WorkflowTrigger } from "@uncaged/nerve-core";
|
import type { Result, SenseTrigger, WorkflowTrigger } from "@uncaged/nerve-core";
|
||||||
import { err, isPlainRecord, ok, parseWorkflowTrigger } from "@uncaged/nerve-core";
|
import { err, isPlainRecord, ok, parseSenseTrigger } from "@uncaged/nerve-core";
|
||||||
|
|
||||||
/** Parent → Worker: trigger one compute cycle for a sense */
|
/** Parent → Worker: trigger one compute cycle for a sense */
|
||||||
export type ComputeMessage = {
|
export type ComputeMessage = {
|
||||||
@@ -70,7 +70,7 @@ export type ComputeResultMessage = {
|
|||||||
type: "compute-result";
|
type: "compute-result";
|
||||||
sense: string;
|
sense: string;
|
||||||
state: unknown;
|
state: unknown;
|
||||||
workflow: WorkflowTrigger | null;
|
workflow: SenseTrigger | null;
|
||||||
};
|
};
|
||||||
|
|
||||||
/** Worker → Parent: sense compute result includes a workflow to start */
|
/** Worker → Parent: sense compute result includes a workflow to start */
|
||||||
@@ -262,11 +262,11 @@ function parseComputeResultMsg(obj: Record<string, unknown>): Result<WorkerToPar
|
|||||||
if (wfRaw !== null && !isPlainRecord(wfRaw)) {
|
if (wfRaw !== null && !isPlainRecord(wfRaw)) {
|
||||||
return err(new Error("Worker 'compute-result' workflow must be an object or null"));
|
return err(new Error("Worker 'compute-result' workflow must be an object or null"));
|
||||||
}
|
}
|
||||||
let workflow: WorkflowTrigger | null;
|
let workflow: SenseTrigger | null;
|
||||||
if (wfRaw === null) {
|
if (wfRaw === null) {
|
||||||
workflow = null;
|
workflow = null;
|
||||||
} else {
|
} else {
|
||||||
const parsed = parseWorkflowTrigger(wfRaw);
|
const parsed = parseSenseTrigger(wfRaw);
|
||||||
if (!parsed.ok) return err(parsed.error);
|
if (!parsed.ok) return err(parsed.error);
|
||||||
workflow = parsed.value;
|
workflow = parsed.value;
|
||||||
}
|
}
|
||||||
@@ -412,24 +412,15 @@ function parseSenseWorkflowTriggerMsg(obj: Record<string, unknown>): Result<Work
|
|||||||
new Error("Worker 'sense-workflow-trigger' message missing object 'workflow' field"),
|
new Error("Worker 'sense-workflow-trigger' message missing object 'workflow' field"),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
const wf = obj.workflow;
|
const parsed = parseSenseTrigger(obj.workflow);
|
||||||
if (typeof wf.name !== "string")
|
if (!parsed.ok) return err(parsed.error);
|
||||||
return err(new Error("Worker 'sense-workflow-trigger' workflow missing string 'name'"));
|
if (parsed.value.kind !== "workflow") {
|
||||||
if (typeof wf.maxRounds !== "number")
|
return err(new Error("Worker 'sense-workflow-trigger' expects kind \"workflow\""));
|
||||||
return err(new Error("Worker 'sense-workflow-trigger' workflow missing number 'maxRounds'"));
|
}
|
||||||
if (typeof wf.prompt !== "string")
|
|
||||||
return err(new Error("Worker 'sense-workflow-trigger' workflow missing string 'prompt'"));
|
|
||||||
if (typeof wf.dryRun !== "boolean")
|
|
||||||
return err(new Error("Worker 'sense-workflow-trigger' workflow missing boolean 'dryRun'"));
|
|
||||||
return ok({
|
return ok({
|
||||||
type: "sense-workflow-trigger",
|
type: "sense-workflow-trigger",
|
||||||
sense: obj.sense,
|
sense: obj.sense,
|
||||||
workflow: {
|
workflow: parsed.value,
|
||||||
name: wf.name,
|
|
||||||
maxRounds: wf.maxRounds,
|
|
||||||
prompt: wf.prompt,
|
|
||||||
dryRun: wf.dryRun,
|
|
||||||
},
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -12,7 +12,7 @@ import {
|
|||||||
type HealthInfo,
|
type HealthInfo,
|
||||||
type NerveConfig,
|
type NerveConfig,
|
||||||
type SenseInfo,
|
type SenseInfo,
|
||||||
type WorkflowTrigger,
|
type SenseTrigger,
|
||||||
senseTriggerLabels,
|
senseTriggerLabels,
|
||||||
} from "@uncaged/nerve-core";
|
} from "@uncaged/nerve-core";
|
||||||
|
|
||||||
@@ -145,7 +145,7 @@ export function createKernel(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
function handleComputeResult(senseName: string, workflow: WorkflowTrigger | null): void {
|
function handleComputeResult(senseName: string, workflow: SenseTrigger | null): void {
|
||||||
logStore.append({
|
logStore.append({
|
||||||
source: "sense",
|
source: "sense",
|
||||||
type: "compute-complete",
|
type: "compute-complete",
|
||||||
@@ -155,18 +155,28 @@ export function createKernel(
|
|||||||
});
|
});
|
||||||
|
|
||||||
if (workflow !== null) {
|
if (workflow !== null) {
|
||||||
workflowManager.startWorkflow(workflow.name, {
|
if (workflow.kind === "workflow") {
|
||||||
prompt: workflow.prompt,
|
workflowManager.startWorkflow(workflow.name, {
|
||||||
maxRounds: workflow.maxRounds,
|
prompt: workflow.prompt,
|
||||||
dryRun: workflow.dryRun,
|
maxRounds: workflow.maxRounds,
|
||||||
});
|
dryRun: workflow.dryRun,
|
||||||
logStore.append({
|
});
|
||||||
source: "sense",
|
logStore.append({
|
||||||
type: "workflow-launch",
|
source: "sense",
|
||||||
refId: senseName,
|
type: "workflow-launch",
|
||||||
payload: JSON.stringify(workflow),
|
refId: senseName,
|
||||||
timestamp: Date.now(),
|
payload: JSON.stringify(workflow),
|
||||||
});
|
timestamp: Date.now(),
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
logStore.append({
|
||||||
|
source: "sense",
|
||||||
|
type: "shell-launch",
|
||||||
|
refId: senseName,
|
||||||
|
payload: JSON.stringify(workflow),
|
||||||
|
timestamp: Date.now(),
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
scheduler.onComputeComplete(senseName);
|
scheduler.onComputeComplete(senseName);
|
||||||
scheduler.onSenseCompleted(senseName);
|
scheduler.onSenseCompleted(senseName);
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
import { existsSync, mkdirSync, readFileSync, renameSync, writeFileSync } from "node:fs";
|
import { existsSync, mkdirSync, readFileSync, renameSync, writeFileSync } from "node:fs";
|
||||||
import { dirname, join } from "node:path";
|
import { dirname, join } from "node:path";
|
||||||
|
|
||||||
import type { Result, SenseComputeFn, WorkflowTrigger } from "@uncaged/nerve-core";
|
import type { Result, SenseComputeFn, SenseTrigger } from "@uncaged/nerve-core";
|
||||||
import { err, isPlainRecord, ok } from "@uncaged/nerve-core";
|
import { err, isPlainRecord, ok } from "@uncaged/nerve-core";
|
||||||
|
|
||||||
/** All state held for one sense inside a worker */
|
/** All state held for one sense inside a worker */
|
||||||
@@ -19,7 +19,9 @@ export function readState(statePath: string, initialState: unknown): unknown {
|
|||||||
return JSON.parse(raw);
|
return JSON.parse(raw);
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
const msg = e instanceof Error ? e.message : String(e);
|
const msg = e instanceof Error ? e.message : String(e);
|
||||||
process.stderr.write(`[sense-runtime] warning: failed to read state from "${statePath}": ${msg} — using initialState\n`);
|
process.stderr.write(
|
||||||
|
`[sense-runtime] warning: failed to read state from "${statePath}": ${msg} — using initialState\n`,
|
||||||
|
);
|
||||||
return initialState;
|
return initialState;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -72,7 +74,7 @@ export async function loadSenseModule(
|
|||||||
export async function executeCompute(
|
export async function executeCompute(
|
||||||
runtime: SenseRuntime,
|
runtime: SenseRuntime,
|
||||||
timeoutMs?: number,
|
timeoutMs?: number,
|
||||||
): Promise<Result<{ state: unknown; workflow: WorkflowTrigger | null }>> {
|
): Promise<Result<{ state: unknown; workflow: SenseTrigger | null }>> {
|
||||||
const controller = new AbortController();
|
const controller = new AbortController();
|
||||||
|
|
||||||
let timer: ReturnType<typeof setTimeout> | undefined;
|
let timer: ReturnType<typeof setTimeout> | undefined;
|
||||||
|
|||||||
@@ -14,11 +14,12 @@
|
|||||||
|
|
||||||
import "./experimental-warning-suppression.js";
|
import "./experimental-warning-suppression.js";
|
||||||
|
|
||||||
|
import { spawn } from "node:child_process";
|
||||||
import { readFileSync } from "node:fs";
|
import { readFileSync } from "node:fs";
|
||||||
import { join, resolve } from "node:path";
|
import { join, resolve } from "node:path";
|
||||||
|
|
||||||
import { parseNerveConfig } from "@uncaged/nerve-core";
|
import { parseNerveConfig } from "@uncaged/nerve-core";
|
||||||
import type { NerveConfig, WorkflowTrigger } from "@uncaged/nerve-core";
|
import type { NerveConfig, SenseTrigger } from "@uncaged/nerve-core";
|
||||||
|
|
||||||
import type { WorkerToParentMessage } from "./ipc.js";
|
import type { WorkerToParentMessage } from "./ipc.js";
|
||||||
import { parseParentMessage } from "./ipc.js";
|
import { parseParentMessage } from "./ipc.js";
|
||||||
@@ -42,11 +43,25 @@ function sendReady(): void {
|
|||||||
|
|
||||||
function sendComputeResult(
|
function sendComputeResult(
|
||||||
sense: string,
|
sense: string,
|
||||||
value: { state: unknown; workflow: WorkflowTrigger | null },
|
value: { state: unknown; workflow: SenseTrigger | null },
|
||||||
): void {
|
): void {
|
||||||
send({ type: "compute-result", sense, state: value.state, workflow: value.workflow });
|
send({ type: "compute-result", sense, state: value.state, workflow: value.workflow });
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function executeShellTriggerIfNeeded(nerveRoot: string, trigger: SenseTrigger | null): void {
|
||||||
|
if (trigger === null || trigger.kind !== "shell") return;
|
||||||
|
const child = spawn(trigger.command, {
|
||||||
|
shell: true,
|
||||||
|
cwd: nerveRoot,
|
||||||
|
detached: true,
|
||||||
|
stdio: "ignore",
|
||||||
|
});
|
||||||
|
child.on("error", (err) => {
|
||||||
|
process.stderr.write(`[sense-worker] shell trigger failed: ${err.message}\n`);
|
||||||
|
});
|
||||||
|
child.unref();
|
||||||
|
}
|
||||||
|
|
||||||
function sendError(sense: string, error: string): void {
|
function sendError(sense: string, error: string): void {
|
||||||
send({ type: "error", sense, error });
|
send({ type: "error", sense, error });
|
||||||
}
|
}
|
||||||
@@ -132,6 +147,7 @@ async function runCompute(
|
|||||||
runtime: SenseRuntime,
|
runtime: SenseRuntime,
|
||||||
timeoutMs: number,
|
timeoutMs: number,
|
||||||
gracePeriodMs: number | null,
|
gracePeriodMs: number | null,
|
||||||
|
nerveRoot: string,
|
||||||
): Promise<void> {
|
): Promise<void> {
|
||||||
try {
|
try {
|
||||||
const result = await executeCompute(runtime, timeoutMs);
|
const result = await executeCompute(runtime, timeoutMs);
|
||||||
@@ -143,6 +159,7 @@ async function runCompute(
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
clearGracePeriodTimer(senseName);
|
clearGracePeriodTimer(senseName);
|
||||||
|
executeShellTriggerIfNeeded(nerveRoot, result.value.workflow);
|
||||||
sendComputeResult(senseName, result.value);
|
sendComputeResult(senseName, result.value);
|
||||||
} catch (e: unknown) {
|
} catch (e: unknown) {
|
||||||
const errMsg = e instanceof Error ? e.message : String(e);
|
const errMsg = e instanceof Error ? e.message : String(e);
|
||||||
@@ -160,6 +177,7 @@ function handleMessage(
|
|||||||
group: string,
|
group: string,
|
||||||
senseConfigs: Map<string, { timeout: number | null; gracePeriod: number | null }>,
|
senseConfigs: Map<string, { timeout: number | null; gracePeriod: number | null }>,
|
||||||
inFlight: Map<string, Promise<void>>,
|
inFlight: Map<string, Promise<void>>,
|
||||||
|
nerveRoot: string,
|
||||||
): void {
|
): void {
|
||||||
const parseResult = parseParentMessage(raw);
|
const parseResult = parseParentMessage(raw);
|
||||||
if (!parseResult.ok) {
|
if (!parseResult.ok) {
|
||||||
@@ -196,7 +214,7 @@ function handleMessage(
|
|||||||
|
|
||||||
const previous = inFlight.get(msg.sense) ?? Promise.resolve();
|
const previous = inFlight.get(msg.sense) ?? Promise.resolve();
|
||||||
const next = previous
|
const next = previous
|
||||||
.then(() => runCompute(msg.sense, runtime, timeoutMs, gracePeriodMs))
|
.then(() => runCompute(msg.sense, runtime, timeoutMs, gracePeriodMs, nerveRoot))
|
||||||
.catch((e: unknown) => {
|
.catch((e: unknown) => {
|
||||||
const errMsg = e instanceof Error ? e.message : String(e);
|
const errMsg = e instanceof Error ? e.message : String(e);
|
||||||
sendError(msg.sense, errMsg);
|
sendError(msg.sense, errMsg);
|
||||||
@@ -257,7 +275,7 @@ async function bootstrap(nerveRoot: string, group: string): Promise<void> {
|
|||||||
sendReady();
|
sendReady();
|
||||||
|
|
||||||
process.on("message", (raw: unknown) => {
|
process.on("message", (raw: unknown) => {
|
||||||
handleMessage(raw, runtimes, group, senseConfigs, inFlight);
|
handleMessage(raw, runtimes, group, senseConfigs, inFlight, nerveRoot);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user