Compare commits

...

8 Commits

Author SHA1 Message Date
xiaoju 8dd82d99da refactor(core): remove WorkflowTrigger from SenseTrigger — shell only
Senses trigger shell commands only. Workflows are invoked via CLI.

SenseTrigger is now { command: string } — no discriminated union.

Closes #318

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-02 12:33:38 +00:00
xiaomo 5ec0c71ee3 Merge pull request 'refactor(core): rename workflow→trigger in sense return, capture shell stderr' (#317) from refactor/316-followup into main 2026-05-02 11:12:09 +00:00
xiaoju 52a03d7de4 refactor(core): rename workflow→trigger in sense return, capture shell stderr
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-02 10:47:22 +00:00
xiaomo 1aeb23f75a Merge pull request 'feat(core): sense trigger supports arbitrary shell commands' (#316) from feat/315-shell-trigger into main 2026-05-02 10:20:02 +00:00
xiaoju b9b804eac5 feat(core): sense trigger supports arbitrary shell commands
Extend SenseComputeReturn to support shell triggers in addition to workflow
triggers via a discriminated union (kind: 'shell' | 'workflow').

Shell triggers execute a command string in the sense worker subprocess
(spawned detached). The kernel logs 'shell-launch' events without involving
the workflow manager.

Breaking change: WorkflowTrigger now requires kind: 'workflow'.
New ShellTrigger type: { kind: 'shell', command: string }.
SenseTrigger = WorkflowTrigger | ShellTrigger.

Closes #315
2026-05-02 10:00:23 +00:00
xiaomo 6b8c917358 Merge pull request 'feat(cli): nerve agent inject claude — RFC #289 Phase 4' (#306) from feat/agent-inject-claude into main 2026-05-02 02:10:07 +00:00
xiaomo d442a64275 Merge pull request 'fix: harden state persistence (follow-up #313)' (#314) from fix/313-state-persistence-hardening into main 2026-05-01 12:03:31 +00:00
xiaoju 02c5e8bea6 fix(daemon): harden state persistence, ReadonlyArray triggers
1. writeState: atomic write via temp file + rename
2. readState: distinguish missing file vs corrupt JSON (warn on error)
3. executeCompute: write disk before updating memory state
4. SenseInfo.triggers: ReadonlyArray<string>
5. CLAUDE.md: added Sense State Persistence docs

Fixes #313
2026-05-01 12:01:50 +00:00
33 changed files with 357 additions and 522 deletions
+8 -9
View File
@@ -17,17 +17,16 @@ export async function compute(): Promise<ComputeResult<T>> { ... } // pure, no
- No database access within compute — runtime provides isolated execution context
- Must be pure function (no side effects, no external API calls)
**Return Value Contract:**
- `ComputeResult<T>` = `null | { signal: T; workflow: WorkflowTrigger | null }`
- `null` → silent, no storage, no signal
- `{ signal: data, workflow: null }` → persist + emit signal
- `{ signal, workflow: WorkflowTrigger }` → persist + emit signal + trigger workflow
- Any other value → treated as `{ signal: value, workflow: null }`
**Return Value Contract (current engine):**
- `compute(state)` returns `Promise<{ state: S; trigger: SenseTrigger | null }>` where `SenseTrigger = { command: string }`.
- `trigger: null` → persist state only; no shell command
- `trigger: { command }` → persist state; worker runs the command with `shell: true` after a successful compute
- Workflows are **not** started from `trigger`; use CLI / daemon IPC (`nerve workflow trigger`, etc.).
**Error Handling & Serialization:**
- Exceptions caught by worker, logged as errors (no signal emitted)
- Signal payload must be JSON-serializable (passed via IPC)
- Invalid workflow triggers silently dropped (signal still emitted)
- Exceptions caught by worker, logged as errors (state unchanged)
- State must be JSON-serializable (persisted to `data/senses/<name>.json`)
- Invalid `trigger` shapes fail IPC validation when the worker sends `compute-result`
**Timeout & Scheduling Semantics:**
- Timeout priority: explicit config → AbortSignal → DEFAULT_TIMEOUT_MS (30s)
+14 -13
View File
@@ -1,30 +1,31 @@
# Sense compute → workflow (RFC #308)
# Sense compute → shell + scheduler (issue #318)
Stateful senses no longer emit signals or pass outputs through `routeSenseComputeOutput`. The worker runs `compute(state)` and returns `{ state, workflow }`.
Stateful senses run `compute(state)` and return `{ state, trigger }`. The worker persists state JSON and sends `compute-result` to the kernel. Optional side effects are **shell commands only**, executed in the sense worker. Workflows are not started from sense return values.
## Flow
```
Sense worker: compute(state) → { state, workflow }
Sense worker: compute(state) → { state, trigger }
persist state JSON (data/senses/<name>.json)
IPC compute-result → kernel
trigger !== null → spawn shell command (cwd = nerve root)
workflow !== nullparseWorkflowTrigger (validation) → workflowManager.startWorkflow
scheduler.onSenseCompleted(senseName) → dependents with `on: [senseName]`
IPC compute-resultkernel (audit: shell-launch log)
scheduler.onSenseCompleted(senseName) → dependents with `on: [senseName]`
```
## Workflow trigger shape
Workflow runs: **`workflowManager.startWorkflow`** from CLI / daemon IPC only (`nerve workflow trigger`, HTTP when enabled).
When `workflow` is non-null it must be a plain object validated by `parseWorkflowTrigger()` in `packages/core/src/sense.ts`:
## Sense trigger shape
- `name`: non-empty string
- `maxRounds`: integer ≥ 1
- `prompt`: string
- `dryRun`: boolean
When `trigger` is non-null it must be a plain object validated by `parseSenseTrigger()` in `packages/core/src/sense.ts`:
Invalid triggers are rejected by the daemon when handling the worker message (workflow is not started).
- Exactly one property: `command` (non-empty string after trim)
- No `kind` field; no workflow fields
Invalid triggers are rejected when parsing the worker message.
## Scheduling
+17 -2
View File
@@ -28,6 +28,19 @@ External World → Sense(state) → { newState, workflow? } → Workflow → Log
### Sense State Persistence
Each sense's state is persisted as a JSON file at `data/senses/<name>.json` (relative to the nerve root, typically `~/.uncaged-nerve/`).
| Event | Behavior |
|-------|----------|
| **Worker start** | Read `state.json`; if missing or corrupt, use `initialState` from the sense module |
| **Compute success** | Write new state atomically (write-temp + rename), then update in-memory state |
| **Compute failure** | State unchanged (both disk and memory) |
| **Daemon restart** | State restored from last successful write |
State files are written atomically (temp file + rename) to prevent corruption on crash.
## Language & Paradigm
### Functional-first
@@ -91,10 +104,12 @@ type SenseConfig = {
For mutually exclusive fields, use discriminated unions:
```typescript
// ✅ Good — sense modules return explicit next state + optional workflow trigger
import type { SenseTrigger } from "@uncaged/nerve-core";
// ✅ Good — sense modules return explicit next state + optional shell trigger only
type SenseComputeReturn<S> = {
state: S;
workflow: WorkflowTrigger | null;
trigger: SenseTrigger | null;
};
```
+7 -5
View File
@@ -7,19 +7,21 @@ Nerve is a lightweight daemon that continuously observes external state through
## Core Concepts
```
External World → Sense(state) → { newState, workflow? } → Workflow → Log
External World → Sense(state) → { state, trigger? } → (shell in worker) / Log
Workflow → Log (CLI / daemon IPC only)
scheduling: interval / on (per sense in nerve.yaml)
```
| Concept | Metaphor | Role |
|---------|----------|------|
| **Sense** | 👁️ Perception | Stateful `compute(state)` returning `{ state, workflow }`. State lives in `data/senses/<name>.json`. |
| **Sense** | 👁️ Perception | Stateful `compute(state)` returning `{ state, trigger }`. State lives in `data/senses/<name>.json`. |
| **Schedule** | ⏱️ When | Each sense entry sets optional `interval` (periodic) and `on: [other senses]` (run after those senses complete a compute). |
| **Workflow** | 🔧 Action | Stateful multi-step execution with Roles and a Moderator. Started when `workflow` is non-null in the compute result, or via CLI/daemon IPC. |
| **Workflow** | 🔧 Action | Stateful multi-step execution with Roles and a Moderator. Started via CLI / daemon IPC (`nerve workflow trigger`, transport). Not started from sense `compute()` results. |
| **Log** | 📝 Record | Immutable audit trail. **Cannot** schedule senses or workflows (prevents feedback loops). |
**Sense → Workflow:** when `workflow` is a structured object `{ name, maxRounds, prompt, dryRun }`, the kernel validates it (`@uncaged/nerve-core` `parseWorkflowTrigger`) and starts that workflow. Use `workflow: null` when no run should start.
**Sense → shell:** when `trigger` is non-null it must be `{ command: string }`. The sense worker runs it with `shell: true` (cwd = nerve root). Use `trigger: null` when no command should run. To start a workflow, invoke it from that shell command (for example calling the CLI) or trigger workflows separately via IPC.
Two extension points for **what to observe (+ when)** vs **multi-step action** — scheduling is declarative config on each sense, not a separate YAML section.
@@ -27,7 +29,7 @@ Two extension points for **what to observe (+ when)** vs **multi-step action**
| Package | Description |
|---------|-------------|
| [`@uncaged/nerve-core`](./packages/core) | Shared types, config parser, workflow trigger validation, daemon IPC protocol |
| [`@uncaged/nerve-core`](./packages/core) | Shared types, config parser, sense trigger validation (`parseSenseTrigger`), daemon IPC protocol |
| [`@uncaged/nerve-store`](./packages/store) | Append-only log SQLite, JSONL archive, CAS blob store, workflow run rows |
| [`@uncaged/nerve-daemon`](./packages/daemon) | Kernel, sense workers, sense scheduler, workflow manager, file watcher, IPC |
| [`@uncaged/nerve-cli`](./packages/cli) | CLI (`nerve`) — init, validate, daemon, dev, logs, sense, store, workflow |
+1 -1
View File
@@ -117,7 +117,7 @@
| 项目 | 位置 | 说明 | 置信度 | 建议 |
|------|------|------|--------|------|
| ~~已更名 API 仍出现在 README~~ | `packages/core/README.md` | (已修正)文档与 stateful sense、`parseWorkflowTrigger` 对齐;`routeSenseComputeOutput` 已移除 | — | 关闭 |
| ~~已更名 API 仍出现在 README~~ | `packages/core/README.md` | (已修正)文档与 stateful sense、`parseSenseTrigger`(shell-only)对齐 | — | 关闭 |
| Hermes 选项合并注释 | `packages/workflow-utils/src/shared/hermes-agent.ts` | 注释称 absorbed from `hermes-options.ts`,该文件已不存在 | **中** | **清理注释**,避免误导。 |
| `KNOWN_AGENT_ADAPTER_IDS``codex` | `packages/core/src/agent.ts` | 仓内无 `codex` 适配器包;与常量未被引用叠加 | **中** | **对齐产品**:实现适配器或从列表移除。 |
+10 -16
View File
@@ -212,10 +212,10 @@ extract:
### compute 函数签名
Sense 的 `compute` 接收当前状态,返回新状态和可选的 workflow trigger。状态以 JSON 文件持久化在 `data/senses/<name>.json`
Sense 的 `compute` 接收当前状态,返回新状态和可选的 shell trigger(`{ command: string }`。状态以 JSON 文件持久化在 `data/senses/<name>.json`Workflow 只能通过 CLI / daemon IPC 启动,不能从 sense 返回值直接启动。
```typescript
import type { SenseComputeFn, WorkflowTrigger } from "@uncaged/nerve-core";
import type { SenseComputeFn } from "@uncaged/nerve-core";
type MyState = {
lastRun: number | null;
@@ -226,11 +226,11 @@ export const initialState: MyState = { lastRun: null, count: 0 };
export async function compute(state: MyState): Promise<{
state: MyState;
workflow: WorkflowTrigger | null;
trigger: { command: string } | null;
}> {
return {
state: { lastRun: Date.now(), count: state.count + 1 },
workflow: null,
trigger: null,
};
}
```
@@ -247,15 +247,9 @@ export async function compute(state: MyState): Promise<{
### 返回值
```typescript
// workflow: null → 不触发 workflow
// workflow: WorkflowTrigger → 触发 workflow
type WorkflowTrigger = {
name: string; // workflow 名称(对应 nerve.yaml 中的 key)
maxRounds: number; // moderator 最大轮次
prompt: string; // 初始 prompt
dryRun: boolean; // 干跑模式
};
// trigger: null → 不执行 shell 命令
// trigger: { command } → sense worker 在成功的 compute 后以 shell 执行该命令(cwd = nerve 根目录)
// 启动 workflow:在 shell 中调用 `nerve workflow trigger ...`,或使用 daemon IPC / HTTP API
```
### Sense 模块导出
@@ -271,7 +265,7 @@ export const initialState: MyState = { ... };
// 2. compute 函数
export async function compute(state: MyState): Promise<{
state: MyState;
workflow: WorkflowTrigger | null;
trigger: { command: string } | null;
}> {
// ...
}
@@ -304,12 +298,12 @@ export const initialState: CpuState = { samples: [] };
export async function compute(state: CpuState): Promise<{
state: CpuState;
workflow: null;
trigger: null;
}> {
const [oneMin] = loadavg();
const value = typeof oneMin === "number" && !Number.isNaN(oneMin) ? oneMin : 0;
const newSamples = [...state.samples.slice(-99), { ts: Date.now(), value }];
return { state: { samples: newSamples }, workflow: null };
return { state: { samples: newSamples }, trigger: null };
}
```
+1 -16
View File
@@ -252,22 +252,7 @@ export async function compute(): Promise<ComputeResult<MySignalShape>> {
### 返回值
```typescript
// 返回 null = 静默,不发 signal
// 返回非 null = 发出 signal(并写入业务表),可选触发 workflow
type ComputeResult<T> =
| null
| { signal: T; workflow: WorkflowTrigger | null };
type WorkflowTrigger = {
name: string; // workflow 名称(对应 nerve.yaml 中的 key)
maxRounds: number; // moderator 最大轮次
prompt: string; // 初始 prompt
dryRun: boolean; // 干跑模式
};
```
若返回值是普通对象且不含 `signal` 字段,内核会按 shorthand 视为 `{ signal: payload, workflow: null }`(见 core 的 `routeSenseComputeOutput`)。
当前引擎:`compute(state)` 返回 `{ state, trigger }`,`trigger` 为 `null` 或 `{ command: string }`(shell 命令)。Workflow 仅通过 CLI / daemon IPC 启动;类型见 `@uncaged/nerve-core` 的 `SenseComputeFn` / `SenseTrigger`。
### Sense 模块导出
+3 -18
View File
@@ -245,22 +245,7 @@ export async function compute(): Promise<ComputeResult<MySignalShape>> {
### 返回值
```typescript
// 返回 null = 静默,不发 signal
// 返回非 null = 发出 signal(并写入业务表),可选触发 workflow
type ComputeResult<T> =
| null
| { signal: T; workflow: WorkflowTrigger | null };
type WorkflowTrigger = {
name: string; // workflow 名称(对应 nerve.yaml 中的 key)
maxRounds: number; // moderator 最大轮次
prompt: string; // 初始 prompt
dryRun: boolean; // 干跑模式
};
```
若返回值是普通对象且不含 `signal` 字段,内核会按 shorthand 视为 `{ signal: payload, workflow: null }`(见 core 的 `routeSenseComputeOutput`)。
当前引擎:`compute(state)` 返回 `{ state, trigger }`,其中 `trigger``null``{ command: string }`(sense worker 内 `shell: true` 执行)。Workflow 仅通过 CLI / daemon IPC 启动,类型见 `@uncaged/nerve-core``SenseComputeFn` / `SenseTrigger`
### Sense 模块导出
@@ -273,7 +258,7 @@ type Row = { ts: number; value: number };
export async function compute(): Promise<ComputeResult<Row>> {
const row: Row = { ts: Date.now(), value: Math.random() }; // 替换为真实观测逻辑
return { signal: row, workflow: null };
return { signal: row, trigger: null };
}
export { table };
@@ -325,7 +310,7 @@ type Row = { ts: number; value: number };
export async function compute(): Promise<ComputeResult<Row>> {
const oneMin = os.loadavg()[0];
return { signal: { ts: Date.now(), value: oneMin }, workflow: null };
return { signal: { ts: Date.now(), value: oneMin }, trigger: null };
}
export { table };
@@ -26,7 +26,7 @@ describe("buildSenseIndexTs", () => {
expect(ts).toContain("type SenseState");
expect(ts).toContain("export const initialState");
expect(ts).toContain("export async function compute");
expect(ts).toContain("workflow: null");
expect(ts).toContain("trigger: null");
expect(ts).toContain("lastRun");
});
});
+4 -30
View File
@@ -120,29 +120,7 @@ const counterIndexJs = `export const initialState = { count: 0 };
export async function compute(state) {
return {
state: { count: state.count + 1 },
workflow: null,
};
}
`;
/** First trigger launches local noop workflow; later triggers only advance idleTicks. */
const counterIndexJsWithNoopWorkflow = `export const initialState = { launched: false, idleTicks: 0 };
export async function compute(state) {
if (!state.launched) {
return {
state: { launched: true, idleTicks: state.idleTicks },
workflow: {
name: "noop",
maxRounds: 3,
prompt: "e2e-archive",
dryRun: false,
},
};
}
return {
state: { launched: state.launched, idleTicks: state.idleTicks + 1 },
workflow: null,
trigger: null,
};
}
`;
@@ -204,11 +182,7 @@ function writeWorkspaceLayout(nerveRoot: string, withNoopWorkflow: boolean): voi
withNoopWorkflow ? nerveYamlWithNoopWorkflow : nerveYamlTemplate,
"utf8",
);
writeFileSync(
join(nerveRoot, "dist", "senses", "counter", "index.js"),
withNoopWorkflow ? counterIndexJsWithNoopWorkflow : counterIndexJs,
"utf8",
);
writeFileSync(join(nerveRoot, "dist", "senses", "counter", "index.js"), counterIndexJs, "utf8");
writeFileSync(
join(nerveRoot, "dist", "workflows", "echo", "index.js"),
echoWorkflowIndexJs,
@@ -234,8 +208,8 @@ export type TestDaemonHandle = {
export type StartTestDaemonOpts = {
/**
* When true, counter sense's first compute launches a local `noop` workflow (real
* workflow-worker child). Requires built `workflow-worker.js` next to `sense-worker.js`.
* When true, bundles a local `noop` workflow under `dist/workflows/noop` for tests that
* start runs via `nerve workflow trigger` (real workflow-worker child).
*/
withNoopWorkflow: boolean;
} | null;
@@ -46,8 +46,14 @@ describe("e2e store archive", () => {
daemon = await startTestDaemon({ withNoopWorkflow: true });
linkWorkspaceDaemonIntoNerveRoot(daemon.nerveRoot);
const triggerResult = await runCli(daemon, ["sense", "trigger", "counter"]);
expect(triggerResult.exitCode).toBe(0);
const wfTrigger = await runCli(daemon, [
"workflow",
"trigger",
"noop",
"--prompt",
"e2e-archive",
]);
expect(wfTrigger.exitCode).toBe(0);
const logsDb = join(daemon.nerveRoot, "data", "logs.db");
await pollUntil(() => {
@@ -101,8 +107,14 @@ describe("e2e store archive", () => {
daemon = await startTestDaemon({ withNoopWorkflow: true });
linkWorkspaceDaemonIntoNerveRoot(daemon.nerveRoot);
const triggerResult = await runCli(daemon, ["sense", "trigger", "counter"]);
expect(triggerResult.exitCode).toBe(0);
const wfTrigger = await runCli(daemon, [
"workflow",
"trigger",
"noop",
"--prompt",
"e2e-archive",
]);
expect(wfTrigger.exitCode).toBe(0);
const logsDb = join(daemon.nerveRoot, "data", "logs.db");
await pollUntil(() => {
+2 -2
View File
@@ -94,12 +94,12 @@ export const initialState: SenseState = { lastRun: null };
export async function compute(state: SenseState): Promise<{
state: SenseState;
workflow: null;
trigger: null;
}> {
// TODO: implement sense logic
return {
state: { lastRun: Date.now() },
workflow: null,
trigger: null,
};
}
`;
+2 -2
View File
@@ -224,12 +224,12 @@ export const initialState: CpuState = { samples: [] };
export async function compute(state: CpuState): Promise<{
state: CpuState;
workflow: null;
trigger: null;
}> {
const [oneMin] = loadavg();
const value = typeof oneMin === "number" && !Number.isNaN(oneMin) ? oneMin : 0;
const newSamples = [...state.samples.slice(-99), { ts: Date.now(), value }];
return { state: { samples: newSamples }, workflow: null };
return { state: { samples: newSamples }, trigger: null };
}
`;
+8 -13
View File
@@ -4,9 +4,9 @@ Shared types and configuration parser for the [nerve](../../README.md) observati
## What's Inside
- **Type definitions** — `SenseConfig`, `SenseInfo`, `SenseComputeFn`, `SenseModule`, `WorkflowConfig`, `NerveConfig`, `WorkflowTrigger`, and related types
- **Type definitions** — `SenseConfig`, `SenseInfo`, `SenseComputeFn`, `SenseModule`, `SenseTrigger`, `WorkflowConfig`, `NerveConfig`, and related types
- **Config parser** — `parseNerveConfig(yaml)` validates and parses `nerve.yaml` into `NerveConfig` (top-level `reflexes` is rejected; use `interval` / `on` on each sense)
- **Workflow triggers** — `parseWorkflowTrigger` validates structured workflow launch objects from Sense compute results or IPC
- **Sense triggers** — `parseSenseTrigger` validates `{ command: string }` from sense compute results or worker IPC (`trigger` field on `compute-result`)
- **Daemon IPC protocol** — request/response types (`DaemonIpcRequest`, `DaemonIpcResponse`, …) and `parseDaemonIpcRequest` for newline-delimited JSON on the CLI ↔ daemon socket
- **Workflow automaton types** — `START` / `END` sentinel constants, `WorkflowMessage`, `StartStep`, `RoleStep`, `ModeratorContext` (`start` + `steps`; empty `steps` on first moderator call), `Moderator` (single `context` argument), `WorkflowDefinition`, `Role`, `RoleResult`, plus `DEFAULT_ENGINE_MAX_ROUNDS`
- **Result type** — `Result<T>` with `ok()` / `err()` helpers for explicit error handling (no thrown exceptions for parse paths)
@@ -23,23 +23,18 @@ if (result.ok) {
}
```
### Workflow trigger validation
### Sense trigger validation
```typescript
import { parseWorkflowTrigger } from "@uncaged/nerve-core";
import { parseSenseTrigger } from "@uncaged/nerve-core";
const directive = parseWorkflowTrigger({
name: "my-workflow",
maxRounds: 8,
prompt: "Hello from sense",
dryRun: false,
});
if (directive.ok) {
console.log(directive.value.name, directive.value.maxRounds, directive.value.prompt);
const parsed = parseSenseTrigger({ command: "echo hello" });
if (parsed.ok) {
console.log(parsed.value.command);
}
```
Sense modules return `{ state, workflow }` from `compute(state)`; when `workflow` is non-null it must satisfy the shape validated by `parseWorkflowTrigger` (the daemon validates before starting a run).
Sense modules return `{ state, trigger }` from `compute(state)`; when `trigger` is non-null it must be exactly `{ command: string }` (non-empty after trim). The daemon validates worker IPC with `parseSenseTrigger`. Workflows are started only via CLI / daemon IPC, not from this field.
## Duration Format
@@ -0,0 +1,50 @@
import { describe, expect, it } from "vitest";
import { parseSenseTrigger } from "../sense.js";
describe("parseSenseTrigger", () => {
it("accepts a valid command trigger", () => {
const r = parseSenseTrigger({ command: "echo hi" });
expect(r.ok).toBe(true);
if (!r.ok) return;
expect(r.value).toEqual({ command: "echo hi" });
});
it("trims command", () => {
const r = parseSenseTrigger({ command: " echo hi " });
expect(r.ok).toBe(true);
if (!r.ok) return;
expect(r.value).toEqual({ command: "echo hi" });
});
it("rejects empty command", () => {
const r = parseSenseTrigger({ command: "" });
expect(r.ok).toBe(false);
});
it("rejects whitespace-only command", () => {
const r = parseSenseTrigger({ command: " " });
expect(r.ok).toBe(false);
});
it("rejects non-string command", () => {
const r = parseSenseTrigger({ command: 1 as unknown as string });
expect(r.ok).toBe(false);
});
it("rejects non-object", () => {
expect(parseSenseTrigger(null).ok).toBe(false);
expect(parseSenseTrigger("x").ok).toBe(false);
expect(parseSenseTrigger([]).ok).toBe(false);
});
it("rejects extra properties", () => {
const r = parseSenseTrigger({ command: "x", kind: "shell" });
expect(r.ok).toBe(false);
});
it("rejects empty object", () => {
const r = parseSenseTrigger({});
expect(r.ok).toBe(false);
});
});
@@ -1,59 +0,0 @@
import { describe, expect, it } from "vitest";
import { parseWorkflowTrigger } from "../sense.js";
describe("parseWorkflowTrigger", () => {
it("accepts a valid trigger object", () => {
const r = parseWorkflowTrigger({
name: "my-wf",
maxRounds: 3,
prompt: "go",
dryRun: true,
});
expect(r.ok).toBe(true);
if (!r.ok) return;
expect(r.value).toEqual({ name: "my-wf", maxRounds: 3, prompt: "go", dryRun: true });
});
it("trims workflow name", () => {
const r = parseWorkflowTrigger({
name: " spaced ",
maxRounds: 1,
prompt: "",
dryRun: false,
});
expect(r.ok).toBe(true);
if (!r.ok) return;
expect(r.value.name).toBe("spaced");
});
it("rejects empty name", () => {
const r = parseWorkflowTrigger({ name: "", maxRounds: 1, prompt: "x", dryRun: false });
expect(r.ok).toBe(false);
});
it("rejects non-integer maxRounds", () => {
const r = parseWorkflowTrigger({
name: "w",
maxRounds: 1.5,
prompt: "",
dryRun: false,
});
expect(r.ok).toBe(false);
});
it("rejects maxRounds < 1", () => {
const r = parseWorkflowTrigger({ name: "w", maxRounds: 0, prompt: "", dryRun: false });
expect(r.ok).toBe(false);
});
it("rejects non-boolean dryRun", () => {
const r = parseWorkflowTrigger({
name: "w",
maxRounds: 1,
prompt: "",
dryRun: "no" as unknown as boolean,
});
expect(r.ok).toBe(false);
});
});
+7 -6
View File
@@ -52,12 +52,13 @@ export type ExtractConfig = {
model: string;
};
/** Parameters for starting a workflow from a Sense compute result (or CLI trigger). */
export type WorkflowTrigger = {
name: string;
maxRounds: number;
prompt: string;
dryRun: boolean;
/**
* Optional shell side effect after a successful sense `compute()`.
* Executed in the sense worker (`spawn` with `shell: true`, cwd = nerve root).
* Workflows are started only via CLI / daemon IPC, not from sense compute results.
*/
export type SenseTrigger = {
command: string;
};
export type NerveConfig = {
+2 -2
View File
@@ -7,7 +7,7 @@ export type {
AgentConfig,
ExtractConfig,
NerveConfig,
WorkflowTrigger,
SenseTrigger,
} from "./config.js";
export type { SenseInfo } from "./sense.js";
export type { SenseComputeFn, SenseModule } from "./sense.js";
@@ -44,7 +44,7 @@ export type { KnowledgeConfig } from "./config.js";
export { parseKnowledgeYaml } from "./config.js";
export { isPlainRecord } from "./util.js";
export { parseWorkflowTrigger } from "./sense.js";
export { parseSenseTrigger } from "./sense.js";
export { isSenseInfo, isWorkflowStatus } from "./daemon.js";
export type {
+15 -24
View File
@@ -1,4 +1,4 @@
import type { SenseConfig, WorkflowTrigger } from "./config.js";
import type { SenseConfig, SenseTrigger } from "./config.js";
import { type Result, err, isPlainRecord, ok } from "./util.js";
/** Runtime metadata for a sense (e.g. daemon list-senses IPC). */
@@ -8,7 +8,7 @@ export type SenseInfo = {
throttle: number | null;
timeout: number | null;
/** Declarative schedule (`interval` / `on`) for this sense (derived from nerve.yaml). */
triggers: string[];
triggers: ReadonlyArray<string>;
};
/**
@@ -16,11 +16,11 @@ export type SenseInfo = {
* `compute` export.
*
* Pure: no DB, no peers.
* Returns the next sense state and an optional workflow to start (`workflow: null` means no workflow).
* Returns the next sense state and an optional trigger (`trigger: null` means no side effect).
*/
export type SenseComputeFn<S = unknown> = (
state: S,
) => Promise<{ state: S; workflow: WorkflowTrigger | null }>;
) => Promise<{ state: S; trigger: SenseTrigger | null }>;
/**
* The full shape a sense module (`src/index.ts`) must export.
@@ -69,28 +69,19 @@ export function senseTriggerLabels(
return [labelSenseTrigger({ interval: sc.interval, on: sc.on })];
}
/**
* Validates a structured workflow trigger object from Sense compute or IPC.
*/
export function parseWorkflowTrigger(value: unknown): Result<WorkflowTrigger> {
/** Validates `{ command: string }` from Sense compute or IPC (`trigger` field). */
export function parseSenseTrigger(value: unknown): Result<SenseTrigger> {
if (!isPlainRecord(value)) {
return err(new Error("workflow trigger must be a plain object"));
return err(new Error("sense trigger must be a plain object"));
}
const nameRaw = value.name;
if (typeof nameRaw !== "string" || nameRaw.trim().length === 0) {
return err(new Error('workflow trigger: "name" must be a non-empty string'));
for (const key of Object.keys(value)) {
if (key !== "command") {
return err(new Error(`sense trigger: unexpected property "${key}"`));
}
}
const maxRounds = value.maxRounds;
if (typeof maxRounds !== "number" || !Number.isInteger(maxRounds) || maxRounds < 1) {
return err(new Error('workflow trigger: "maxRounds" must be an integer >= 1'));
const command = value.command;
if (typeof command !== "string" || command.trim().length === 0) {
return err(new Error('sense trigger: "command" must be a non-empty string'));
}
const prompt = value.prompt;
if (typeof prompt !== "string") {
return err(new Error('workflow trigger: "prompt" must be a string'));
}
const dryRun = value.dryRun;
if (typeof dryRun !== "boolean") {
return err(new Error('workflow trigger: "dryRun" must be a boolean'));
}
return ok({ name: nameRaw.trim(), maxRounds, prompt, dryRun });
return ok({ command: command.trim() });
}
@@ -79,7 +79,7 @@ describe("createFileWatcher", () => {
await new Promise((r) => setTimeout(r, 100));
writeFileSync(
join(root, "senses", "cpu-usage", "index.js"),
"export const initialState = {}; export async function compute(state) { return { state, workflow: null }; }",
"export const initialState = {}; export async function compute(state) { return { state, trigger: null }; }",
);
await waitFor(() => changes.length > 0, 3000);
@@ -35,7 +35,7 @@ process.on("message", (msg) => {
type: "compute-result",
sense: msg.sense,
state: 42,
workflow: null,
trigger: null,
});
}
});
@@ -9,7 +9,7 @@
*
* Behaviour:
* - Sends { type: "ready" } on startup
* - On { type: "compute", sense } → sends back compute-result with state + workflow:null
* - On { type: "compute", sense } → sends back compute-result with state + trigger:null
* - On { type: "shutdown" } → exits cleanly with code 0
*/
@@ -27,7 +27,7 @@ process.on("message", (msg) => {
type: "compute-result",
sense: msg.sense,
state: 42,
workflow: null,
trigger: null,
});
}
});
@@ -22,7 +22,7 @@ process.on("message", (msg) => {
type: "compute-result",
sense: msg.sense,
state: "late",
workflow: null,
trigger: null,
});
}, 10_000);
}
@@ -1,8 +1,9 @@
/**
* Integration tests for Kernel + WorkflowManager integration.
*
* Verifies that sense compute-result IPC triggers workflow runs when `workflow`
* is non-null; that workflow events are logged; that reloadConfig handles workflow changes;
* Verifies that workflow runs are started via `workflowManager.startWorkflow` (CLI / IPC path);
* that sense compute-result with a shell trigger does not start workflows;
* that workflow events are logged; that reloadConfig handles workflow changes;
* and that graceful shutdown stops workflow workers.
*
* Uses mocked child_process.fork to avoid real subprocesses.
@@ -57,7 +58,7 @@ function makeMockChild(pid = 1): MockChild {
type: "compute-result",
sense: m.sense,
state: 42,
workflow: null,
trigger: null,
});
});
}
@@ -153,20 +154,10 @@ describe("kernel + workflowManager integration", () => {
rmSync(nerveRoot, { recursive: true, force: true });
});
describe("sense compute triggers workflow via return value", () => {
it("calls workflowManager.startWorkflow when a sense compute returns a workflow launch", async () => {
describe("workflowManager.startWorkflow", () => {
it("spawns a workflow worker and sends start-thread", async () => {
const logStore = makeLogStore();
const config = makeConfig({
senses: {
"cpu-usage": {
group: "system",
throttle: null,
timeout: null,
gracePeriod: null,
interval: null,
on: [],
},
},
workflows: { "my-workflow": { concurrency: 2, overflow: "drop" } },
});
@@ -177,24 +168,14 @@ describe("kernel + workflowManager integration", () => {
await flushSenseWorkerForkMicrotasks(kernel);
await vi.runAllTimersAsync();
const workerPool = mockChildren[0];
if (workerPool) {
workerPool.emit("message", {
type: "compute-result",
sense: "cpu-usage",
state: { reason: "test" },
workflow: {
name: "my-workflow",
maxRounds: 10,
prompt: "run this workflow",
dryRun: false,
},
});
}
kernel.workflowManager.startWorkflow("my-workflow", {
prompt: "run this workflow",
maxRounds: 10,
dryRun: false,
});
await vi.runAllTimersAsync();
// A workflow worker should be spawned and a start-thread message sent
const workflowWorker = mockChildren.find((c) =>
(c.send as ReturnType<typeof vi.fn>).mock.calls.some(
(args: unknown[]) =>
@@ -210,19 +191,9 @@ describe("kernel + workflowManager integration", () => {
await stopPromise;
});
it("passes prompt and maxRounds from the workflow field to the workflow", async () => {
it("passes prompt and maxRounds on start-thread", async () => {
const logStore = makeLogStore();
const config = makeConfig({
senses: {
"cpu-usage": {
group: "system",
throttle: null,
timeout: null,
gracePeriod: null,
interval: null,
on: [],
},
},
workflows: { "alert-workflow": { concurrency: 1, overflow: "drop" } },
});
@@ -233,24 +204,14 @@ describe("kernel + workflowManager integration", () => {
await flushSenseWorkerForkMicrotasks(kernel);
await vi.runAllTimersAsync();
const workerPool = mockChildren[0];
if (workerPool) {
workerPool.emit("message", {
type: "compute-result",
sense: "cpu-usage",
state: { level: "critical" },
workflow: {
name: "alert-workflow",
maxRounds: 5,
prompt: "handle critical alert",
dryRun: false,
},
});
}
kernel.workflowManager.startWorkflow("alert-workflow", {
prompt: "handle critical alert",
maxRounds: 5,
dryRun: false,
});
await vi.runAllTimersAsync();
// Find the start-thread call and verify triggerPayload
const startThreadCall = mockChildren
.flatMap((c) => (c.send as ReturnType<typeof vi.fn>).mock.calls as [unknown][])
.find(
@@ -273,8 +234,10 @@ describe("kernel + workflowManager integration", () => {
await vi.runAllTimersAsync();
await stopPromise;
});
});
it("logs compute-complete before workflow-launch when workflow is present", async () => {
describe("sense compute-result triggers", () => {
it("logs compute-complete before shell-launch when shell trigger is present", async () => {
const logStore = makeLogStore();
const config = makeConfig({
workflows: { "order-wf": { concurrency: 1, overflow: "drop" } },
@@ -293,12 +256,7 @@ describe("kernel + workflowManager integration", () => {
type: "compute-result",
sense: "cpu-usage",
state: { seq: 1 },
workflow: {
name: "order-wf",
maxRounds: 2,
prompt: "p",
dryRun: true,
},
trigger: { command: "echo order-test" },
});
}
@@ -309,16 +267,16 @@ describe("kernel + workflowManager integration", () => {
.filter((e) => e.source === "sense" && e.refId === "cpu-usage");
const typeOrder = senseEntries.map((e) => e.type);
const completeAt = typeOrder.indexOf("compute-complete");
const launchAt = typeOrder.indexOf("workflow-launch");
const shellAt = typeOrder.indexOf("shell-launch");
expect(completeAt).toBeGreaterThanOrEqual(0);
expect(launchAt).toBeGreaterThan(completeAt);
expect(shellAt).toBeGreaterThan(completeAt);
const stopPromise = kernel.stop();
await vi.runAllTimersAsync();
await stopPromise;
});
it("does not trigger workflow when compute-result has workflow null", async () => {
it("does not trigger workflow when compute-result has trigger null", async () => {
const logStore = makeLogStore();
const config = makeConfig({
senses: {
@@ -355,11 +313,10 @@ describe("kernel + workflowManager integration", () => {
type: "compute-result",
sense: "cpu-usage",
state: 50,
workflow: null,
trigger: null,
});
}
// No workflow should have been started
const workflowWorkerSpawned = mockChildren.some((c) =>
(c.send as ReturnType<typeof vi.fn>).mock.calls.some(
(args: unknown[]) =>
@@ -374,25 +331,10 @@ describe("kernel + workflowManager integration", () => {
await vi.runAllTimersAsync();
await stopPromise;
});
});
describe("workflow events are logged", () => {
it("logs a 'started' event when workflow thread is triggered via sense compute", async () => {
it("logs shell-launch and does not start a workflow for shell triggers", async () => {
const logStore = makeLogStore();
const config = makeConfig({
senses: {
"cpu-usage": {
group: "system",
throttle: null,
timeout: null,
gracePeriod: null,
interval: null,
on: [],
},
},
workflows: { "log-test-workflow": { concurrency: 2, overflow: "drop" } },
});
const config = makeConfig({ workflows: {} });
const kernel = createKernel(config, nerveRoot, {
workerScript: "fake-worker.js",
logStore,
@@ -405,18 +347,58 @@ describe("kernel + workflowManager integration", () => {
workerPool.emit("message", {
type: "compute-result",
sense: "cpu-usage",
state: { note: "log" },
workflow: {
name: "log-test-workflow",
maxRounds: 10,
prompt: "test prompt",
dryRun: false,
state: {},
trigger: {
command: "echo nerve-shell-test",
},
});
}
await vi.runAllTimersAsync();
const shellLaunch = logStore.append.mock.calls
.map((c) => c[0] as { source: string; type: string })
.find((e) => e.type === "shell-launch");
expect(shellLaunch).toBeDefined();
const startThread = mockChildren
.flatMap((c) => (c.send as ReturnType<typeof vi.fn>).mock.calls as [unknown][])
.some(
([msg]) =>
msg !== null &&
typeof msg === "object" &&
(msg as Record<string, unknown>).type === "start-thread",
);
expect(startThread).toBe(false);
const stopPromise = kernel.stop();
await vi.runAllTimersAsync();
await stopPromise;
});
});
describe("workflow events are logged", () => {
it("logs a 'started' event when workflow thread is started via workflowManager", async () => {
const logStore = makeLogStore();
const config = makeConfig({
workflows: { "log-test-workflow": { concurrency: 2, overflow: "drop" } },
});
const kernel = createKernel(config, nerveRoot, {
workerScript: "fake-worker.js",
logStore,
});
await flushSenseWorkerForkMicrotasks(kernel);
await vi.runAllTimersAsync();
kernel.workflowManager.startWorkflow("log-test-workflow", {
prompt: "test prompt",
maxRounds: 10,
dryRun: false,
});
await vi.runAllTimersAsync();
expect(logStore.upsertWorkflowRun).toHaveBeenCalledWith(
expect.objectContaining({ source: "workflow", type: "started" }),
expect.objectContaining({ workflow: "log-test-workflow", status: "started" }),
@@ -432,16 +414,6 @@ describe("kernel + workflowManager integration", () => {
it("new workflows are available after reloadConfig", async () => {
const logStore = makeLogStore();
const initialConfig = makeConfig({
senses: {
"cpu-usage": {
group: "system",
throttle: null,
timeout: null,
gracePeriod: null,
interval: null,
on: [],
},
},
workflows: {},
maxRounds: 10,
});
@@ -453,7 +425,6 @@ describe("kernel + workflowManager integration", () => {
await flushSenseWorkerForkMicrotasks(kernel);
await vi.runAllTimersAsync();
// Reload with a workflow added
const newConfig: NerveConfig = {
senses: {
"cpu-usage": {
@@ -472,20 +443,11 @@ describe("kernel + workflowManager integration", () => {
};
kernel.reloadConfig(newConfig);
const workerPool = mockChildren[0];
if (workerPool) {
workerPool.emit("message", {
type: "compute-result",
sense: "cpu-usage",
state: { phase: "reload" },
workflow: {
name: "new-workflow",
maxRounds: 10,
prompt: "reload test",
dryRun: false,
},
});
}
kernel.workflowManager.startWorkflow("new-workflow", {
prompt: "reload test",
maxRounds: 10,
dryRun: false,
});
await vi.runAllTimersAsync();
@@ -509,16 +471,6 @@ describe("kernel + workflowManager integration", () => {
it("old workflows are removed after reloadConfig", async () => {
const logStore = makeLogStore();
const initialConfig = makeConfig({
senses: {
"cpu-usage": {
group: "system",
throttle: null,
timeout: null,
gracePeriod: null,
interval: null,
on: [],
},
},
workflows: { "old-workflow": { concurrency: 1, overflow: "drop" } },
});
@@ -529,7 +481,6 @@ describe("kernel + workflowManager integration", () => {
await flushSenseWorkerForkMicrotasks(kernel);
await vi.runAllTimersAsync();
// Reload with the workflow removed
const newConfig: NerveConfig = {
senses: {
"cpu-usage": {
@@ -548,25 +499,15 @@ describe("kernel + workflowManager integration", () => {
};
kernel.reloadConfig(newConfig);
// Clear send history
for (const c of mockChildren) {
(c.send as ReturnType<typeof vi.fn>).mockClear();
}
const workerPool = mockChildren[0];
if (workerPool) {
workerPool.emit("message", {
type: "compute-result",
sense: "cpu-usage",
state: { stale: true },
workflow: {
name: "old-workflow",
maxRounds: 10,
prompt: "should not work",
dryRun: false,
},
});
}
kernel.workflowManager.startWorkflow("old-workflow", {
prompt: "should not work",
maxRounds: 10,
dryRun: false,
});
await vi.runAllTimersAsync();
@@ -591,16 +532,6 @@ describe("kernel + workflowManager integration", () => {
it("stop() resolves after workflow workers exit", async () => {
const logStore = makeLogStore();
const config = makeConfig({
senses: {
"cpu-usage": {
group: "system",
throttle: null,
timeout: null,
gracePeriod: null,
interval: null,
on: [],
},
},
workflows: { "shutdown-test": { concurrency: 1, overflow: "drop" } },
});
@@ -611,20 +542,11 @@ describe("kernel + workflowManager integration", () => {
await flushSenseWorkerForkMicrotasks(kernel);
await vi.runAllTimersAsync();
const workerPool = mockChildren[0];
if (workerPool) {
workerPool.emit("message", {
type: "compute-result",
sense: "cpu-usage",
state: { shutdownCase: true },
workflow: {
name: "shutdown-test",
maxRounds: 10,
prompt: "test",
dryRun: false,
},
});
}
kernel.workflowManager.startWorkflow("shutdown-test", {
prompt: "test",
maxRounds: 10,
dryRun: false,
});
await vi.runAllTimersAsync();
@@ -655,16 +577,6 @@ describe("kernel + workflowManager integration", () => {
it("getHealth includes activeWorkflows count", async () => {
const logStore = makeLogStore();
const config = makeConfig({
senses: {
"cpu-usage": {
group: "system",
throttle: null,
timeout: null,
gracePeriod: null,
interval: null,
on: [],
},
},
workflows: { "health-wf": { concurrency: 2, overflow: "drop" } },
});
+3 -3
View File
@@ -41,7 +41,7 @@ function makeMockChild(pid = 1): MockChild {
type: "compute-result",
sense: m.sense,
state: 42,
workflow: null,
trigger: null,
});
});
}
@@ -140,7 +140,7 @@ describe("kernel — message routing", () => {
type: "compute-result",
sense: "cpu-usage",
state: 42,
workflow: null,
trigger: null,
});
}).not.toThrow();
@@ -171,7 +171,7 @@ describe("kernel — message routing", () => {
type: "compute-result",
sense: "cpu-usage",
state: 123,
workflow: null,
trigger: null,
});
const rows = logStore.query({
source: "sense",
@@ -63,7 +63,7 @@ describe("executeCompute", () => {
it("passes state into compute and persists returned state", async () => {
const path = makeTempStatePath();
const runtime = makeRuntime(
async (s) => ({ state: { n: s.n + 1 }, workflow: null }),
async (s) => ({ state: { n: s.n + 1 }, trigger: null }),
{ n: 0 },
path,
);
@@ -71,7 +71,7 @@ describe("executeCompute", () => {
const result = await executeCompute(runtime);
expect(result.ok).toBe(true);
if (!result.ok) return;
expect(result.value).toEqual({ state: { n: 1 }, workflow: null });
expect(result.value).toEqual({ state: { n: 1 }, trigger: null });
expect(runtime.state).toEqual({ n: 1 });
expect(JSON.parse(readFileSync(path, "utf8"))).toEqual({ n: 1 });
});
@@ -93,7 +93,7 @@ describe("executeCompute", () => {
it("returns err when compute exceeds timeoutMs", async () => {
const runtime = makeRuntime(
async (s) =>
new Promise((resolve) => setTimeout(() => resolve({ state: s, workflow: null }), 5_000)),
new Promise((resolve) => setTimeout(() => resolve({ state: s, trigger: null }), 5_000)),
{ n: 0 },
);
@@ -104,7 +104,7 @@ describe("executeCompute", () => {
});
it("completes within timeout when compute is fast", async () => {
const runtime = makeRuntime(async (s) => ({ state: { n: s.n }, workflow: null }), { n: 42 });
const runtime = makeRuntime(async (s) => ({ state: { n: s.n }, trigger: null }), { n: 42 });
const result = await executeCompute(runtime, 5_000);
expect(result.ok).toBe(true);
if (!result.ok) return;
@@ -84,13 +84,13 @@ describe("createSenseWorkerPool", () => {
type: "compute-result",
sense: "s",
state: 1,
workflow: null,
trigger: null,
});
expect(onWorkerMessage).toHaveBeenCalledWith({
type: "compute-result",
sense: "s",
state: 1,
workflow: null,
trigger: null,
});
});
+16 -56
View File
@@ -3,8 +3,8 @@
* Protocol per RFC §5.2: hub-and-spoke, all messages through engine.
*/
import type { Result, WorkflowTrigger } from "@uncaged/nerve-core";
import { err, isPlainRecord, ok, parseWorkflowTrigger } from "@uncaged/nerve-core";
import type { Result, SenseTrigger } from "@uncaged/nerve-core";
import { err, isPlainRecord, ok, parseSenseTrigger } from "@uncaged/nerve-core";
/** Parent → Worker: trigger one compute cycle for a sense */
export type ComputeMessage = {
@@ -65,19 +65,12 @@ export type ParentToWorkerMessage =
| ResumeThreadMessage
| KillThreadMessage;
/** Worker → Parent: sense compute finished (state persisted in worker; workflow optional). */
/** Worker → Parent: sense compute finished (state persisted in worker; optional shell trigger). */
export type ComputeResultMessage = {
type: "compute-result";
sense: string;
state: unknown;
workflow: WorkflowTrigger | null;
};
/** Worker → Parent: sense compute result includes a workflow to start */
export type SenseWorkflowTriggerMessage = {
type: "sense-workflow-trigger";
sense: string;
workflow: WorkflowTrigger;
trigger: SenseTrigger | null;
};
/** Worker → Parent: compute threw or returned an unexpected error */
@@ -147,8 +140,7 @@ export type WorkerToParentMessage =
| HealthResponseMessage
| ThreadEventMessage
| WorkflowErrorMessage
| ThreadWorkflowMessageMessage
| SenseWorkflowTriggerMessage;
| ThreadWorkflowMessageMessage;
const PARENT_MSG_TYPES = new Set([
"compute",
@@ -255,26 +247,26 @@ function parseComputeResultMsg(obj: Record<string, unknown>): Result<WorkerToPar
if (!("state" in obj)) {
return err(new Error("Worker 'compute-result' message missing 'state' field"));
}
if (!("workflow" in obj)) {
return err(new Error("Worker 'compute-result' message missing 'workflow' field"));
if (!("trigger" in obj)) {
return err(new Error("Worker 'compute-result' message missing 'trigger' field"));
}
const wfRaw = obj.workflow;
if (wfRaw !== null && !isPlainRecord(wfRaw)) {
return err(new Error("Worker 'compute-result' workflow must be an object or null"));
const triggerRaw = obj.trigger;
if (triggerRaw !== null && !isPlainRecord(triggerRaw)) {
return err(new Error("Worker 'compute-result' trigger must be an object or null"));
}
let workflow: WorkflowTrigger | null;
if (wfRaw === null) {
workflow = null;
let trigger: SenseTrigger | null;
if (triggerRaw === null) {
trigger = null;
} else {
const parsed = parseWorkflowTrigger(wfRaw);
const parsed = parseSenseTrigger(triggerRaw);
if (!parsed.ok) return err(parsed.error);
workflow = parsed.value;
trigger = parsed.value;
}
return ok({
type: "compute-result",
sense: obj.sense,
state: obj.state,
workflow,
trigger,
});
}
@@ -365,7 +357,6 @@ const WORKER_MSG_TYPES = new Set([
"thread-event",
"workflow-error",
"thread-workflow-message",
"sense-workflow-trigger",
]);
function parseThreadWorkflowMessageMsg(
@@ -403,36 +394,6 @@ function parseThreadWorkflowMessageMsg(
});
}
function parseSenseWorkflowTriggerMsg(obj: Record<string, unknown>): Result<WorkerToParentMessage> {
if (typeof obj.sense !== "string") {
return err(new Error("Worker 'sense-workflow-trigger' message missing string 'sense' field"));
}
if (!isPlainRecord(obj.workflow)) {
return err(
new Error("Worker 'sense-workflow-trigger' message missing object 'workflow' field"),
);
}
const wf = obj.workflow;
if (typeof wf.name !== "string")
return err(new Error("Worker 'sense-workflow-trigger' workflow missing string 'name'"));
if (typeof wf.maxRounds !== "number")
return err(new Error("Worker 'sense-workflow-trigger' workflow missing number 'maxRounds'"));
if (typeof wf.prompt !== "string")
return err(new Error("Worker 'sense-workflow-trigger' workflow missing string 'prompt'"));
if (typeof wf.dryRun !== "boolean")
return err(new Error("Worker 'sense-workflow-trigger' workflow missing boolean 'dryRun'"));
return ok({
type: "sense-workflow-trigger",
sense: obj.sense,
workflow: {
name: wf.name,
maxRounds: wf.maxRounds,
prompt: wf.prompt,
dryRun: wf.dryRun,
},
});
}
/** Validate and parse an unknown IPC message received from a worker process. */
export function parseWorkerMessage(raw: unknown): Result<WorkerToParentMessage> {
if (!isPlainRecord(raw)) {
@@ -451,6 +412,5 @@ export function parseWorkerMessage(raw: unknown): Result<WorkerToParentMessage>
if (obj.type === "thread-event") return parseThreadEventMsg(obj);
if (obj.type === "workflow-error") return parseWorkflowErrorMsg(obj);
if (obj.type === "thread-workflow-message") return parseThreadWorkflowMessageMsg(obj);
if (obj.type === "sense-workflow-trigger") return parseSenseWorkflowTriggerMsg(obj);
return ok({ type: "ready" });
}
+6 -11
View File
@@ -12,7 +12,7 @@ import {
type HealthInfo,
type NerveConfig,
type SenseInfo,
type WorkflowTrigger,
type SenseTrigger,
senseTriggerLabels,
} from "@uncaged/nerve-core";
@@ -145,7 +145,7 @@ export function createKernel(
}
}
function handleComputeResult(senseName: string, workflow: WorkflowTrigger | null): void {
function handleComputeResult(senseName: string, trigger: SenseTrigger | null): void {
logStore.append({
source: "sense",
type: "compute-complete",
@@ -154,17 +154,12 @@ export function createKernel(
timestamp: Date.now(),
});
if (workflow !== null) {
workflowManager.startWorkflow(workflow.name, {
prompt: workflow.prompt,
maxRounds: workflow.maxRounds,
dryRun: workflow.dryRun,
});
if (trigger !== null) {
logStore.append({
source: "sense",
type: "workflow-launch",
type: "shell-launch",
refId: senseName,
payload: JSON.stringify(workflow),
payload: JSON.stringify(trigger),
timestamp: Date.now(),
});
}
@@ -202,7 +197,7 @@ export function createKernel(
}
if (msg.type === "compute-result") {
handleComputeResult(msg.sense, msg.workflow);
handleComputeResult(msg.sense, msg.trigger);
}
}
+16 -8
View File
@@ -1,7 +1,7 @@
import { mkdirSync, readFileSync, writeFileSync } from "node:fs";
import { dirname } from "node:path";
import { existsSync, mkdirSync, readFileSync, renameSync, writeFileSync } from "node:fs";
import { dirname, join } from "node:path";
import type { Result, SenseComputeFn, WorkflowTrigger } from "@uncaged/nerve-core";
import type { Result, SenseComputeFn, SenseTrigger } from "@uncaged/nerve-core";
import { err, isPlainRecord, ok } from "@uncaged/nerve-core";
/** All state held for one sense inside a worker */
@@ -14,16 +14,24 @@ export type SenseRuntime = {
export function readState(statePath: string, initialState: unknown): unknown {
try {
if (!existsSync(statePath)) return initialState;
const raw = readFileSync(statePath, "utf8");
return JSON.parse(raw);
} catch {
} catch (e) {
const msg = e instanceof Error ? e.message : String(e);
process.stderr.write(
`[sense-runtime] warning: failed to read state from "${statePath}": ${msg} — using initialState\n`,
);
return initialState;
}
}
export function writeState(statePath: string, state: unknown): void {
mkdirSync(dirname(statePath), { recursive: true });
writeFileSync(statePath, JSON.stringify(state, null, 2));
const dir = dirname(statePath);
mkdirSync(dir, { recursive: true });
const tmp = join(dir, `.${Date.now()}.tmp`);
writeFileSync(tmp, JSON.stringify(state, null, 2));
renameSync(tmp, statePath);
}
/**
@@ -66,7 +74,7 @@ export async function loadSenseModule(
export async function executeCompute(
runtime: SenseRuntime,
timeoutMs?: number,
): Promise<Result<{ state: unknown; workflow: WorkflowTrigger | null }>> {
): Promise<Result<{ state: unknown; trigger: SenseTrigger | null }>> {
const controller = new AbortController();
let timer: ReturnType<typeof setTimeout> | undefined;
@@ -86,8 +94,8 @@ export async function executeCompute(
? await Promise.race([computePromise, timeoutPromise])
: await computePromise;
runtime.state = result.state;
writeState(runtime.statePath, result.state);
runtime.state = result.state;
return ok(result);
} catch (e) {
+36 -5
View File
@@ -14,11 +14,12 @@
import "./experimental-warning-suppression.js";
import { spawn } from "node:child_process";
import { readFileSync } from "node:fs";
import { join, resolve } from "node:path";
import { parseNerveConfig } from "@uncaged/nerve-core";
import type { NerveConfig, WorkflowTrigger } from "@uncaged/nerve-core";
import type { NerveConfig, SenseTrigger } from "@uncaged/nerve-core";
import type { WorkerToParentMessage } from "./ipc.js";
import { parseParentMessage } from "./ipc.js";
@@ -42,9 +43,36 @@ function sendReady(): void {
function sendComputeResult(
sense: string,
value: { state: unknown; workflow: WorkflowTrigger | null },
value: { state: unknown; trigger: SenseTrigger | null },
): void {
send({ type: "compute-result", sense, state: value.state, workflow: value.workflow });
send({ type: "compute-result", sense, state: value.state, trigger: value.trigger });
}
function executeShellTriggerIfNeeded(nerveRoot: string, trigger: SenseTrigger | null): void {
if (trigger === null) return;
const child = spawn(trigger.command, {
shell: true,
cwd: nerveRoot,
detached: true,
stdio: ["ignore", "ignore", "pipe"],
});
child.on("error", (err) => {
process.stderr.write(`[sense-worker] shell trigger failed: ${err.message}\n`);
});
if (child.stderr) {
let stderrBuf = "";
child.stderr.on("data", (chunk: Buffer) => {
stderrBuf += chunk.toString();
});
child.on("close", (code) => {
if (code !== null && code !== 0 && stderrBuf.length > 0) {
process.stderr.write(
`[sense-worker] shell trigger exited with code ${code}: ${stderrBuf.trimEnd()}\n`,
);
}
});
}
child.unref();
}
function sendError(sense: string, error: string): void {
@@ -132,6 +160,7 @@ async function runCompute(
runtime: SenseRuntime,
timeoutMs: number,
gracePeriodMs: number | null,
nerveRoot: string,
): Promise<void> {
try {
const result = await executeCompute(runtime, timeoutMs);
@@ -143,6 +172,7 @@ async function runCompute(
return;
}
clearGracePeriodTimer(senseName);
executeShellTriggerIfNeeded(nerveRoot, result.value.trigger);
sendComputeResult(senseName, result.value);
} catch (e: unknown) {
const errMsg = e instanceof Error ? e.message : String(e);
@@ -160,6 +190,7 @@ function handleMessage(
group: string,
senseConfigs: Map<string, { timeout: number | null; gracePeriod: number | null }>,
inFlight: Map<string, Promise<void>>,
nerveRoot: string,
): void {
const parseResult = parseParentMessage(raw);
if (!parseResult.ok) {
@@ -196,7 +227,7 @@ function handleMessage(
const previous = inFlight.get(msg.sense) ?? Promise.resolve();
const next = previous
.then(() => runCompute(msg.sense, runtime, timeoutMs, gracePeriodMs))
.then(() => runCompute(msg.sense, runtime, timeoutMs, gracePeriodMs, nerveRoot))
.catch((e: unknown) => {
const errMsg = e instanceof Error ? e.message : String(e);
sendError(msg.sense, errMsg);
@@ -257,7 +288,7 @@ async function bootstrap(nerveRoot: string, group: string): Promise<void> {
sendReady();
process.on("message", (raw: unknown) => {
handleMessage(raw, runtimes, group, senseConfigs, inFlight);
handleMessage(raw, runtimes, group, senseConfigs, inFlight, nerveRoot);
});
}
+1 -1
View File
@@ -35,7 +35,7 @@ export type WorkflowLaunchParams = {
};
export type WorkflowManager = {
/** Trigger a new workflow thread (Sense-driven launch or CLI / IPC). */
/** Trigger a new workflow thread (CLI / daemon IPC). */
startWorkflow: (workflowName: string, launch: WorkflowLaunchParams) => void;
/**
* Kill a running or queued workflow thread by runId.
+12 -28
View File
@@ -150,46 +150,30 @@ export async function compute(
}
```
**Signal + Workflow 联动**:Signal 和 Workflow 是蕴含关系 — 有 signal 才可能触发 workflow,两者不互斥:
**Shell trigger vs Workflow**:Sense `compute` 只能请求 `{ command: string }`,由 worker 执行 shell。要跑 workflow,请在命令里调用 CLI(例如 `nerve workflow trigger <name> ...`)或由外部通过 daemon IPC 触发。
```typescript
export async function compute(db) {
// 示例:异常时在 shell 里触发 workflow(需 PATH 中能调用 nerve)
export async function compute(state) {
const anomaly = detectAnomaly();
if (!anomaly) return null;
if (!anomaly) return { state, trigger: null };
return {
signal: { level: "critical", cpu: anomaly.cpu },
workflow: {
name: "alert",
maxRounds: 5,
prompt: "CPU 持续高负载,需要分析",
dryRun: false,
state: { ...state, lastAlert: Date.now() },
trigger: {
command:
'nerve workflow trigger alert --prompt "CPU 持续高负载" --max-rounds 5',
},
};
// → 先发 Signal,再启动 alert workflow
}
```
**`WorkflowTrigger` 类型**(定义在 `@uncaged/nerve-core`):
**`SenseTrigger` 类型**(`@uncaged/nerve-core`):`{ command: string }`。由 `parseSenseTrigger` 校验(仅允许 `command` 键)。
```typescript
type WorkflowTrigger = {
name: string; // workflow 名称
maxRounds: number; // 最大轮数(>= 1)
prompt: string; // 传递给 workflow 的 prompt
dryRun: boolean; // 是否 dry-run
};
```
**compute 返回值路由规则**(由 `routeSenseComputeOutput()` 决定):
| 返回值 | 行为 |
| `trigger` | 行为 |
|--------|------|
| `null` | 静默,不发 Signal |
| `{ signal: T, workflow: null }` | 发出 **Signal**,不触发 Workflow |
| `{ signal: T, workflow: WorkflowTrigger }` | 先发 **Signal**,再启动 **Workflow** |
| `{ signal: T, workflow: 非法对象 }` | 降级为 signal-only(workflow 被忽略) |
| 裸值(无 `signal` 键) | 兼容模式:整个值作为 signal payload,不触发 workflow |
| `null` | 只持久化 state |
| `{ command }` | 持久化 state + worker 执行 shell 命令 |
### Drizzle Schema 与迁移