refactor(core): rename workflow→trigger in sense return, capture shell stderr #317
@@ -109,7 +109,7 @@ import type { SenseTrigger } from "@uncaged/nerve-core";
|
||||
// ✅ Good — sense modules return explicit next state + optional trigger (workflow or shell)
|
||||
type SenseComputeReturn<S> = {
|
||||
state: S;
|
||||
workflow: SenseTrigger | null;
|
||||
trigger: SenseTrigger | null;
|
||||
};
|
||||
```
|
||||
|
||||
|
||||
@@ -226,11 +226,11 @@ export const initialState: MyState = { lastRun: null, count: 0 };
|
||||
|
||||
export async function compute(state: MyState): Promise<{
|
||||
state: MyState;
|
||||
workflow: WorkflowTrigger | null;
|
||||
trigger: WorkflowTrigger | null;
|
||||
}> {
|
||||
return {
|
||||
state: { lastRun: Date.now(), count: state.count + 1 },
|
||||
workflow: null,
|
||||
trigger: null,
|
||||
};
|
||||
}
|
||||
```
|
||||
@@ -247,8 +247,8 @@ export async function compute(state: MyState): Promise<{
|
||||
### 返回值
|
||||
|
||||
```typescript
|
||||
// workflow: null → 不触发 workflow
|
||||
// workflow: WorkflowTrigger → 触发 workflow
|
||||
// trigger: null → 不触发 workflow
|
||||
// trigger: WorkflowTrigger → 触发 workflow
|
||||
|
||||
type WorkflowTrigger = {
|
||||
name: string; // workflow 名称(对应 nerve.yaml 中的 key)
|
||||
@@ -271,7 +271,7 @@ export const initialState: MyState = { ... };
|
||||
// 2. compute 函数
|
||||
export async function compute(state: MyState): Promise<{
|
||||
state: MyState;
|
||||
workflow: WorkflowTrigger | null;
|
||||
trigger: WorkflowTrigger | null;
|
||||
}> {
|
||||
// ...
|
||||
}
|
||||
@@ -304,12 +304,12 @@ export const initialState: CpuState = { samples: [] };
|
||||
|
||||
export async function compute(state: CpuState): Promise<{
|
||||
state: CpuState;
|
||||
workflow: null;
|
||||
trigger: null;
|
||||
}> {
|
||||
const [oneMin] = loadavg();
|
||||
const value = typeof oneMin === "number" && !Number.isNaN(oneMin) ? oneMin : 0;
|
||||
const newSamples = [...state.samples.slice(-99), { ts: Date.now(), value }];
|
||||
return { state: { samples: newSamples }, workflow: null };
|
||||
return { state: { samples: newSamples }, trigger: null };
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
@@ -250,7 +250,7 @@ export async function compute(): Promise<ComputeResult<MySignalShape>> {
|
||||
// 返回非 null = 发出 signal(并写入业务表),可选触发 workflow
|
||||
type ComputeResult<T> =
|
||||
| null
|
||||
| { signal: T; workflow: WorkflowTrigger | null };
|
||||
| { signal: T; trigger: WorkflowTrigger | null };
|
||||
|
||||
type WorkflowTrigger = {
|
||||
name: string; // workflow 名称(对应 nerve.yaml 中的 key)
|
||||
@@ -260,7 +260,7 @@ type WorkflowTrigger = {
|
||||
};
|
||||
```
|
||||
|
||||
若返回值是普通对象且不含 `signal` 字段,内核会按 shorthand 视为 `{ signal: payload, workflow: null }`(见 core 的 `routeSenseComputeOutput`)。
|
||||
若返回值是普通对象且不含 `signal` 字段,内核会按 shorthand 视为 `{ signal: payload, trigger: null }`(见 core 的 `routeSenseComputeOutput`)。
|
||||
|
||||
### Sense 模块导出
|
||||
|
||||
@@ -273,7 +273,7 @@ type Row = { ts: number; value: number };
|
||||
|
||||
export async function compute(): Promise<ComputeResult<Row>> {
|
||||
const row: Row = { ts: Date.now(), value: Math.random() }; // 替换为真实观测逻辑
|
||||
return { signal: row, workflow: null };
|
||||
return { signal: row, trigger: null };
|
||||
}
|
||||
|
||||
export { table };
|
||||
@@ -325,7 +325,7 @@ type Row = { ts: number; value: number };
|
||||
|
||||
export async function compute(): Promise<ComputeResult<Row>> {
|
||||
const oneMin = os.loadavg()[0];
|
||||
return { signal: { ts: Date.now(), value: oneMin }, workflow: null };
|
||||
return { signal: { ts: Date.now(), value: oneMin }, trigger: null };
|
||||
}
|
||||
|
||||
export { table };
|
||||
|
||||
@@ -26,7 +26,7 @@ describe("buildSenseIndexTs", () => {
|
||||
expect(ts).toContain("type SenseState");
|
||||
expect(ts).toContain("export const initialState");
|
||||
expect(ts).toContain("export async function compute");
|
||||
expect(ts).toContain("workflow: null");
|
||||
expect(ts).toContain("trigger: null");
|
||||
expect(ts).toContain("lastRun");
|
||||
});
|
||||
});
|
||||
|
||||
@@ -120,7 +120,7 @@ const counterIndexJs = `export const initialState = { count: 0 };
|
||||
export async function compute(state) {
|
||||
return {
|
||||
state: { count: state.count + 1 },
|
||||
workflow: null,
|
||||
trigger: null,
|
||||
};
|
||||
}
|
||||
`;
|
||||
@@ -132,7 +132,7 @@ export async function compute(state) {
|
||||
if (!state.launched) {
|
||||
return {
|
||||
state: { launched: true, idleTicks: state.idleTicks },
|
||||
workflow: {
|
||||
trigger: {
|
||||
kind: "workflow",
|
||||
name: "noop",
|
||||
maxRounds: 3,
|
||||
@@ -143,7 +143,7 @@ export async function compute(state) {
|
||||
}
|
||||
return {
|
||||
state: { launched: state.launched, idleTicks: state.idleTicks + 1 },
|
||||
workflow: null,
|
||||
trigger: null,
|
||||
};
|
||||
}
|
||||
`;
|
||||
|
||||
@@ -94,12 +94,12 @@ export const initialState: SenseState = { lastRun: null };
|
||||
|
||||
export async function compute(state: SenseState): Promise<{
|
||||
state: SenseState;
|
||||
workflow: null;
|
||||
trigger: null;
|
||||
}> {
|
||||
// TODO: implement sense logic
|
||||
return {
|
||||
state: { lastRun: Date.now() },
|
||||
workflow: null,
|
||||
trigger: null,
|
||||
};
|
||||
}
|
||||
`;
|
||||
|
||||
@@ -224,12 +224,12 @@ export const initialState: CpuState = { samples: [] };
|
||||
|
||||
export async function compute(state: CpuState): Promise<{
|
||||
state: CpuState;
|
||||
workflow: null;
|
||||
trigger: null;
|
||||
}> {
|
||||
const [oneMin] = loadavg();
|
||||
const value = typeof oneMin === "number" && !Number.isNaN(oneMin) ? oneMin : 0;
|
||||
const newSamples = [...state.samples.slice(-99), { ts: Date.now(), value }];
|
||||
return { state: { samples: newSamples }, workflow: null };
|
||||
return { state: { samples: newSamples }, trigger: null };
|
||||
}
|
||||
`;
|
||||
|
||||
|
||||
@@ -16,11 +16,11 @@ export type SenseInfo = {
|
||||
* `compute` export.
|
||||
*
|
||||
* Pure: no DB, no peers.
|
||||
* Returns the next sense state and an optional trigger (`workflow: null` means no side effect).
|
||||
* Returns the next sense state and an optional trigger (`trigger: null` means no side effect).
|
||||
*/
|
||||
export type SenseComputeFn<S = unknown> = (
|
||||
state: S,
|
||||
) => Promise<{ state: S; workflow: SenseTrigger | null }>;
|
||||
) => Promise<{ state: S; trigger: SenseTrigger | null }>;
|
||||
|
||||
/**
|
||||
* The full shape a sense module (`src/index.ts`) must export.
|
||||
@@ -103,9 +103,7 @@ function parseShellTriggerBranch(value: Record<string, unknown>): Result<ShellTr
|
||||
return ok({ kind: "shell", command: command.trim() });
|
||||
}
|
||||
|
||||
/**
|
||||
* Validates a structured sense trigger from Sense compute or IPC (`workflow` field).
|
||||
*/
|
||||
/** Validates a structured sense trigger from Sense compute or IPC (`trigger` field). */
|
||||
export function parseSenseTrigger(value: unknown): Result<SenseTrigger> {
|
||||
if (!isPlainRecord(value)) {
|
||||
return err(new Error("sense trigger must be a plain object"));
|
||||
|
||||
@@ -79,7 +79,7 @@ describe("createFileWatcher", () => {
|
||||
await new Promise((r) => setTimeout(r, 100));
|
||||
writeFileSync(
|
||||
join(root, "senses", "cpu-usage", "index.js"),
|
||||
"export const initialState = {}; export async function compute(state) { return { state, workflow: null }; }",
|
||||
"export const initialState = {}; export async function compute(state) { return { state, trigger: null }; }",
|
||||
);
|
||||
|
||||
await waitFor(() => changes.length > 0, 3000);
|
||||
|
||||
@@ -35,7 +35,7 @@ process.on("message", (msg) => {
|
||||
type: "compute-result",
|
||||
sense: msg.sense,
|
||||
state: 42,
|
||||
workflow: null,
|
||||
trigger: null,
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
@@ -9,7 +9,7 @@
|
||||
*
|
||||
* Behaviour:
|
||||
* - Sends { type: "ready" } on startup
|
||||
* - On { type: "compute", sense } → sends back compute-result with state + workflow:null
|
||||
* - On { type: "compute", sense } → sends back compute-result with state + trigger:null
|
||||
* - On { type: "shutdown" } → exits cleanly with code 0
|
||||
*/
|
||||
|
||||
@@ -27,7 +27,7 @@ process.on("message", (msg) => {
|
||||
type: "compute-result",
|
||||
sense: msg.sense,
|
||||
state: 42,
|
||||
workflow: null,
|
||||
trigger: null,
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
@@ -22,7 +22,7 @@ process.on("message", (msg) => {
|
||||
type: "compute-result",
|
||||
sense: msg.sense,
|
||||
state: "late",
|
||||
workflow: null,
|
||||
trigger: null,
|
||||
});
|
||||
}, 10_000);
|
||||
}
|
||||
|
||||
@@ -57,7 +57,7 @@ function makeMockChild(pid = 1): MockChild {
|
||||
type: "compute-result",
|
||||
sense: m.sense,
|
||||
state: 42,
|
||||
workflow: null,
|
||||
trigger: null,
|
||||
});
|
||||
});
|
||||
}
|
||||
@@ -183,7 +183,7 @@ describe("kernel + workflowManager integration", () => {
|
||||
type: "compute-result",
|
||||
sense: "cpu-usage",
|
||||
state: { reason: "test" },
|
||||
workflow: {
|
||||
trigger: {
|
||||
kind: "workflow",
|
||||
name: "my-workflow",
|
||||
maxRounds: 10,
|
||||
@@ -240,7 +240,7 @@ describe("kernel + workflowManager integration", () => {
|
||||
type: "compute-result",
|
||||
sense: "cpu-usage",
|
||||
state: { level: "critical" },
|
||||
workflow: {
|
||||
trigger: {
|
||||
kind: "workflow",
|
||||
name: "alert-workflow",
|
||||
maxRounds: 5,
|
||||
@@ -295,7 +295,7 @@ describe("kernel + workflowManager integration", () => {
|
||||
type: "compute-result",
|
||||
sense: "cpu-usage",
|
||||
state: { seq: 1 },
|
||||
workflow: {
|
||||
trigger: {
|
||||
kind: "workflow",
|
||||
name: "order-wf",
|
||||
maxRounds: 2,
|
||||
@@ -358,7 +358,7 @@ describe("kernel + workflowManager integration", () => {
|
||||
type: "compute-result",
|
||||
sense: "cpu-usage",
|
||||
state: 50,
|
||||
workflow: null,
|
||||
trigger: null,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -394,7 +394,7 @@ describe("kernel + workflowManager integration", () => {
|
||||
type: "compute-result",
|
||||
sense: "cpu-usage",
|
||||
state: {},
|
||||
workflow: {
|
||||
trigger: {
|
||||
kind: "shell",
|
||||
command: "echo nerve-shell-test",
|
||||
},
|
||||
@@ -454,7 +454,7 @@ describe("kernel + workflowManager integration", () => {
|
||||
type: "compute-result",
|
||||
sense: "cpu-usage",
|
||||
state: { note: "log" },
|
||||
workflow: {
|
||||
trigger: {
|
||||
kind: "workflow",
|
||||
name: "log-test-workflow",
|
||||
maxRounds: 10,
|
||||
@@ -527,7 +527,7 @@ describe("kernel + workflowManager integration", () => {
|
||||
type: "compute-result",
|
||||
sense: "cpu-usage",
|
||||
state: { phase: "reload" },
|
||||
workflow: {
|
||||
trigger: {
|
||||
kind: "workflow",
|
||||
name: "new-workflow",
|
||||
maxRounds: 10,
|
||||
@@ -609,7 +609,7 @@ describe("kernel + workflowManager integration", () => {
|
||||
type: "compute-result",
|
||||
sense: "cpu-usage",
|
||||
state: { stale: true },
|
||||
workflow: {
|
||||
trigger: {
|
||||
kind: "workflow",
|
||||
name: "old-workflow",
|
||||
maxRounds: 10,
|
||||
@@ -668,7 +668,7 @@ describe("kernel + workflowManager integration", () => {
|
||||
type: "compute-result",
|
||||
sense: "cpu-usage",
|
||||
state: { shutdownCase: true },
|
||||
workflow: {
|
||||
trigger: {
|
||||
kind: "workflow",
|
||||
name: "shutdown-test",
|
||||
maxRounds: 10,
|
||||
|
||||
@@ -41,7 +41,7 @@ function makeMockChild(pid = 1): MockChild {
|
||||
type: "compute-result",
|
||||
sense: m.sense,
|
||||
state: 42,
|
||||
workflow: null,
|
||||
trigger: null,
|
||||
});
|
||||
});
|
||||
}
|
||||
@@ -140,7 +140,7 @@ describe("kernel — message routing", () => {
|
||||
type: "compute-result",
|
||||
sense: "cpu-usage",
|
||||
state: 42,
|
||||
workflow: null,
|
||||
trigger: null,
|
||||
});
|
||||
}).not.toThrow();
|
||||
|
||||
@@ -171,7 +171,7 @@ describe("kernel — message routing", () => {
|
||||
type: "compute-result",
|
||||
sense: "cpu-usage",
|
||||
state: 123,
|
||||
workflow: null,
|
||||
trigger: null,
|
||||
});
|
||||
const rows = logStore.query({
|
||||
source: "sense",
|
||||
|
||||
@@ -63,7 +63,7 @@ describe("executeCompute", () => {
|
||||
it("passes state into compute and persists returned state", async () => {
|
||||
const path = makeTempStatePath();
|
||||
const runtime = makeRuntime(
|
||||
async (s) => ({ state: { n: s.n + 1 }, workflow: null }),
|
||||
async (s) => ({ state: { n: s.n + 1 }, trigger: null }),
|
||||
{ n: 0 },
|
||||
path,
|
||||
);
|
||||
@@ -71,7 +71,7 @@ describe("executeCompute", () => {
|
||||
const result = await executeCompute(runtime);
|
||||
expect(result.ok).toBe(true);
|
||||
if (!result.ok) return;
|
||||
expect(result.value).toEqual({ state: { n: 1 }, workflow: null });
|
||||
expect(result.value).toEqual({ state: { n: 1 }, trigger: null });
|
||||
expect(runtime.state).toEqual({ n: 1 });
|
||||
expect(JSON.parse(readFileSync(path, "utf8"))).toEqual({ n: 1 });
|
||||
});
|
||||
@@ -93,7 +93,7 @@ describe("executeCompute", () => {
|
||||
it("returns err when compute exceeds timeoutMs", async () => {
|
||||
const runtime = makeRuntime(
|
||||
async (s) =>
|
||||
new Promise((resolve) => setTimeout(() => resolve({ state: s, workflow: null }), 5_000)),
|
||||
new Promise((resolve) => setTimeout(() => resolve({ state: s, trigger: null }), 5_000)),
|
||||
{ n: 0 },
|
||||
);
|
||||
|
||||
@@ -104,7 +104,7 @@ describe("executeCompute", () => {
|
||||
});
|
||||
|
||||
it("completes within timeout when compute is fast", async () => {
|
||||
const runtime = makeRuntime(async (s) => ({ state: { n: s.n }, workflow: null }), { n: 42 });
|
||||
const runtime = makeRuntime(async (s) => ({ state: { n: s.n }, trigger: null }), { n: 42 });
|
||||
const result = await executeCompute(runtime, 5_000);
|
||||
expect(result.ok).toBe(true);
|
||||
if (!result.ok) return;
|
||||
|
||||
@@ -84,13 +84,13 @@ describe("createSenseWorkerPool", () => {
|
||||
type: "compute-result",
|
||||
sense: "s",
|
||||
state: 1,
|
||||
workflow: null,
|
||||
trigger: null,
|
||||
});
|
||||
expect(onWorkerMessage).toHaveBeenCalledWith({
|
||||
type: "compute-result",
|
||||
sense: "s",
|
||||
state: 1,
|
||||
workflow: null,
|
||||
trigger: null,
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
@@ -70,7 +70,7 @@ export type ComputeResultMessage = {
|
||||
type: "compute-result";
|
||||
sense: string;
|
||||
state: unknown;
|
||||
workflow: SenseTrigger | null;
|
||||
trigger: SenseTrigger | null;
|
||||
};
|
||||
|
||||
/** Worker → Parent: sense compute result includes a workflow to start */
|
||||
@@ -255,26 +255,26 @@ function parseComputeResultMsg(obj: Record<string, unknown>): Result<WorkerToPar
|
||||
if (!("state" in obj)) {
|
||||
return err(new Error("Worker 'compute-result' message missing 'state' field"));
|
||||
}
|
||||
if (!("workflow" in obj)) {
|
||||
return err(new Error("Worker 'compute-result' message missing 'workflow' field"));
|
||||
if (!("trigger" in obj)) {
|
||||
return err(new Error("Worker 'compute-result' message missing 'trigger' field"));
|
||||
}
|
||||
const wfRaw = obj.workflow;
|
||||
const wfRaw = obj.trigger;
|
||||
if (wfRaw !== null && !isPlainRecord(wfRaw)) {
|
||||
return err(new Error("Worker 'compute-result' workflow must be an object or null"));
|
||||
return err(new Error("Worker 'compute-result' trigger must be an object or null"));
|
||||
}
|
||||
let workflow: SenseTrigger | null;
|
||||
let trigger: SenseTrigger | null;
|
||||
if (wfRaw === null) {
|
||||
workflow = null;
|
||||
trigger = null;
|
||||
} else {
|
||||
const parsed = parseSenseTrigger(wfRaw);
|
||||
if (!parsed.ok) return err(parsed.error);
|
||||
workflow = parsed.value;
|
||||
trigger = parsed.value;
|
||||
}
|
||||
return ok({
|
||||
type: "compute-result",
|
||||
sense: obj.sense,
|
||||
state: obj.state,
|
||||
workflow,
|
||||
trigger,
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -145,7 +145,7 @@ export function createKernel(
|
||||
}
|
||||
}
|
||||
|
||||
function handleComputeResult(senseName: string, workflow: SenseTrigger | null): void {
|
||||
function handleComputeResult(senseName: string, trigger: SenseTrigger | null): void {
|
||||
logStore.append({
|
||||
source: "sense",
|
||||
type: "compute-complete",
|
||||
@@ -154,18 +154,18 @@ export function createKernel(
|
||||
timestamp: Date.now(),
|
||||
});
|
||||
|
||||
if (workflow !== null) {
|
||||
if (workflow.kind === "workflow") {
|
||||
workflowManager.startWorkflow(workflow.name, {
|
||||
prompt: workflow.prompt,
|
||||
maxRounds: workflow.maxRounds,
|
||||
dryRun: workflow.dryRun,
|
||||
if (trigger !== null) {
|
||||
if (trigger.kind === "workflow") {
|
||||
workflowManager.startWorkflow(trigger.name, {
|
||||
prompt: trigger.prompt,
|
||||
maxRounds: trigger.maxRounds,
|
||||
dryRun: trigger.dryRun,
|
||||
});
|
||||
logStore.append({
|
||||
source: "sense",
|
||||
type: "workflow-launch",
|
||||
refId: senseName,
|
||||
payload: JSON.stringify(workflow),
|
||||
payload: JSON.stringify(trigger),
|
||||
timestamp: Date.now(),
|
||||
});
|
||||
} else {
|
||||
@@ -173,7 +173,7 @@ export function createKernel(
|
||||
source: "sense",
|
||||
type: "shell-launch",
|
||||
refId: senseName,
|
||||
payload: JSON.stringify(workflow),
|
||||
payload: JSON.stringify(trigger),
|
||||
timestamp: Date.now(),
|
||||
});
|
||||
}
|
||||
@@ -212,7 +212,7 @@ export function createKernel(
|
||||
}
|
||||
|
||||
if (msg.type === "compute-result") {
|
||||
handleComputeResult(msg.sense, msg.workflow);
|
||||
handleComputeResult(msg.sense, msg.trigger);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -74,7 +74,7 @@ export async function loadSenseModule(
|
||||
export async function executeCompute(
|
||||
runtime: SenseRuntime,
|
||||
timeoutMs?: number,
|
||||
): Promise<Result<{ state: unknown; workflow: SenseTrigger | null }>> {
|
||||
): Promise<Result<{ state: unknown; trigger: SenseTrigger | null }>> {
|
||||
const controller = new AbortController();
|
||||
|
||||
let timer: ReturnType<typeof setTimeout> | undefined;
|
||||
|
||||
@@ -43,9 +43,9 @@ function sendReady(): void {
|
||||
|
||||
function sendComputeResult(
|
||||
sense: string,
|
||||
value: { state: unknown; workflow: SenseTrigger | null },
|
||||
value: { state: unknown; trigger: SenseTrigger | null },
|
||||
): void {
|
||||
send({ type: "compute-result", sense, state: value.state, workflow: value.workflow });
|
||||
send({ type: "compute-result", sense, state: value.state, trigger: value.trigger });
|
||||
}
|
||||
|
||||
function executeShellTriggerIfNeeded(nerveRoot: string, trigger: SenseTrigger | null): void {
|
||||
@@ -54,11 +54,24 @@ function executeShellTriggerIfNeeded(nerveRoot: string, trigger: SenseTrigger |
|
||||
shell: true,
|
||||
cwd: nerveRoot,
|
||||
detached: true,
|
||||
stdio: "ignore",
|
||||
stdio: ["ignore", "ignore", "pipe"],
|
||||
});
|
||||
child.on("error", (err) => {
|
||||
process.stderr.write(`[sense-worker] shell trigger failed: ${err.message}\n`);
|
||||
});
|
||||
if (child.stderr) {
|
||||
let stderrBuf = "";
|
||||
child.stderr.on("data", (chunk: Buffer) => {
|
||||
stderrBuf += chunk.toString();
|
||||
});
|
||||
child.on("close", (code) => {
|
||||
if (code !== null && code !== 0 && stderrBuf.length > 0) {
|
||||
process.stderr.write(
|
||||
`[sense-worker] shell trigger exited with code ${code}: ${stderrBuf.trimEnd()}\n`,
|
||||
);
|
||||
}
|
||||
});
|
||||
}
|
||||
child.unref();
|
||||
}
|
||||
|
||||
@@ -159,7 +172,7 @@ async function runCompute(
|
||||
return;
|
||||
}
|
||||
clearGracePeriodTimer(senseName);
|
||||
executeShellTriggerIfNeeded(nerveRoot, result.value.workflow);
|
||||
executeShellTriggerIfNeeded(nerveRoot, result.value.trigger);
|
||||
sendComputeResult(senseName, result.value);
|
||||
} catch (e: unknown) {
|
||||
const errMsg = e instanceof Error ? e.message : String(e);
|
||||
|
||||
Reference in New Issue
Block a user