Compare commits

..

30 Commits

Author SHA1 Message Date
tuanzi 5db80c99a0 feat(cli): nerve agent inject cursor — generate .cursorrules
Phase 4 of #289:
- Add cursor target to nerve agent inject/remove/status
- Generate .cursorrules from latest SKILL.md content (aligned with PR #301):
  - Parameterless SenseComputeFn compute signature
  - createRole four-tuple for LLM integration
  - Flat roles/<role>.ts layout
  - Sense persistence pitfall
  - nerve agent CLI docs including cursor commands
- Cursor-specific tips (@file, @Folder, @Terminal references)
- Version tracking via .nerve-version + HTML comment
- --path flag for cursor (project-local, not global like hermes)

Closes #299
Ref: #289
2026-04-30 14:38:19 +00:00
xingyue 4d75c8683f Merge pull request 'docs(cli): sync Hermes SKILL.md with flat workspace and runtime types' (#301) from fix/298-update-hermes-skill into main 2026-04-30 14:20:36 +00:00
xingyue 99b0e58fb6 Merge pull request 'chore: RFC-006 Phase 4 cleanup — delete worker-fork-support.ts' (#300) from chore/rfc-006-cleanup into main 2026-04-30 14:19:44 +00:00
xiaomo a1b1d5eaf1 chore: RFC-006 Phase 4 cleanup — delete worker-fork-support.ts
- Move formatChildExitSummary/formatCapturedStderrTail to worker-runtime.ts
- Move ignoreSessionBroadcastSignals to new worker-signals.ts
- Delete worker-fork-support.ts (teeCapturedStderr no longer used)
- Update .knowledge/worker-isolation.md and architecture.md for WorkerRuntime
- All 167 tests pass, biome check clean

Closes #283
2026-04-30 14:17:16 +00:00
xiaoju 1849789c02 docs(cli): sync Hermes SKILL with workspace AGENT and runtime types
Update sense compute docs to SenseComputeFn (no DB/peers). Document AGENT.md, flat roles, moderator/build helpers, createRole + createLlmAdapter, verb-first naming, nerve agent commands, and root npm/pnpm build.

Fixes #298

Made-with: Cursor
2026-04-30 14:15:14 +00:00
xingyue 7ce46e7735 Merge pull request 'RFC-006 Phase 3: Migrate workflow-manager process logic to WorkerRuntime' (#295) from refactor/rfc-006-workflow-runtime into main 2026-04-30 14:14:05 +00:00
xiaomo 0455f928f5 fix: address review feedback (星月) for Phase 3
1. sendToWorker: IPC send failure now marks thread as failed + dequeues next
2. crashLimitBlocked Set: prevents new startWorkflow from bypassing crash limit
3. "respawning" log skipped when crash limit is active
4. logWorkflowEvent payload: unknown | null (project convention, not ?:)
2026-04-30 14:10:06 +00:00
xingyue 11cedfb5a5 Merge pull request 'refactor(cli): dynamic version for nerve agent — Phase 3 of RFC #289' (#297) from feat/agent-inject-phase3 into main 2026-04-30 14:08:03 +00:00
tuanzi ed1bc4e25f refactor(cli): read CLI version from package.json instead of hardcoding
Phase 3 of #289:
- Replace hardcoded CLI_VERSION constant with dynamic package.json read
- Cache version to avoid repeated fs reads
- Verified: npm pack includes skills/, version detection works correctly

Ref: #289, #296
2026-04-30 14:04:14 +00:00
xiaomo dc4454d23e refactor: migrate workflow-manager process logic to WorkerRuntime (RFC-006 Phase 3)
- workflow-manager.ts: 792 → 498 lines, no more fork/ChildProcess/crash counting
- Extract pure functions to workflow-manager-support.ts (256 lines)
- WorkerRuntime gains: onCrashLimitReached callback, WorkerDrainOpts for per-call timeout
- worker-pool.ts updated for new evict/drain signature (null opts)
- All 167 daemon tests pass

Closes #282
2026-04-30 13:56:43 +00:00
xingyue 7c256620c5 Merge pull request 'feat(cli): nerve agent inject/update/remove/status — Phase 2 of RFC #289' (#294) from feat/agent-inject-phase2 into main 2026-04-30 13:53:39 +00:00
tuanzi 14898e1827 feat(cli): add nerve agent inject/update/remove/status commands
Phase 2 of #289:
- nerve agent inject hermes [--profile <name>]
- nerve agent update (updates all injected skills)
- nerve agent remove hermes [--profile <name>]
- nerve agent status (version check across profiles)
- Include skills/ in npm package files

Ref: #289, #293
2026-04-30 13:46:55 +00:00
xingyue 082d2e72f2 Merge pull request 'refactor: align develop prompts and .knowledge with flat workspace' (#288) from refactor/287-align-prompts-knowledge into main 2026-04-30 13:46:33 +00:00
xingyue fbf63e0266 Merge pull request 'RFC-006 Phase 2: Migrate SenseWorkerPool to WorkerRuntime' (#292) from refactor/rfc-006-worker-runtime into main 2026-04-30 13:44:56 +00:00
xingyue 7d89e8ab61 Merge pull request 'feat(cli): add hermes nerve skill — Phase 1 of RFC #289' (#291) from feat/agent-inject-phase1 into main 2026-04-30 13:42:05 +00:00
xiaomo e67ddc58d8 fix: address review feedback (星月)
1. trySendSync: wrap child.send in try/catch — IPC race between connected check and send
2. gracefulStop: same try/catch for shutdown send
3. Remove crashTimestamps reset on ready — crash window detection was being bypassed
2026-04-30 13:41:31 +00:00
xiaoju 06b1e3d785 refactor(cli,workflow-meta): scaffold AGENT.md on init; align develop prompts
Generate AGENT.md at ~/.uncaged-nerve root during nerve init (layout, verb-first workflows, createRole four-tuple, root build, coding style). Role prompts instruct agents to use cat AGENT.md instead of node_modules nerve-skills paths.

E2E init test asserts AGENT.md. Retain .knowledge workflow/adapter updates and flat single-file roles guidance from the branch.

Fixes #287

Made-with: Cursor
2026-04-30 13:41:24 +00:00
tuanzi f828ebc28b fix: align testing issue commands + add moderator sync pitfall
Address review from 星月:
- Update testing issue #290 to match actual CLI commands
- Add pitfall: moderator must be sync (not async)
- knowledge commands confirmed real (exist in codebase)

Ref: #290
2026-04-30 13:38:56 +00:00
tuanzi 809a11afe3 feat(cli): add hermes nerve skill (Phase 1 of #289)
Add SKILL.md for Hermes Agent covering:
- Core concepts (Sense → Signal → Workflow → Log)
- Complete CLI reference
- Sense development guide with examples
- Workflow development guide with examples
- Daily operation patterns
- nerve.yaml config reference

Ref: #289, #290
2026-04-30 13:36:03 +00:00
xiaomo 4dffcb636b fix: resolve 2 failing tests after WorkerRuntime migration
- Add trySendSync() for synchronous send when worker is ready+connected
- sendCompute uses sync path first, async fallback for cold start
- Add forwardStderr, allowRespawn, hasDisconnectedChild, onReady(key,msg)
- Tests: add connected:true to mocks, flush async fork microtasks
- All 167 daemon tests pass
2026-04-30 13:34:10 +00:00
xiaomo c34ec46416 feat(daemon): WorkerRuntime — generic message-routed process manager (closes #280)
RFC-006 Phase 1: ManagedWorker state machine + WorkerRuntime<K> with
cold start, crash respawn, drain/evict, graceful shutdown.
8 test cases covering all lifecycle scenarios.
2026-04-30 13:09:19 +00:00
xingyue d2bb0275dc Merge pull request 'feat(workflow-utils): add createLlmAdapter AgentFn factory' (#278) from refactor/277-llm-adapter-four-tuple into main 2026-04-30 12:51:29 +00:00
xiaoju 005739f6bc chore(workflow-utils): remove deprecated role factory exports
Remove createCursorRole, createHermesRole, createLlmRole, createReActRole
from public API — all superseded by createRole(adapter, prompt, schema, extract).
Source files retained as internal implementation.

Also remove unused type exports: CliPromptFn, CursorRole*, HermesRole*,
LlmPromptFn, LlmRole*, ReActRole*, ReActTool. Keep LlmMessage and
MetaExtractConfig (used internally).

Refs #277
2026-04-30 12:44:30 +00:00
xiaoju fbe1cc8eba feat(workflow-utils): add createLlmAdapter AgentFn factory
Single-turn chat via chatCompletionText: system from createRole prompt, user from ctx.start.content.

Fixes #277

Made-with: Cursor
2026-04-30 12:38:00 +00:00
xiaomo ba286a2f27 Merge pull request 'refactor(cli): single-package workspace init and root dist build' (#276) from refactor/274-single-package-workspace into main 2026-04-30 11:24:19 +00:00
xiaoju c98e14e9e6 refactor(cli): single-package workspace init and root dist build (#274)
Init templates match ~/.uncaged-nerve: scripts/build.mjs writes dist/senses/*/index.js and dist/workflows/*/index.js; drop @uncaged/nerve-skills from generated package.json; refresh Cursor skills rule copy.

Sense worker sends full compute result on signal IPC so the kernel can route workflow triggers; update e2e harness paths (migrations under senses/, noop under dist/workflows).

Fixes #274

Made-with: Cursor
2026-04-30 10:17:44 +00:00
xiaomo 011345e114 Merge pull request 'refactor(core): consolidate file structure — 22 files → 6' (#275) from refactor/core-file-consolidation into main 2026-04-30 09:21:01 +00:00
xiaoju d9c86c49ae refactor(daemon): load sense/workflow bundles from dist/ directory
Workspace build output moved from senses/<name>/index.js and
workflows/<name>/dist/index.js to dist/senses/<name>/index.js
and dist/workflows/<name>/index.js.

Refs #274
小橘 <xiaoju@shazhou.work>
2026-04-30 09:16:25 +00:00
xiaomo 0d78df89b1 refactor(core): consolidate file structure — 22 files → 6 (closes #273) 2026-04-30 09:15:18 +00:00
xiaomo 0140cdd952 Merge pull request 'refactor: RFC-005 — Separate Agent and Role types' (#272) from refactor/rfc-005-phase-1 into main 2026-04-30 08:29:12 +00:00
68 changed files with 3940 additions and 1536 deletions
+1
View File
@@ -3,3 +3,4 @@ dist
.turbo
*.tsbuildinfo
*.tgz
knowledge.db
+22 -1
View File
@@ -28,8 +28,29 @@ For long-running or incremental agent outputs:
|---------|---------|------|
| `@uncaged/nerve-adapter-cursor` | `cursorAdapter` / `createCursorAdapter()` | cursor-agent CLI |
| `@uncaged/nerve-adapter-hermes` | `hermesAdapter` / `createHermesAdapter()` | hermes chat CLI |
| `@uncaged/nerve-workflow-utils` | `createLlmAdapter(provider)` | OpenAI-compatible HTTP chat (single-turn) |
Each exports a **default instance** (sensible defaults) and a **factory** for custom config.
The Cursor and Hermes adapter packages each export a **default instance** (sensible defaults) and a **factory** for custom config. `createLlmAdapter` is a factory on `@uncaged/nerve-workflow-utils` only.
## createLlmAdapter
`createLlmAdapter` builds an `AgentFn` from an `LlmProvider` (`baseUrl`, `apiKey`, `model`). One chat completion per role step: **system** = the string passed by `createRole` (your prompt); **user** = `ctx.start.content` (the thread’s start frame). On failure it throws with a formatted LLM error.
```ts
import { createLlmAdapter, createRole } from "@uncaged/nerve-workflow-utils";
import { z } from "zod";
const metaSchema = z.object({ ok: z.boolean() });
const planner = createRole(
createLlmAdapter({ baseUrl: "https://api.example.com/v1", apiKey: "…", model: "gpt-4o-mini" }),
"You are a planner…",
metaSchema,
extractConfig,
);
```
Use this when you want a role backed by an HTTP LLM instead of a subprocess CLI adapter.
## Usage in Workflows
+1
View File
@@ -33,6 +33,7 @@ Senses own both the "what" (compute logic) and the "when" (config-driven schedul
- One worker per Workflow type (on-demand)
- Workers never talk to each other
- All user code runs in isolated Workers; kernel never loads user code directly
- **`WorkerRuntime`** (`packages/daemon/src/worker-runtime.ts`) centralizes fork lifecycle for both sense groups (`worker-pool.ts`) and workflow types (`workflow-manager.ts`); see `.knowledge/worker-isolation.md`
## Storage Systems
+8 -2
View File
@@ -12,6 +12,12 @@ Kernel (Main Process)
└── Workflow Worker (review) ── review workflow instances
```
### WorkerRuntime (RFC-006)
Forked worker processes are managed by **`WorkerRuntime`** (`worker-runtime.ts`): one Node child per logical key, cold start, optional respawn after crash, drain/evict, and coordinated shutdown over IPC. **`worker-pool.ts`** (sense groups) and **`workflow-manager.ts`** (workflow types) both configure and delegate to `createWorkerRuntime` instead of owning ad-hoc fork logic.
Worker **entrypoints** (`sense-worker.ts`, `workflow-worker.ts`) import lightweight helpers only — e.g. `worker-signals.ts` for session broadcast signal handling — so they do not pull in the parent-side runtime module.
## Isolation Boundaries
### 1. Sense Workers
@@ -111,10 +117,10 @@ workflows:
### Process Management
#### Signal Handling
Workers ignore session broadcast signals (SIGINT/SIGTERM):
Workers ignore session broadcast signals (SIGINT/SIGTERM) via `ignoreSessionBroadcastSignals()` in `worker-signals.ts`:
```typescript
// Workers ignore terminal signals; kernel coordinates shutdown
process.on("SIGINT", () => {});
process.on("SIGINT", () => {});
process.on("SIGTERM", () => {});
```
+14
View File
@@ -2,6 +2,20 @@
Stateful multi-step execution driven by Roles and a Moderator.
## Workspace Layout (authoring)
User Nerve workspaces use a **flat** build: one root `package.json`, one root bundle script (typically `scripts/build.mjs` wired from `scripts.build`), and **no** per-workflow `package.json` or `tsconfig.json`.
| Location | Purpose |
|----------|---------|
| `workflows/<name>/index.ts` | Default export: `WorkflowDefinition` (moderator + role map). |
| `workflows/<name>/roles/<role>.ts` | One module per role — schemas, prompts, `createRole` factories, or hand-written async role functions. |
| `dist/workflows/<name>/index.js` | Emit of the root build; this is what the daemon loads. |
**Naming:** Workflow ids should be **verb-first** kebab-case phrases (e.g. `deploy-staging`, `scan-dependencies`), not opaque nouns alone.
Senses follow the same flat pattern: `senses/<name>/src/*.ts`, `migrations/`, root build → `dist/senses/<name>/index.js`. See `.knowledge/sense.md`.
## Core Concepts
- **Workflow** — definition with concurrency strategy
+2 -1
View File
@@ -10,13 +10,14 @@
},
"main": "dist/index.js",
"types": "dist/index.d.ts",
"files": ["dist"],
"files": ["dist", "skills"],
"publishConfig": {
"access": "public"
},
"scripts": {
"prepublishOnly": "bash ../../scripts/prepublish-check.sh",
"build": "rslib build",
"pretest": "pnpm --filter @uncaged/nerve-core run build && pnpm --filter @uncaged/nerve-daemon run build",
"test": "vitest run"
},
"dependencies": {
+587
View File
@@ -0,0 +1,587 @@
<!-- nerve-cli-version: __NERVE_CLI_VERSION__ -->
## Cursor Agent 使用提示
在 Cursor 中与 Agent 对话时,可以用以下方式指代代码与配置:
- **`@Files` / `@file`**:引用单个文件,例如 `@nerve.yaml`、`@senses/cpu-usage/src/index.ts`,减少幻觉并让修改对准正确路径。
- **`@Folder` / `@Codebase`**:需要跨目录理解工作区结构时使用;改动前仍应优先打开相关 sense/workflow 源文件确认。
- **`@Terminal`**:把 CLI 输出纳入上下文,便于对照 `nerve daemon logs`、`nerve sense query` 等结果。
- **`@Docs`**:若项目或依赖有文档索引,可用来对齐 API 与约定。
- 工作区根目录下的 **`nerve.yaml`**、`senses/`、`workflows/` 是 nerve 的核心入口;讨论调度与配置时优先 `@` 这些路径。
- 本规则由 `nerve agent inject cursor` 安装;更新 CLI 后在同一目录再次执行可覆盖为新版。
---
# Nerve — AI Agent 观测引擎
Nerve 是一个轻量级观测引擎守护进程。它持续观测外部状态,通过声明式规则响应变化,编排多步骤工作流。
## 核心架构
```
External World → Sense → Signal → Workflow → Log
```
| 概念 | 说明 |
|------|------|
| **Sense** | 观测函数,`compute()` 采样或推导数据。返回非 null 则发出 Signal,可选触发 Workflow。每个 Sense 有独立 SQLite 数据库。 |
| **Signal** | Sense 返回非 null 时发出的通知。纯事实,无意图。通过内存 Signal Bus 分发,不持久化。 |
| **Workflow** | 有状态的多步骤执行。包含 Role(有副作用的执行者)和 Moderator(纯路由器)。每个实例是一个 Thread,有唯一 runId。 |
| **Log** | 不可变审计日志。记录执行、状态转换、错误。不能触发 Sense(防止反馈循环)。 |
| **Engine** | 内核,持有 Signal Bus、Process Manager、Workflow Manager。不直接加载用户代码。 |
| **Daemon** | 引擎运行时,作为后台进程运行。 |
**关键规则:**
- 因果链单向:External → Sense → Signal → Workflow + Log
- 进程隔离:每个 Sense group 一个 worker(长期),每个 Workflow 类型一个 worker(按需)
- 两个扩展点:Sense(观测什么 + 何时)、Workflow(做什么)
## 工作区结构
由 `nerve init` 生成的工作区根目录(默认 `~/.uncaged-nerve/`)包含 **`AGENT.md`**。实现 sense/workflow 前先阅读该文件:它与本文 skill 对齐,约定目录布局、`createRole` 用法以及**始终在仓库根目录**执行的构建命令。
```
~/.uncaged-nerve/
├── AGENT.md # 人类 / Agent 可读的工作区约定(init 生成)
├── nerve.yaml # 核心配置
├── package.json # 单一根包(sense/workflow 下不再有独立 package)
├── scripts/build.mjs # 根目录 esbuild;通过 npm/pnpm 的 build 脚本调用
├── senses/
│ └── <name>/
│ ├── src/index.ts # exports compute() + table
│ ├── src/schema.ts # Drizzle 表定义
│ └── migrations/ # SQL 迁移
├── workflows/
│ └── <name>/
│ ├── index.ts # default export:WorkflowDefinition
│ ├── moderator.ts # 可选:抽出 moderator,由 index 导入
│ ├── build.ts # 可选:共享常量 / 纯函数(避免 index 臃肿;非 esbuild 入口)
│ └── roles/
│ └── <role>.ts # 每角色单文件(推荐平铺,而非 roles/<role>/index.ts)
└── data/ # 运行时数据(SQLite、blobs)
```
### 命名约定
- **Workflow**:动词开头的 kebab-case(例如 `review-pull-request`、`deploy-staging`)。避免单独名词式命名(如 `notifications`)。
- **Sense**:描述性名词 kebab-case(例如 `cpu-usage`)。
---
## CLI 完整参考
全局选项:`--host <host:port>`(连接远程 daemon)、`--api-token <secret>`(Bearer 认证)
### 初始化与脚手架
```bash
nerve init # 初始化工作区
nerve init --from <git-url> # 从 git 仓库克隆工作区
nerve init workspace # 只初始化工作区结构
nerve create sense <name> # 创建 sense 脚手架
nerve create sense <name> --force # 覆盖已有
nerve create workflow <name> # 创建 workflow 脚手架
nerve create workflow <name> --force
nerve validate # 验证 nerve.yaml 配置
```
### Daemon 管理
```bash
nerve daemon start # 启动后台 daemon
nerve daemon start --port 3000 # 指定 HTTP API 端口
nerve daemon stop # 停止 daemon
nerve daemon restart # 重启
nerve daemon status # 查看状态
nerve daemon logs # 查看日志
nerve daemon logs --follow # 实时日志
nerve daemon logs --n 50 # 最近 50 行
nerve dev # 前台开发模式(不 fork daemon)
nerve dev --port 3000 # 指定端口
```
### Sense 操作
```bash
nerve sense list # 列出所有注册的 sense
nerve sense trigger <name> # 手动触发 sense 计算
nerve sense schema <name> # 查看 sense 数据库表结构
nerve sense schema <name> --json # JSON 格式
nerve sense query <name> <sql> # 对 sense 数据库执行只读 SQL
nerve sense query <name> "SELECT * FROM samples ORDER BY ts DESC LIMIT 10" --json
```
### Workflow 操作
```bash
nerve workflow list # 列出 nerve.yaml 中定义的 workflow
nerve workflow status # 查看运行中的 workflow 状态
nerve workflow trigger <name> # 触发 workflow
nerve workflow trigger <name> --prompt "检查生产环境"
nerve workflow trigger <name> --maxRounds 50
nerve workflow trigger <name> --dryRun # 干跑模式
```
### Thread(Workflow 执行记录)
```bash
nerve thread list # 列出最近的 workflow 执行
nerve thread list --all # 包含已完成/失败的
nerve thread list --workflow <name> # 按 workflow 过滤
nerve thread list --limit 50 # 最多 50 条
nerve thread show <runId> # 查看 role 对话轮次
nerve thread show <runId> --budget 16000 # 增大输出预算(默认 8000 字符)
nerve thread inspect <runId> # 查看详情和事件
nerve thread kill <runId> # 终止运行中/排队中的 thread
```
### Store(日志归档)
```bash
nerve store archive # 导出旧日志到 JSONL 归档
nerve store archive --vacuum # 归档后 VACUUM 数据库
```
### Knowledge(知识库)
```bash
nerve knowledge sync # 从 knowledge.yaml 重建索引
nerve knowledge query "搜索内容" # 搜索知识库
nerve knowledge query "内容" --limit 5
nerve knowledge query "内容" -g # 搜索所有注册仓库
```
### Remote(远程 daemon)
```bash
nerve remote add <name> <host:port> --token <secret>
nerve remote list
nerve remote show <name>
nerve remote set-url <name> <host>
nerve remote set-token <name> <token>
nerve remote remove <name>
nerve remote default <name> # 设为默认远程
```
### Agent(向 Hermes 注入本 skill)
```bash
nerve agent status # CLI 版本与各 Hermes 注入目录中的 skill 版本
nerve agent inject hermes # 安装到 ~/.hermes/skills/nerve
nerve agent inject hermes --profile <name> # 写入 ~/.hermes/profiles/<name>/skills/nerve
nerve agent update # 将所有已注入目录更新到当前 CLI 对应版本
nerve agent remove hermes # 移除默认 profile 的注入
nerve agent remove hermes --profile <name>
nerve agent inject cursor # 在 cwd 生成 .cursorrules
nerve agent inject cursor --path /foo # 在指定目录生成
nerve agent remove cursor [--path /foo]
```
---
## nerve.yaml 配置参考
```yaml
# 引擎全局配置
max_rounds: 100 # moderator 最大轮次(默认 100)
# Sense 配置
senses:
cpu-usage:
group: system # 必填,同 group 的 sense 共享 worker
interval: 10s # 轮询间隔(duration: 5s, 10m, 1h)
throttle: 5s # 最小计算间隔
timeout: 10s # compute 超时
grace_period: null # 优雅关闭等待
retention: 10000 # _signals 表最大行数(默认 10000)
system-health:
group: derived
on: [cpu-usage, disk-usage] # 响应式:被列出的 sense 发出 signal 时触发
throttle: null
timeout: null
# Workflow 配置
workflows:
my-workflow:
concurrency: 1 # 必填,并发数
overflow: drop # 必填,超并发时处理:drop | queue
max_queue: 100 # overflow=queue 时的队列上限(默认 100)
# HTTP API
api:
port: 3000 # null = 不启用 HTTP
host: "127.0.0.1" # 监听地址
token: null # 非 loopback 时必填
# LLM Extract(可选)
extract:
provider: anthropic
model: claude-sonnet-4-20250514
```
---
## Sense 开发指南
### compute 函数签名
Sense 的 `compute` **无参数**。它不接收数据库句柄:daemon 在 worker 内调用 `SenseComputeFn`,由运行时负责把非 null 结果的 `signal` 写入该 sense 的 Drizzle 表并记入 `_signals`。超时由运行时控制(对应 `nerve.yaml` 里的 `timeout`),无需在业务代码里读取 `AbortSignal`。
```typescript
import type { ComputeResult, SenseComputeFn } from "@uncaged/nerve-core";
export const compute: SenseComputeFn<MySignalShape> = async () => {
// ...
};
// 或等价地:
export async function compute(): Promise<ComputeResult<MySignalShape>> {
// ...
}
```
(运行时定义见 `@uncaged/nerve-core` 的 `SenseComputeFn` / `SenseModule`,daemon 侧在 `sense-runtime.ts` 的 `executeCompute` 中插入 `result.signal`。)
### 返回值
```typescript
// 返回 null = 静默,不发 signal
// 返回非 null = 发出 signal(并写入业务表),可选触发 workflow
type ComputeResult<T> =
| null
| { signal: T; workflow: WorkflowTrigger | null };
type WorkflowTrigger = {
name: string; // workflow 名称(对应 nerve.yaml 中的 key)
maxRounds: number; // moderator 最大轮次
prompt: string; // 初始 prompt
dryRun: boolean; // 干跑模式
};
```
若返回值是普通对象且不含 `signal` 字段,内核会按 shorthand 视为 `{ signal: payload, workflow: null }`(见 core 的 `routeSenseComputeOutput`)。
### Sense 模块导出
```typescript
// senses/<name>/src/index.ts
import type { ComputeResult } from "@uncaged/nerve-core";
import { table } from "./schema.js";
type Row = { ts: number; value: number };
export async function compute(): Promise<ComputeResult<Row>> {
const row: Row = { ts: Date.now(), value: Math.random() }; // 替换为真实观测逻辑
return { signal: row, workflow: null };
}
export { table };
```
### Schema 定义
```typescript
// senses/<name>/src/schema.ts
import { sqliteTable, integer, real } from "drizzle-orm/sqlite-core";
export const table = sqliteTable("samples", {
ts: integer("ts").notNull(),
value: real("value").notNull(),
});
```
### 调度方式
1. **interval 轮询**:`interval: 10s` — 每 10 秒执行一次
2. **响应式触发**:`on: [cpu-usage]` — 当 cpu-usage 发出 signal 时触发
3. 两者可以组合
### 调试
```bash
nerve dev # 前台运行,看实时输出
nerve sense trigger <name> # 手动触发一次
nerve sense query <name> "SELECT * FROM samples ORDER BY ts DESC LIMIT 5"
```
### 完整示例:CPU 监控
```typescript
// senses/cpu-usage/src/schema.ts
import { sqliteTable, integer, real } from "drizzle-orm/sqlite-core";
export const table = sqliteTable("samples", {
ts: integer("ts").notNull(),
value: real("value").notNull(),
});
// senses/cpu-usage/src/index.ts
import os from "node:os";
import type { ComputeResult } from "@uncaged/nerve-core";
import { table } from "./schema.js";
type Row = { ts: number; value: number };
export async function compute(): Promise<ComputeResult<Row>> {
const oneMin = os.loadavg()[0];
return { signal: { ts: Date.now(), value: oneMin }, workflow: null };
}
export { table };
```
nerve.yaml:
```yaml
senses:
cpu-usage:
group: system
interval: 10s
throttle: 5s
timeout: 10s
retention: 10000
```
---
## Workflow 开发指南
### 核心类型
```typescript
import type {
WorkflowDefinition,
RoleResult,
ThreadContext,
RoleMeta,
Moderator,
} from "@uncaged/nerve-core";
import { END } from "@uncaged/nerve-core";
// Role<Meta> — (ctx: ThreadContext) => Promise<RoleResult<Meta>>
// RoleResult<Meta> — { content: string; meta: Meta }
// ThreadContext<M extends RoleMeta> — threadId, start(__start__ 帧), steps(各 role 轮次)
// Moderator<M> — (ctx) => 下一个 role 名 | END
// WorkflowDefinition<M extends RoleMeta> — name, roles, moderator
```
### createRole 四元组(接入 LLM 时推荐)
工作区根目录需安装 **`@uncaged/nerve-workflow-utils`**(及所选 agent 适配器包)。默认 `nerve init` 的 `package.json` 不含该依赖时,在 `~/.uncaged-nerve` 下执行 `pnpm add @uncaged/nerve-workflow-utils`(或 npm 等价命令)。
使用 **`createRole`**,按固定顺序传入四件事:
1. **adapter** — `AgentFn`,`(ctx, systemPrompt) => Promise<string>`(原始模型输出文本)。
2. **prompt** — `string`,或 `async (ctx: ThreadContext) => string`。
3. **meta** — `z.ZodType<M>`,供 moderator 路由的结构化 meta。
4. **extract** — `{ provider: LlmProvider; dryRun: boolean | null }`,声明从回复中抽取 meta 时用的 LLM(OpenAI 兼容)及是否 dry-run。
```typescript
import { createLlmAdapter, createRole } from "@uncaged/nerve-workflow-utils";
import type { ThreadContext } from "@uncaged/nerve-core";
import { z } from "zod";
const provider = {
baseUrl: "https://api.example.com/v1",
apiKey: process.env.EXAMPLE_API_KEY!,
model: "gpt-4o-mini",
};
const planMeta = z.object({ next: z.enum(["execute", "stop"]) });
export const planner = createRole(
createLlmAdapter(provider),
async (ctx: ThreadContext) => `规划任务:${ctx.start.content}`,
planMeta,
{ provider, dryRun: null },
);
```
`createLlmAdapter` 仅位于 **`@uncaged/nerve-workflow-utils`**:用 `LlmProvider` 生成 `AgentFn`,单轮对话里 **system** 来自 `createRole` 解析后的 prompt 字符串,**user** 为线程起点 `ctx.start.content`。
### 基本 Workflow 示例(平铺 `roles/<role>.ts`)
```typescript
// workflows/example/roles/main.ts
import type { RoleResult, ThreadContext } from "@uncaged/nerve-core";
export async function main(ctx: ThreadContext): Promise<RoleResult<{ round: number }>> {
const prompt = ctx.start.content;
return {
content: `处理完成: ${prompt}`,
meta: { round: ctx.steps.length },
};
}
```
```typescript
// workflows/example/index.ts
import type { ThreadContext, WorkflowDefinition } from "@uncaged/nerve-core";
import { END } from "@uncaged/nerve-core";
import { main } from "./roles/main.js";
type Meta = Record<"main", { round: number }>;
const workflow: WorkflowDefinition<Meta> = {
name: "example",
roles: { main },
moderator(ctx: ThreadContext<Meta>) {
return ctx.steps.length === 0 ? "main" : END;
},
};
export default workflow;
```
可选:将 `moderator` 挪到 `moderator.ts` 再 `import { route } from "./moderator.js"`,保持 `index.ts` 只负责组装 `WorkflowDefinition`。
### 多 Role Workflow 示例
```typescript
// workflows/plan-execute-review/roles/planner.ts
import type { RoleResult, ThreadContext } from "@uncaged/nerve-core";
export async function planner(ctx: ThreadContext): Promise<RoleResult<{ status: string }>> {
void ctx;
return { content: "计划: ...", meta: { status: "planned" } };
}
```
```typescript
// workflows/plan-execute-review/roles/executor.ts
import type { RoleResult, ThreadContext } from "@uncaged/nerve-core";
export async function executor(ctx: ThreadContext): Promise<RoleResult<{ status: string }>> {
void ctx;
return { content: "执行: ...", meta: { status: "executed" } };
}
```
```typescript
// workflows/plan-execute-review/roles/reviewer.ts
import type { RoleResult, ThreadContext } from "@uncaged/nerve-core";
export async function reviewer(ctx: ThreadContext): Promise<RoleResult<{ status: string }>> {
void ctx;
return { content: "审核通过", meta: { status: "approved" } };
}
```
```typescript
// workflows/plan-execute-review/index.ts
import type { WorkflowDefinition, ThreadContext } from "@uncaged/nerve-core";
import { END } from "@uncaged/nerve-core";
import { executor } from "./roles/executor.js";
import { planner } from "./roles/planner.js";
import { reviewer } from "./roles/reviewer.js";
type Roles = Record<"planner" | "executor" | "reviewer", { status: string }>;
const workflow: WorkflowDefinition<Roles> = {
name: "plan-execute-review",
roles: { planner, executor, reviewer },
moderator(ctx: ThreadContext<Roles>) {
if (ctx.steps.length === 0) return "planner";
const last = ctx.steps[ctx.steps.length - 1];
if (last.role === "planner") return "executor";
if (last.role === "executor") return "reviewer";
return END;
},
};
export default workflow;
```
### Agent 适配器
Workflow role 可以集成 AI agent。已知适配器 **ID**:`echo`、`cursor`、`hermes`、`codex`。
```typescript
type AgentFn = (ctx: ThreadContext, systemPrompt: string) => Promise<string>;
```
没有现成 agent 包时,用 **`createLlmAdapter`(`@uncaged/nerve-workflow-utils`)** 从 OpenAI 兼容的 `LlmProvider` 构造 `AgentFn`,再交给 **`createRole`** 的四元组。
### Workflow 运行状态
`queued` → `started` → `completed` | `failed` | `crashed` | `killed` | `interrupted` | `dropped`
---
## 日常操作 Pattern
### 查看系统整体状态
```bash
nerve daemon status # daemon 是否在运行
nerve sense list # 所有 sense 及其调度配置
nerve workflow status # 运行中的 workflow
nerve thread list # 最近的 workflow 执行记录
```
### 检查某个 sense 的历史数据
```bash
nerve sense query cpu-usage "SELECT * FROM samples ORDER BY ts DESC LIMIT 10" --json
nerve sense schema cpu-usage # 查看表结构
```
### 手动触发 workflow
```bash
nerve workflow trigger my-workflow --prompt "手动检查"
nerve thread list --workflow my-workflow # 查看执行状态
nerve thread show <runId> # 查看对话详情
```
### 排查 sense 报错
```bash
nerve daemon logs --follow # 查看实时日志
nerve sense trigger <name> # 手动触发看报错
nerve dev # 前台模式,更详细的输出
```
### 开发新 sense
```bash
nerve create sense my-sensor # 脚手架
# 编辑 senses/my-sensor/src/index.ts 和 schema.ts
nerve validate # 验证配置
nerve dev # 前台测试
nerve sense trigger my-sensor # 单次触发验证
nerve sense query my-sensor "SELECT * FROM ..." # 检查数据
```
### 开发新 workflow
```bash
nerve create workflow my-flow # 脚手架(当前 CLI 可能仍生成 roles/<name>/ 子目录)
# 推荐对齐 AGENT.md:workflows/my-flow/index.ts + roles/<role>.ts(平铺),moderator 可拆到 moderator.ts
nerve validate # 验证配置
cd ~/.uncaged-nerve && npm run build # 工作区根目录构建(等价:pnpm run build);勿在单个 workflow 子目录单独跑 build
nerve workflow trigger my-flow --prompt "测试" --dryRun # 干跑
nerve thread show <runId> # 查看执行轨迹
```
---
## Pitfalls
- **Sense 返回值**:返回 `null` 表示静默(不发 signal);返回 `{ signal, workflow }` 才发 signal。不要返回 undefined。
- **Sense 持久化**:daemon 在 `compute()` 返回非 null 时自动执行 `db.insert(table).values(signal)` 并写入 `_signals`;业务代码不要自行 insert。
- **no optional properties**:nerve 代码规范禁止 `?:`,用 `T | null` 代替。
- **函数式风格**:用 `function` + `type`,不用 `class` + `interface`。
- **workflow 用 default export**:工作区里通常只有 `workflows/<name>/index.ts` 使用 default export(daemon 加载约定)。
- **_signals 表**:每个 sense 自动有 `_signals` 表记录 signal 历史,受 `retention` 配置限制。
- **concurrency + overflow**:workflow 必须配置并发策略,否则验证失败。
- **moderator 是同步函数**:不要加 async,moderator 是纯路由逻辑,不能有副作用。
+580
View File
@@ -0,0 +1,580 @@
---
name: nerve
version: 0.5.0
description: >
Nerve — AI agent 观测引擎。掌握 nerve 的核心概念、CLI 操作、sense/workflow 开发。
加载此 skill 后你可以:查看系统状态、监控 sense、触发 workflow、开发新 sense 和 workflow。
metadata:
hermes:
tags: [nerve, sense, workflow, monitoring, agent-kernel]
homepage: https://git.shazhou.work/uncaged/nerve
---
# Nerve — AI Agent 观测引擎
Nerve 是一个轻量级观测引擎守护进程。它持续观测外部状态,通过声明式规则响应变化,编排多步骤工作流。
## 核心架构
```
External World → Sense → Signal → Workflow → Log
```
| 概念 | 说明 |
|------|------|
| **Sense** | 观测函数,`compute()` 采样或推导数据。返回非 null 则发出 Signal,可选触发 Workflow。每个 Sense 有独立 SQLite 数据库。 |
| **Signal** | Sense 返回非 null 时发出的通知。纯事实,无意图。通过内存 Signal Bus 分发,不持久化。 |
| **Workflow** | 有状态的多步骤执行。包含 Role(有副作用的执行者)和 Moderator(纯路由器)。每个实例是一个 Thread,有唯一 runId。 |
| **Log** | 不可变审计日志。记录执行、状态转换、错误。不能触发 Sense(防止反馈循环)。 |
| **Engine** | 内核,持有 Signal Bus、Process Manager、Workflow Manager。不直接加载用户代码。 |
| **Daemon** | 引擎运行时,作为后台进程运行。 |
**关键规则:**
- 因果链单向:External → Sense → Signal → Workflow + Log
- 进程隔离:每个 Sense group 一个 worker(长期),每个 Workflow 类型一个 worker(按需)
- 两个扩展点:Sense(观测什么 + 何时)、Workflow(做什么)
## 工作区结构
`nerve init` 生成的工作区根目录(默认 `~/.uncaged-nerve/`)包含 **`AGENT.md`**。实现 sense/workflow 前先阅读该文件:它与本文 skill 对齐,约定目录布局、`createRole` 用法以及**始终在仓库根目录**执行的构建命令。
```
~/.uncaged-nerve/
├── AGENT.md # 人类 / Agent 可读的工作区约定(init 生成)
├── nerve.yaml # 核心配置
├── package.json # 单一根包(sense/workflow 下不再有独立 package)
├── scripts/build.mjs # 根目录 esbuild;通过 npm/pnpm 的 build 脚本调用
├── senses/
│ └── <name>/
│ ├── src/index.ts # exports compute() + table
│ ├── src/schema.ts # Drizzle 表定义
│ └── migrations/ # SQL 迁移
├── workflows/
│ └── <name>/
│ ├── index.ts # default export:WorkflowDefinition
│ ├── moderator.ts # 可选:抽出 moderator,由 index 导入
│ ├── build.ts # 可选:共享常量 / 纯函数(避免 index 臃肿;非 esbuild 入口)
│ └── roles/
│ └── <role>.ts # 每角色单文件(推荐平铺,而非 roles/<role>/index.ts)
└── data/ # 运行时数据(SQLite、blobs)
```
### 命名约定
- **Workflow**:动词开头的 kebab-case(例如 `review-pull-request``deploy-staging`)。避免单独名词式命名(如 `notifications`)。
- **Sense**:描述性名词 kebab-case(例如 `cpu-usage`)。
---
## CLI 完整参考
全局选项:`--host <host:port>`(连接远程 daemon)、`--api-token <secret>`(Bearer 认证)
### 初始化与脚手架
```bash
nerve init # 初始化工作区
nerve init --from <git-url> # 从 git 仓库克隆工作区
nerve init workspace # 只初始化工作区结构
nerve create sense <name> # 创建 sense 脚手架
nerve create sense <name> --force # 覆盖已有
nerve create workflow <name> # 创建 workflow 脚手架
nerve create workflow <name> --force
nerve validate # 验证 nerve.yaml 配置
```
### Daemon 管理
```bash
nerve daemon start # 启动后台 daemon
nerve daemon start --port 3000 # 指定 HTTP API 端口
nerve daemon stop # 停止 daemon
nerve daemon restart # 重启
nerve daemon status # 查看状态
nerve daemon logs # 查看日志
nerve daemon logs --follow # 实时日志
nerve daemon logs --n 50 # 最近 50 行
nerve dev # 前台开发模式(不 fork daemon)
nerve dev --port 3000 # 指定端口
```
### Sense 操作
```bash
nerve sense list # 列出所有注册的 sense
nerve sense trigger <name> # 手动触发 sense 计算
nerve sense schema <name> # 查看 sense 数据库表结构
nerve sense schema <name> --json # JSON 格式
nerve sense query <name> <sql> # 对 sense 数据库执行只读 SQL
nerve sense query <name> "SELECT * FROM samples ORDER BY ts DESC LIMIT 10" --json
```
### Workflow 操作
```bash
nerve workflow list # 列出 nerve.yaml 中定义的 workflow
nerve workflow status # 查看运行中的 workflow 状态
nerve workflow trigger <name> # 触发 workflow
nerve workflow trigger <name> --prompt "检查生产环境"
nerve workflow trigger <name> --maxRounds 50
nerve workflow trigger <name> --dryRun # 干跑模式
```
### Thread(Workflow 执行记录)
```bash
nerve thread list # 列出最近的 workflow 执行
nerve thread list --all # 包含已完成/失败的
nerve thread list --workflow <name> # 按 workflow 过滤
nerve thread list --limit 50 # 最多 50 条
nerve thread show <runId> # 查看 role 对话轮次
nerve thread show <runId> --budget 16000 # 增大输出预算(默认 8000 字符)
nerve thread inspect <runId> # 查看详情和事件
nerve thread kill <runId> # 终止运行中/排队中的 thread
```
### Store(日志归档)
```bash
nerve store archive # 导出旧日志到 JSONL 归档
nerve store archive --vacuum # 归档后 VACUUM 数据库
```
### Knowledge(知识库)
```bash
nerve knowledge sync # 从 knowledge.yaml 重建索引
nerve knowledge query "搜索内容" # 搜索知识库
nerve knowledge query "内容" --limit 5
nerve knowledge query "内容" -g # 搜索所有注册仓库
```
### Remote(远程 daemon)
```bash
nerve remote add <name> <host:port> --token <secret>
nerve remote list
nerve remote show <name>
nerve remote set-url <name> <host>
nerve remote set-token <name> <token>
nerve remote remove <name>
nerve remote default <name> # 设为默认远程
```
### Agent(向 Hermes 注入本 skill)
```bash
nerve agent status # CLI 版本与各 Hermes 注入目录中的 skill 版本
nerve agent inject hermes # 安装到 ~/.hermes/skills/nerve
nerve agent inject hermes --profile <name> # 写入 ~/.hermes/profiles/<name>/skills/nerve
nerve agent update # 将所有已注入目录更新到当前 CLI 对应版本
nerve agent remove hermes # 移除默认 profile 的注入
nerve agent remove hermes --profile <name>
```
---
## nerve.yaml 配置参考
```yaml
# 引擎全局配置
max_rounds: 100 # moderator 最大轮次(默认 100)
# Sense 配置
senses:
cpu-usage:
group: system # 必填,同 group 的 sense 共享 worker
interval: 10s # 轮询间隔(duration: 5s, 10m, 1h)
throttle: 5s # 最小计算间隔
timeout: 10s # compute 超时
grace_period: null # 优雅关闭等待
retention: 10000 # _signals 表最大行数(默认 10000)
system-health:
group: derived
on: [cpu-usage, disk-usage] # 响应式:被列出的 sense 发出 signal 时触发
throttle: null
timeout: null
# Workflow 配置
workflows:
my-workflow:
concurrency: 1 # 必填,并发数
overflow: drop # 必填,超并发时处理:drop | queue
max_queue: 100 # overflow=queue 时的队列上限(默认 100)
# HTTP API
api:
port: 3000 # null = 不启用 HTTP
host: "127.0.0.1" # 监听地址
token: null # 非 loopback 时必填
# LLM Extract(可选)
extract:
provider: anthropic
model: claude-sonnet-4-20250514
```
---
## Sense 开发指南
### compute 函数签名
Sense 的 `compute` **无参数**。它不接收数据库句柄:daemon 在 worker 内调用 `SenseComputeFn`,由运行时负责把非 null 结果的 `signal` 写入该 sense 的 Drizzle 表并记入 `_signals`。超时由运行时控制(对应 `nerve.yaml` 里的 `timeout`),无需在业务代码里读取 `AbortSignal`
```typescript
import type { ComputeResult, SenseComputeFn } from "@uncaged/nerve-core";
export const compute: SenseComputeFn<MySignalShape> = async () => {
// ...
};
// 或等价地:
export async function compute(): Promise<ComputeResult<MySignalShape>> {
// ...
}
```
(运行时定义见 `@uncaged/nerve-core``SenseComputeFn` / `SenseModule`,daemon 侧在 `sense-runtime.ts``executeCompute` 中插入 `result.signal`。)
### 返回值
```typescript
// 返回 null = 静默,不发 signal
// 返回非 null = 发出 signal(并写入业务表),可选触发 workflow
type ComputeResult<T> =
| null
| { signal: T; workflow: WorkflowTrigger | null };
type WorkflowTrigger = {
name: string; // workflow 名称(对应 nerve.yaml 中的 key)
maxRounds: number; // moderator 最大轮次
prompt: string; // 初始 prompt
dryRun: boolean; // 干跑模式
};
```
若返回值是普通对象且不含 `signal` 字段,内核会按 shorthand 视为 `{ signal: payload, workflow: null }`(见 core 的 `routeSenseComputeOutput`)。
### Sense 模块导出
```typescript
// senses/<name>/src/index.ts
import type { ComputeResult } from "@uncaged/nerve-core";
import { table } from "./schema.js";
type Row = { ts: number; value: number };
export async function compute(): Promise<ComputeResult<Row>> {
const row: Row = { ts: Date.now(), value: Math.random() }; // 替换为真实观测逻辑
return { signal: row, workflow: null };
}
export { table };
```
### Schema 定义
```typescript
// senses/<name>/src/schema.ts
import { sqliteTable, integer, real } from "drizzle-orm/sqlite-core";
export const table = sqliteTable("samples", {
ts: integer("ts").notNull(),
value: real("value").notNull(),
});
```
### 调度方式
1. **interval 轮询**`interval: 10s` — 每 10 秒执行一次
2. **响应式触发**`on: [cpu-usage]` — 当 cpu-usage 发出 signal 时触发
3. 两者可以组合
### 调试
```bash
nerve dev # 前台运行,看实时输出
nerve sense trigger <name> # 手动触发一次
nerve sense query <name> "SELECT * FROM samples ORDER BY ts DESC LIMIT 5"
```
### 完整示例:CPU 监控
```typescript
// senses/cpu-usage/src/schema.ts
import { sqliteTable, integer, real } from "drizzle-orm/sqlite-core";
export const table = sqliteTable("samples", {
ts: integer("ts").notNull(),
value: real("value").notNull(),
});
// senses/cpu-usage/src/index.ts
import os from "node:os";
import type { ComputeResult } from "@uncaged/nerve-core";
import { table } from "./schema.js";
type Row = { ts: number; value: number };
export async function compute(): Promise<ComputeResult<Row>> {
const oneMin = os.loadavg()[0];
return { signal: { ts: Date.now(), value: oneMin }, workflow: null };
}
export { table };
```
nerve.yaml:
```yaml
senses:
cpu-usage:
group: system
interval: 10s
throttle: 5s
timeout: 10s
retention: 10000
```
---
## Workflow 开发指南
### 核心类型
```typescript
import type {
WorkflowDefinition,
RoleResult,
ThreadContext,
RoleMeta,
Moderator,
} from "@uncaged/nerve-core";
import { END } from "@uncaged/nerve-core";
// Role<Meta> — (ctx: ThreadContext) => Promise<RoleResult<Meta>>
// RoleResult<Meta> — { content: string; meta: Meta }
// ThreadContext<M extends RoleMeta> — threadId, start(__start__ 帧), steps(各 role 轮次)
// Moderator<M> — (ctx) => 下一个 role 名 | END
// WorkflowDefinition<M extends RoleMeta> — name, roles, moderator
```
### createRole 四元组(接入 LLM 时推荐)
工作区根目录需安装 **`@uncaged/nerve-workflow-utils`**(及所选 agent 适配器包)。默认 `nerve init``package.json` 不含该依赖时,在 `~/.uncaged-nerve` 下执行 `pnpm add @uncaged/nerve-workflow-utils`(或 npm 等价命令)。
使用 **`createRole`**,按固定顺序传入四件事:
1. **adapter**`AgentFn``(ctx, systemPrompt) => Promise<string>`(原始模型输出文本)。
2. **prompt**`string`,或 `async (ctx: ThreadContext) => string`
3. **meta**`z.ZodType<M>`,供 moderator 路由的结构化 meta。
4. **extract**`{ provider: LlmProvider; dryRun: boolean | null }`,声明从回复中抽取 meta 时用的 LLM(OpenAI 兼容)及是否 dry-run。
```typescript
import { createLlmAdapter, createRole } from "@uncaged/nerve-workflow-utils";
import type { ThreadContext } from "@uncaged/nerve-core";
import { z } from "zod";
const provider = {
baseUrl: "https://api.example.com/v1",
apiKey: process.env.EXAMPLE_API_KEY!,
model: "gpt-4o-mini",
};
const planMeta = z.object({ next: z.enum(["execute", "stop"]) });
export const planner = createRole(
createLlmAdapter(provider),
async (ctx: ThreadContext) => `规划任务:${ctx.start.content}`,
planMeta,
{ provider, dryRun: null },
);
```
`createLlmAdapter` 仅位于 **`@uncaged/nerve-workflow-utils`**:用 `LlmProvider` 生成 `AgentFn`,单轮对话里 **system** 来自 `createRole` 解析后的 prompt 字符串,**user** 为线程起点 `ctx.start.content`
### 基本 Workflow 示例(平铺 `roles/<role>.ts`)
```typescript
// workflows/example/roles/main.ts
import type { RoleResult, ThreadContext } from "@uncaged/nerve-core";
export async function main(ctx: ThreadContext): Promise<RoleResult<{ round: number }>> {
const prompt = ctx.start.content;
return {
content: `处理完成: ${prompt}`,
meta: { round: ctx.steps.length },
};
}
```
```typescript
// workflows/example/index.ts
import type { ThreadContext, WorkflowDefinition } from "@uncaged/nerve-core";
import { END } from "@uncaged/nerve-core";
import { main } from "./roles/main.js";
type Meta = Record<"main", { round: number }>;
const workflow: WorkflowDefinition<Meta> = {
name: "example",
roles: { main },
moderator(ctx: ThreadContext<Meta>) {
return ctx.steps.length === 0 ? "main" : END;
},
};
export default workflow;
```
可选:将 `moderator` 挪到 `moderator.ts``import { route } from "./moderator.js"`,保持 `index.ts` 只负责组装 `WorkflowDefinition`
### 多 Role Workflow 示例
```typescript
// workflows/plan-execute-review/roles/planner.ts
import type { RoleResult, ThreadContext } from "@uncaged/nerve-core";
export async function planner(ctx: ThreadContext): Promise<RoleResult<{ status: string }>> {
void ctx;
return { content: "计划: ...", meta: { status: "planned" } };
}
```
```typescript
// workflows/plan-execute-review/roles/executor.ts
import type { RoleResult, ThreadContext } from "@uncaged/nerve-core";
export async function executor(ctx: ThreadContext): Promise<RoleResult<{ status: string }>> {
void ctx;
return { content: "执行: ...", meta: { status: "executed" } };
}
```
```typescript
// workflows/plan-execute-review/roles/reviewer.ts
import type { RoleResult, ThreadContext } from "@uncaged/nerve-core";
export async function reviewer(ctx: ThreadContext): Promise<RoleResult<{ status: string }>> {
void ctx;
return { content: "审核通过", meta: { status: "approved" } };
}
```
```typescript
// workflows/plan-execute-review/index.ts
import type { WorkflowDefinition, ThreadContext } from "@uncaged/nerve-core";
import { END } from "@uncaged/nerve-core";
import { executor } from "./roles/executor.js";
import { planner } from "./roles/planner.js";
import { reviewer } from "./roles/reviewer.js";
type Roles = Record<"planner" | "executor" | "reviewer", { status: string }>;
const workflow: WorkflowDefinition<Roles> = {
name: "plan-execute-review",
roles: { planner, executor, reviewer },
moderator(ctx: ThreadContext<Roles>) {
if (ctx.steps.length === 0) return "planner";
const last = ctx.steps[ctx.steps.length - 1];
if (last.role === "planner") return "executor";
if (last.role === "executor") return "reviewer";
return END;
},
};
export default workflow;
```
### Agent 适配器
Workflow role 可以集成 AI agent。已知适配器 **ID**`echo``cursor``hermes``codex`
```typescript
type AgentFn = (ctx: ThreadContext, systemPrompt: string) => Promise<string>;
```
没有现成 agent 包时,用 **`createLlmAdapter``@uncaged/nerve-workflow-utils`)** 从 OpenAI 兼容的 `LlmProvider` 构造 `AgentFn`,再交给 **`createRole`** 的四元组。
### Workflow 运行状态
`queued``started``completed` | `failed` | `crashed` | `killed` | `interrupted` | `dropped`
---
## 日常操作 Pattern
### 查看系统整体状态
```bash
nerve daemon status # daemon 是否在运行
nerve sense list # 所有 sense 及其调度配置
nerve workflow status # 运行中的 workflow
nerve thread list # 最近的 workflow 执行记录
```
### 检查某个 sense 的历史数据
```bash
nerve sense query cpu-usage "SELECT * FROM samples ORDER BY ts DESC LIMIT 10" --json
nerve sense schema cpu-usage # 查看表结构
```
### 手动触发 workflow
```bash
nerve workflow trigger my-workflow --prompt "手动检查"
nerve thread list --workflow my-workflow # 查看执行状态
nerve thread show <runId> # 查看对话详情
```
### 排查 sense 报错
```bash
nerve daemon logs --follow # 查看实时日志
nerve sense trigger <name> # 手动触发看报错
nerve dev # 前台模式,更详细的输出
```
### 开发新 sense
```bash
nerve create sense my-sensor # 脚手架
# 编辑 senses/my-sensor/src/index.ts 和 schema.ts
nerve validate # 验证配置
nerve dev # 前台测试
nerve sense trigger my-sensor # 单次触发验证
nerve sense query my-sensor "SELECT * FROM ..." # 检查数据
```
### 开发新 workflow
```bash
nerve create workflow my-flow # 脚手架(当前 CLI 可能仍生成 roles/<name>/ 子目录)
# 推荐对齐 AGENT.md:workflows/my-flow/index.ts + roles/<role>.ts(平铺),moderator 可拆到 moderator.ts
nerve validate # 验证配置
cd ~/.uncaged-nerve && npm run build # 工作区根目录构建(等价:pnpm run build);勿在单个 workflow 子目录单独跑 build
nerve workflow trigger my-flow --prompt "测试" --dryRun # 干跑
nerve thread show <runId> # 查看执行轨迹
```
---
## Pitfalls
- **Sense 返回值**:返回 `null` 表示静默(不发 signal);返回 `{ signal, workflow }` 才发 signal。不要返回 undefined。
- **Sense 持久化**:daemon 在 `compute()` 返回非 null 时自动执行 `db.insert(table).values(signal)` 并写入 `_signals`;业务代码不要自行 insert。
- **no optional properties**:nerve 代码规范禁止 `?:`,用 `T | null` 代替。
- **函数式风格**:用 `function` + `type`,不用 `class` + `interface`
- **workflow 用 default export**:工作区里通常只有 `workflows/<name>/index.ts` 使用 default export(daemon 加载约定)。
- **_signals 表**:每个 sense 自动有 `_signals` 表记录 signal 历史,受 `retention` 配置限制。
- **concurrency + overflow**:workflow 必须配置并发策略,否则验证失败。
- **moderator 是同步函数**:不要加 async,moderator 是纯路由逻辑,不能有副作用。
@@ -7,7 +7,6 @@ import { describe, expect, it } from "vitest";
import {
buildSenseIndexTs,
buildSenseMigrationSql,
buildSensePackageJson,
buildSenseSchemaTs,
validateResourceName,
} from "../commands/create.js";
@@ -46,20 +45,11 @@ describe("buildSenseMigrationSql", () => {
});
});
describe("buildSensePackageJson", () => {
it("includes esbuild script and sense name", () => {
const pkg = JSON.parse(buildSensePackageJson("my-sense"));
expect(pkg.name).toBe("nerve-sense-my-sense");
expect(pkg.scripts.build).toContain("esbuild");
expect(pkg.scripts.build).toContain("src/index.ts");
expect(pkg.devDependencies.esbuild).toBeTruthy();
});
});
describe("buildSenseIndexTs", () => {
it("embeds sense id in stub with TypeScript types", () => {
const ts = buildSenseIndexTs("my-sense");
expect(ts).toContain("my-sense");
expect(ts).toContain("export { mySense as table }");
expect(ts).toContain("export async function compute");
expect(ts).toContain("LibSQLDatabase");
expect(ts).toContain("Promise<SenseResult>");
@@ -9,7 +9,7 @@ import { mkdirSync, mkdtempSync, readFileSync, rmSync, writeFileSync } from "nod
import { tmpdir } from "node:os";
import { join } from "node:path";
import { afterEach, beforeEach, describe, expect, it } from "vitest";
import { buildWorkflowPackageJson, buildWorkflowScaffold } from "../commands/create.js";
import { buildWorkflowScaffold } from "../commands/create.js";
let tmpDir: string;
@@ -81,21 +81,6 @@ describe("buildWorkflowScaffold", () => {
const { roleMainPromptMd } = buildWorkflowScaffold("my-flow");
expect(roleMainPromptMd).toContain("# my-flow — main role");
});
it("package.json defines esbuild bundling to dist/", () => {
const pkg = JSON.parse(buildWorkflowPackageJson("my-flow")) as {
scripts: { build: string };
devDependencies: { esbuild: string };
};
expect(pkg.scripts.build).toContain("esbuild");
expect(pkg.scripts.build).toContain("--outdir=dist");
expect(pkg.devDependencies.esbuild).toBeTruthy();
});
it("buildWorkflowScaffold includes package.json body", () => {
const { packageJson } = buildWorkflowScaffold("wf");
expect(JSON.parse(packageJson).scripts.build).toContain("esbuild");
});
});
describe("workflow scaffold file writing (simulated)", () => {
+13 -18
View File
@@ -122,54 +122,49 @@ describe("e2e create", () => {
});
it(
"create workflow scaffolds sources and package.json with esbuild build",
{ timeout: 10_000 },
"create workflow scaffolds sources and root build emits dist/workflows/<name>/index.js",
{ timeout: 120_000 },
async () => {
fakeHome = mkdtempSync(join(tmpdir(), "nerve-create-e2e-"));
const nerveRoot = join(fakeHome, ".uncaged-nerve");
await runTestCli(fakeHome, ["init", "--force", "--skip-install"]);
await runTestCli(fakeHome, ["init", "--force"]);
const wf = await runTestCli(fakeHome, ["create", "workflow", "e2e-flow"]);
expect(wf.exitCode).toBe(0);
expect(wf.stdout).toContain("✅");
const pkgPath = join(nerveRoot, "workflows", "e2e-flow", "package.json");
const indexPath = join(nerveRoot, "workflows", "e2e-flow", "index.ts");
const mainRolePath = join(nerveRoot, "workflows", "e2e-flow", "roles", "main", "index.ts");
expect(existsSync(pkgPath)).toBe(true);
expect(JSON.parse(readFileSync(pkgPath, "utf8")).scripts.build).toContain("esbuild");
const wfDir = join(nerveRoot, "workflows", "e2e-flow");
const indexPath = join(wfDir, "index.ts");
const mainRolePath = join(wfDir, "roles", "main", "index.ts");
expect(existsSync(join(wfDir, "package.json"))).toBe(false);
expect(existsSync(indexPath)).toBe(true);
expect(existsSync(mainRolePath)).toBe(true);
expect(readFileSync(indexPath, "utf8")).toContain('name: "e2e-flow"');
expect(readFileSync(mainRolePath, "utf8")).toContain("e2e-flow started");
expect(existsSync(join(nerveRoot, "dist", "workflows", "e2e-flow", "index.js"))).toBe(true);
},
);
it(
"create sense scaffolds src/index.ts, src/schema.ts, package.json and migration",
{ timeout: 60_000 },
"create sense scaffolds src/, migration, and root build emits dist/senses/<name>/index.js",
{ timeout: 120_000 },
async () => {
fakeHome = mkdtempSync(join(tmpdir(), "nerve-create-e2e-"));
const nerveRoot = join(fakeHome, ".uncaged-nerve");
await runTestCli(fakeHome, ["init", "--force", "--skip-install"]);
await runTestCli(fakeHome, ["init", "--force"]);
const sense = await runTestCli(fakeHome, ["create", "sense", "e2e-sense"]);
expect(sense.exitCode).toBe(0);
expect(sense.stdout).toContain("✅");
const base = join(nerveRoot, "senses", "e2e-sense");
expect(existsSync(join(base, "package.json"))).toBe(true);
expect(existsSync(join(base, "package.json"))).toBe(false);
expect(existsSync(join(base, "src", "index.ts"))).toBe(true);
expect(existsSync(join(base, "src", "schema.ts"))).toBe(true);
expect(existsSync(join(base, "migrations", "0001_init.sql"))).toBe(true);
const pkg = JSON.parse(readFileSync(join(base, "package.json"), "utf8"));
expect(pkg.scripts.build).toContain("esbuild");
// pnpm install + build should produce index.js
expect(existsSync(join(base, "index.js"))).toBe(true);
expect(existsSync(join(nerveRoot, "dist", "senses", "e2e-sense", "index.js"))).toBe(true);
},
);
+79 -22
View File
@@ -37,7 +37,15 @@
* ```
*/
import { existsSync, mkdirSync, mkdtempSync, rmSync, symlinkSync, writeFileSync } from "node:fs";
import {
existsSync,
mkdirSync,
mkdtempSync,
readFileSync,
rmSync,
symlinkSync,
writeFileSync,
} from "node:fs";
import { createRequire } from "node:module";
import { tmpdir } from "node:os";
import { dirname, join } from "node:path";
@@ -61,6 +69,27 @@ const nerveDaemonRoot = dirname(require.resolve("@uncaged/nerve-daemon/package.j
const senseWorkerScript = join(nerveDaemonRoot, "dist", "sense-worker.js");
const workflowWorkerScript = join(nerveDaemonRoot, "dist", "workflow-worker.js");
function resolveDrizzleOrmPackageRoot(): string {
const requireFromDaemon = createRequire(join(nerveDaemonRoot, "package.json"));
const entry = requireFromDaemon.resolve("drizzle-orm");
let dir = dirname(entry);
for (let i = 0; i < 12; i += 1) {
const pkgPath = join(dir, "package.json");
if (existsSync(pkgPath)) {
try {
const name = (JSON.parse(readFileSync(pkgPath, "utf8")) as { name: string }).name;
if (name === "drizzle-orm") return dir;
} catch {
// keep walking
}
}
const parent = dirname(dir);
if (parent === dir) break;
dir = parent;
}
throw new Error("Could not resolve drizzle-orm package root for e2e harness");
}
const nerveYamlTemplate = `senses:
counter:
group: e2e
@@ -88,9 +117,9 @@ const echoWorkflowIndexJs = `const END = "__end__";
export default {
name: "echo",
roles: {
echo: async (start, _messages) => {
echo: async (ctx) => {
await new Promise((r) => setTimeout(r, 350));
const p = typeof start.content === "string" ? start.content : "";
const p = typeof ctx.start.content === "string" ? ctx.start.content : "";
return {
content: p.length > 0 ? "echo:" + p : "echo:empty",
meta: {},
@@ -121,17 +150,30 @@ api:
host: 127.0.0.1
`;
/** Empty migration — counter sense uses only `_signals` (auto-created by daemon). */
const counterMigration = `-- no-op migration for e2e counter sense
SELECT 1;
/** Schema for sense signal rows persisted via \`db.insert(table)\` (see sense-runtime). */
const counterMigration = `CREATE TABLE IF NOT EXISTS counter_signals (
id INTEGER PRIMARY KEY AUTOINCREMENT,
count INTEGER,
launched INTEGER,
idle INTEGER
);
`;
/**
* Minimal counter sense — each compute returns an incrementing count.
* Does NOT touch the DB directly; signal persistence is handled by the daemon
* (`runtime.persistSignal`) which writes to `_signals` automatically.
* Does NOT touch the DB directly in compute(); the daemon inserts into \`table\`
* and persistSignal handles \`_signals\`.
*/
const counterIndexJs = `let _count = 0;
const counterIndexJs = `import { integer, sqliteTable } from "drizzle-orm/sqlite-core";
export const table = sqliteTable("counter_signals", {
id: integer("id").primaryKey({ autoIncrement: true }),
count: integer("count"),
launched: integer("launched"),
idle: integer("idle"),
});
let _count = 0;
export async function compute(_db, _peers, _options) {
_count += 1;
return { signal: { count: _count }, workflow: null };
@@ -139,12 +181,21 @@ export async function compute(_db, _peers, _options) {
`;
/** First trigger launches local noop workflow; later triggers emit a plain signal. */
const counterIndexJsWithNoopWorkflow = `let _launched = false;
const counterIndexJsWithNoopWorkflow = `import { integer, sqliteTable } from "drizzle-orm/sqlite-core";
export const table = sqliteTable("counter_signals", {
id: integer("id").primaryKey({ autoIncrement: true }),
count: integer("count"),
launched: integer("launched"),
idle: integer("idle"),
});
let _launched = false;
export async function compute(_db, _peers, _options) {
if (!_launched) {
_launched = true;
return {
signal: { launched: true },
signal: { launched: 1 },
workflow: {
name: "noop",
maxRounds: 3,
@@ -153,7 +204,7 @@ export async function compute(_db, _peers, _options) {
},
};
}
return { signal: { idle: true }, workflow: null };
return { signal: { idle: 1 }, workflow: null };
}
`;
@@ -209,7 +260,8 @@ function writeWorkspaceLayout(nerveRoot: string, withNoopWorkflow: boolean): voi
mkdirSync(join(nerveRoot, "data", "senses"), { recursive: true });
mkdirSync(join(nerveRoot, "data", "blobs"), { recursive: true });
mkdirSync(join(nerveRoot, "senses", "counter", "migrations"), { recursive: true });
mkdirSync(join(nerveRoot, "workflows", "echo", "dist"), { recursive: true });
mkdirSync(join(nerveRoot, "dist", "senses", "counter"), { recursive: true });
mkdirSync(join(nerveRoot, "dist", "workflows", "echo"), { recursive: true });
writeFileSync(
join(nerveRoot, "nerve.yaml"),
withNoopWorkflow ? nerveYamlWithNoopWorkflow : nerveYamlTemplate,
@@ -221,20 +273,19 @@ function writeWorkspaceLayout(nerveRoot: string, withNoopWorkflow: boolean): voi
"utf8",
);
writeFileSync(
join(nerveRoot, "senses", "counter", "index.js"),
join(nerveRoot, "dist", "senses", "counter", "index.js"),
withNoopWorkflow ? counterIndexJsWithNoopWorkflow : counterIndexJs,
"utf8",
);
writeFileSync(
join(nerveRoot, "workflows", "echo", "dist", "index.js"),
join(nerveRoot, "dist", "workflows", "echo", "index.js"),
echoWorkflowIndexJs,
"utf8",
);
if (withNoopWorkflow) {
mkdirSync(join(nerveRoot, "workflows", "noop", "dist"), { recursive: true });
mkdirSync(join(nerveRoot, "workflows", "noop", "migrations"), { recursive: true });
mkdirSync(join(nerveRoot, "dist", "workflows", "noop"), { recursive: true });
writeFileSync(
join(nerveRoot, "workflows", "noop", "dist", "index.js"),
join(nerveRoot, "dist", "workflows", "noop", "index.js"),
noopWorkflowIndexJs,
"utf8",
);
@@ -267,11 +318,17 @@ function useNoopWorkflow(opts: StartTestDaemonOpts): boolean {
*/
export function linkWorkspaceDaemonIntoNerveRoot(nerveRoot: string): void {
const daemonPkgRoot = dirname(require.resolve("@uncaged/nerve-daemon/package.json"));
const linkDir = join(nerveRoot, "node_modules", "@uncaged");
const linkPath = join(linkDir, "nerve-daemon");
const nm = join(nerveRoot, "node_modules");
mkdirSync(nm, { recursive: true });
const linkDir = join(nm, "@uncaged");
mkdirSync(linkDir, { recursive: true });
if (existsSync(linkPath)) return;
symlinkSync(daemonPkgRoot, linkPath);
const linkPath = join(linkDir, "nerve-daemon");
if (!existsSync(linkPath)) symlinkSync(daemonPkgRoot, linkPath);
const drizzlePkgRoot = resolveDrizzleOrmPackageRoot();
const drizzleLink = join(nm, "drizzle-orm");
if (!existsSync(drizzleLink)) symlinkSync(drizzlePkgRoot, drizzleLink);
}
/**
@@ -202,10 +202,13 @@ describe("e2e init", () => {
// Verify key files exist
expect(existsSync(join(nerveRoot, "nerve.yaml"))).toBe(true);
expect(existsSync(join(nerveRoot, "package.json"))).toBe(true);
expect(existsSync(join(nerveRoot, "pnpm-workspace.yaml"))).toBe(true);
expect(existsSync(join(nerveRoot, "scripts", "build.mjs"))).toBe(true);
expect(existsSync(join(nerveRoot, "biome.json"))).toBe(true);
expect(existsSync(join(nerveRoot, ".gitignore"))).toBe(true);
expect(existsSync(join(nerveRoot, "senses", "cpu-usage", "package.json"))).toBe(true);
expect(existsSync(join(nerveRoot, "AGENT.md"))).toBe(true);
const agentMd = readFileSync(join(nerveRoot, "AGENT.md"), "utf8");
expect(agentMd).toContain("verb-first");
expect(agentMd).toContain("createRole");
expect(existsSync(join(nerveRoot, "senses", "cpu-usage", "src", "index.ts"))).toBe(true);
expect(existsSync(join(nerveRoot, "senses", "cpu-usage", "src", "schema.ts"))).toBe(true);
expect(existsSync(join(nerveRoot, "senses", "cpu-usage", "migrations", "0001_init.sql"))).toBe(
@@ -214,19 +217,14 @@ describe("e2e init", () => {
expect(existsSync(join(nerveRoot, ".cursor", "rules", "nerve-skills.mdc"))).toBe(true);
const pkgJson = readFileSync(join(nerveRoot, "package.json"), "utf8");
expect(pkgJson).toContain('"@uncaged/nerve-skills": "latest"');
expect(pkgJson).toContain('"build": "pnpm -r build"');
expect(pkgJson).not.toContain("nerve-skills");
expect(pkgJson).toContain('"build": "node scripts/build.mjs"');
expect(pkgJson).toContain('"esbuild": "^0.27.0"');
const workspaceYaml = readFileSync(join(nerveRoot, "pnpm-workspace.yaml"), "utf8");
expect(workspaceYaml).toContain("workflows/*");
expect(workspaceYaml).toContain("senses/*");
const sensePkgJson = readFileSync(
join(nerveRoot, "senses", "cpu-usage", "package.json"),
"utf8",
);
expect(sensePkgJson).toContain("nerve-sense-cpu-usage");
expect(sensePkgJson).toContain("esbuild");
const buildScript = readFileSync(join(nerveRoot, "scripts", "build.mjs"), "utf8");
expect(buildScript).toContain('path.join(root, "senses")');
expect(buildScript).toContain('path.join(root, "workflows")');
expect(buildScript).toContain("dist");
});
it("generated nerve.yaml passes validate", { timeout: 10_000 }, async () => {
+2
View File
@@ -3,6 +3,7 @@ import "@uncaged/nerve-daemon/experimental-warning-suppression.js";
import { defineCommand, runMain } from "citty";
import { consumeGlobalDaemonCliFlags } from "./cli-global.js";
import { agentCommand } from "./commands/agent.js";
import { createCommand } from "./commands/create.js";
import { daemonCommand } from "./commands/daemon.js";
import { devCommand } from "./commands/dev.js";
@@ -42,6 +43,7 @@ const main = defineCommand({
"Nerve — an AI agent kernel. Global options: --host <host:port> (remote HTTP), --api-token <secret> (Bearer auth).",
},
subCommands: {
agent: agentCommand,
init: initCommand,
create: createCommand,
daemon: daemonCommand,
+378
View File
@@ -0,0 +1,378 @@
import {
cpSync,
existsSync,
mkdirSync,
readFileSync,
readdirSync,
rmSync,
statSync,
writeFileSync,
} from "node:fs";
import { homedir } from "node:os";
import { dirname, join, resolve as resolvePath } from "node:path";
import { fileURLToPath } from "node:url";
import { defineCommand } from "citty";
function getPackageRootDir(): string {
const thisFile = fileURLToPath(import.meta.url);
let dir = dirname(thisFile);
for (let i = 0; i < 5; i++) {
if (existsSync(join(dir, "package.json"))) return dir;
dir = dirname(dir);
}
throw new Error("Cannot locate package root. Is the CLI package intact?");
}
function getCliVersion(): string {
const pkgPath = join(getPackageRootDir(), "package.json");
const pkg = JSON.parse(readFileSync(pkgPath, "utf8")) as { version: string };
return pkg.version;
}
let _cachedVersion: string | null = null;
function cliVersion(): string {
if (_cachedVersion === null) _cachedVersion = getCliVersion();
return _cachedVersion;
}
function getSkillSourceDir(): string {
const root = getPackageRootDir();
const skillsDir = join(root, "skills");
if (!existsSync(skillsDir)) {
throw new Error("Cannot locate skills directory. Is the CLI package intact?");
}
return skillsDir;
}
function getHermesSkillDir(profile: string | null): string {
const hermesHome = join(homedir(), ".hermes");
if (profile !== null) {
return join(hermesHome, "profiles", profile, "skills", "nerve");
}
return join(hermesHome, "skills", "nerve");
}
function readVersionFile(skillDir: string): string | null {
const versionPath = join(skillDir, ".nerve-version");
if (!existsSync(versionPath)) return null;
return readFileSync(versionPath, "utf8").trim();
}
function writeVersionFile(skillDir: string, version: string): void {
writeFileSync(join(skillDir, ".nerve-version"), `${version}\n`, "utf8");
}
const CURSOR_VERSION_MARKER_RE = /<!--\s*nerve-cli-version:\s*([^>]+?)\s*-->/;
function resolveCursorProjectDir(pathArg: string | null): string {
const raw = pathArg !== null && pathArg !== "" ? pathArg : process.cwd();
return resolvePath(raw);
}
function assertDirectory(projectDir: string, label: string): void {
if (!existsSync(projectDir)) {
process.stderr.write(`${label} does not exist: ${projectDir}\n`);
process.exit(1);
}
if (!statSync(projectDir).isDirectory()) {
process.stderr.write(`${label} is not a directory: ${projectDir}\n`);
process.exit(1);
}
}
function readCursorInjectVersion(projectDir: string): string | null {
const versionPath = join(projectDir, ".nerve-version");
if (existsSync(versionPath)) {
return readFileSync(versionPath, "utf8").trim();
}
const rulesPath = join(projectDir, ".cursorrules");
if (!existsSync(rulesPath)) return null;
const content = readFileSync(rulesPath, "utf8");
const match = content.match(CURSOR_VERSION_MARKER_RE);
return match !== null ? match[1].trim() : null;
}
function injectCursor(projectDir: string): void {
assertDirectory(projectDir, "Project directory");
const rulesPath = join(projectDir, ".cursorrules");
const existingVer = readCursorInjectVersion(projectDir);
if (existingVer === cliVersion() && existsSync(rulesPath)) {
process.stdout.write(
`✅ Cursor .cursorrules is already up to date (v${cliVersion()}) at ${projectDir}\n`,
);
return;
}
const templatePath = join(getSkillSourceDir(), "cursor", ".cursorrules");
if (!existsSync(templatePath)) {
throw new Error("Cannot locate cursor/.cursorrules template. Is the CLI package intact?");
}
let body = readFileSync(templatePath, "utf8");
body = body.replaceAll("__NERVE_CLI_VERSION__", cliVersion());
writeFileSync(rulesPath, body, "utf8");
writeVersionFile(projectDir, cliVersion());
const action = existingVer !== null ? "Updated" : "Installed";
process.stdout.write(`${action} Cursor .cursorrules v${cliVersion()} at ${projectDir}\n`);
}
function removeCursor(projectDir: string): void {
assertDirectory(projectDir, "Project directory");
const rulesPath = join(projectDir, ".cursorrules");
const versionPath = join(projectDir, ".nerve-version");
if (!existsSync(rulesPath)) {
process.stdout.write(`ℹ️ Cursor .cursorrules is not present at ${projectDir}\n`);
return;
}
rmSync(rulesPath, { force: true });
if (existsSync(versionPath)) {
rmSync(versionPath, { force: true });
}
process.stdout.write(`✅ Removed Cursor .cursorrules from ${projectDir}\n`);
}
function injectHermes(profile: string | null): void {
const sourceDir = join(getSkillSourceDir(), "hermes");
const targetDir = getHermesSkillDir(profile);
const existing = readVersionFile(targetDir);
if (existing === cliVersion()) {
const loc = profile !== null ? ` (profile: ${profile})` : "";
process.stdout.write(`✅ Hermes nerve skill is already up to date (v${cliVersion()})${loc}\n`);
return;
}
mkdirSync(targetDir, { recursive: true });
cpSync(sourceDir, targetDir, { recursive: true });
writeVersionFile(targetDir, cliVersion());
const action = existing !== null ? "Updated" : "Installed";
const loc = profile !== null ? ` (profile: ${profile})` : "";
process.stdout.write(`${action} Hermes nerve skill v${cliVersion()}${loc}\n`);
process.stdout.write(`${targetDir}/SKILL.md\n`);
}
function removeHermes(profile: string | null): void {
const targetDir = getHermesSkillDir(profile);
if (!existsSync(targetDir)) {
process.stdout.write("ℹ️ Hermes nerve skill is not installed.\n");
return;
}
rmSync(targetDir, { recursive: true, force: true });
const loc = profile !== null ? ` (profile: ${profile})` : "";
process.stdout.write(`✅ Removed Hermes nerve skill${loc}\n`);
}
function printCursorStatusLine(projectDir: string): void {
const rulesPath = join(projectDir, ".cursorrules");
const label = `Cursor (${projectDir})`;
if (!existsSync(rulesPath)) {
process.stdout.write(` ${label}: ❌ not installed\n`);
return;
}
const ver = readCursorInjectVersion(projectDir);
if (ver === null) {
process.stdout.write(
` ${label}: ⚠️ installed (unknown version; run \`nerve agent inject cursor\`)\n`,
);
return;
}
if (ver === cliVersion()) {
process.stdout.write(` ${label}: ✅ v${ver}\n`);
} else {
process.stdout.write(
` ${label}: ⚠️ v${ver} → v${cliVersion()} available (run \`nerve agent inject cursor\`)\n`,
);
}
}
function printStatus(): void {
process.stdout.write(`nerve agent skills (CLI v${cliVersion()})\n\n`);
printCursorStatusLine(process.cwd());
process.stdout.write("\n");
// Default profile
const defaultDir = getHermesSkillDir(null);
const defaultVer = readVersionFile(defaultDir);
printAgentLine("Hermes (default)", defaultVer);
// Named profiles
const profilesDir = join(homedir(), ".hermes", "profiles");
if (existsSync(profilesDir)) {
const profiles = readdirSync(profilesDir, { withFileTypes: true })
.filter((d) => d.isDirectory())
.map((d) => d.name);
for (const profile of profiles) {
const dir = getHermesSkillDir(profile);
const ver = readVersionFile(dir);
if (ver !== null) {
printAgentLine(`Hermes (${profile})`, ver);
}
}
}
process.stdout.write("\n");
}
function printAgentLine(label: string, version: string | null): void {
if (version === null) {
process.stdout.write(` ${label}: ❌ not installed\n`);
} else if (version === cliVersion()) {
process.stdout.write(` ${label}: ✅ v${version}\n`);
} else {
process.stdout.write(
` ${label}: ⚠️ v${version} → v${cliVersion()} available (run \`nerve agent update\`)\n`,
);
}
}
const injectCommand = defineCommand({
meta: {
name: "inject",
description: "Inject nerve skill into an AI agent",
},
args: {
target: {
type: "positional",
description: "Agent target: hermes | cursor",
},
profile: {
type: "string",
description: "Hermes profile name (default: main profile)",
},
path: {
type: "string",
description: "Project directory for Cursor rules (default: cwd); only used with cursor",
},
},
run({ args }) {
const target = args.target;
if (target === "hermes") {
if (args.path != null && args.path !== "") {
process.stderr.write("❌ --path applies only to the cursor target\n");
process.exit(1);
}
injectHermes(args.profile ?? null);
return;
}
if (target === "cursor") {
if (args.profile != null && args.profile !== "") {
process.stderr.write("❌ --profile applies only to the hermes target\n");
process.exit(1);
}
const pathArg = args.path != null && args.path !== "" ? args.path : null;
injectCursor(resolveCursorProjectDir(pathArg));
return;
}
process.stderr.write(`❌ Unknown agent target: ${target}\n`);
process.stderr.write(" Supported targets: hermes, cursor\n");
process.exit(1);
},
});
const updateCommand = defineCommand({
meta: {
name: "update",
description: "Update all injected nerve skills to current CLI version",
},
run() {
let updated = 0;
// Default profile
const defaultDir = getHermesSkillDir(null);
if (existsSync(defaultDir)) {
injectHermes(null);
updated++;
}
// Named profiles
const profilesDir = join(homedir(), ".hermes", "profiles");
if (existsSync(profilesDir)) {
const profiles = readdirSync(profilesDir, { withFileTypes: true })
.filter((d) => d.isDirectory())
.map((d) => d.name);
for (const profile of profiles) {
const dir = getHermesSkillDir(profile);
if (existsSync(dir)) {
injectHermes(profile);
updated++;
}
}
}
if (updated === 0) {
process.stdout.write("ℹ️ No injected skills found. Run `nerve agent inject hermes` first.\n");
}
},
});
const removeCommand = defineCommand({
meta: {
name: "remove",
description: "Remove injected nerve skill from an AI agent",
},
args: {
target: {
type: "positional",
description: "Agent target: hermes | cursor",
},
profile: {
type: "string",
description: "Hermes profile name (default: main profile)",
},
path: {
type: "string",
description: "Project directory for Cursor rules (default: cwd); only used with cursor",
},
},
run({ args }) {
const target = args.target;
if (target === "hermes") {
if (args.path != null && args.path !== "") {
process.stderr.write("❌ --path applies only to the cursor target\n");
process.exit(1);
}
removeHermes(args.profile ?? null);
return;
}
if (target === "cursor") {
if (args.profile != null && args.profile !== "") {
process.stderr.write("❌ --profile applies only to the hermes target\n");
process.exit(1);
}
const pathArg = args.path != null && args.path !== "" ? args.path : null;
removeCursor(resolveCursorProjectDir(pathArg));
return;
}
process.stderr.write(`❌ Unknown agent target: ${target}\n`);
process.stderr.write(" Supported targets: hermes, cursor\n");
process.exit(1);
},
});
const statusCommand = defineCommand({
meta: {
name: "status",
description: "Show injection status of nerve skills across agents",
},
run() {
printStatus();
},
});
export const agentCommand = defineCommand({
meta: {
name: "agent",
description: "Manage nerve skill injection for AI agents",
},
subCommands: {
inject: injectCommand,
update: updateCommand,
remove: removeCommand,
status: statusCommand,
},
});
+27 -58
View File
@@ -20,34 +20,13 @@ export type WorkflowScaffoldFiles = {
indexTs: string;
roleMainIndexTs: string;
roleMainPromptMd: string;
packageJson: string;
};
export function buildWorkflowPackageJson(name: string): string {
return `${JSON.stringify(
{
name: `nerve-workflow-${name}`,
private: true,
type: "module",
scripts: {
build:
"esbuild index.ts --bundle --platform=node --format=esm --outdir=dist --packages=external",
},
devDependencies: {
esbuild: "^0.27.0",
},
},
null,
2,
)}\n`;
}
export function buildWorkflowScaffold(name: string): WorkflowScaffoldFiles {
return {
indexTs: buildWorkflowIndexTs(name),
roleMainIndexTs: buildWorkflowMainRoleIndexTs(name),
roleMainPromptMd: buildWorkflowMainRolePromptMd(name),
packageJson: buildWorkflowPackageJson(name),
};
}
@@ -132,32 +111,14 @@ export const ${exportName} = sqliteTable("${table}", {
`;
}
export function buildSensePackageJson(name: string): string {
return `${JSON.stringify(
{
name: `nerve-sense-${name}`,
private: true,
type: "module",
scripts: {
build:
"esbuild src/index.ts --bundle --platform=node --format=esm --outdir=. --out-extension:.js=.js --packages=external",
},
devDependencies: {
esbuild: "^0.27.0",
"drizzle-orm": "*",
},
},
null,
2,
)}\n`;
}
export function buildSenseIndexTs(senseId: string): string {
const exportName = senseIdToSchemaExportName(senseId);
return `import type { LibSQLDatabase } from "drizzle-orm/libsql";
import { ${exportName} } from "./schema.js";
export { ${exportName} as table } from "./schema.js";
type SenseResult = {
signal: { label: string; ts: number };
workflow: null;
@@ -245,30 +206,39 @@ const createWorkflowCommand = defineCommand({
mkdirSync(workflowDir, { recursive: true });
const scaffold = buildWorkflowScaffold(args.name);
writeFile(join(workflowDir, "package.json"), scaffold.packageJson);
writeFile(join(workflowDir, "index.ts"), scaffold.indexTs);
writeFile(join(workflowDir, "roles", "main", "index.ts"), scaffold.roleMainIndexTs);
writeFile(join(workflowDir, "roles", "main", "prompt.md"), scaffold.roleMainPromptMd);
process.stdout.write("✅ Workflow scaffolded:\n");
process.stdout.write(` ${join(workflowDir, "package.json")}\n`);
process.stdout.write(` ${join(workflowDir, "index.ts")}\n`);
process.stdout.write(` ${join(workflowDir, "roles", "main", "index.ts")}\n`);
process.stdout.write(` ${join(workflowDir, "roles", "main", "prompt.md")}\n`);
process.stdout.write("\nBuilding workspace (workflows + senses)…\n");
try {
await spawnAsync("pnpm", ["run", "build"], nerveRoot);
process.stdout.write(
`✅ Build complete — ${join("dist", "workflows", args.name, "index.js")} ready.\n`,
);
} catch {
process.stdout.write(`⚠️ Build failed. Run manually:\n cd ${nerveRoot} && pnpm run build\n`);
}
process.stdout.write("\n💡 Next steps:\n");
process.stdout.write(
` 1. In ${workflowDir}, run \`npm install\` then \`npm run build\` (bundles to dist/index.js).\n`,
);
process.stdout.write(" 2. Add to nerve.yaml:\n");
process.stdout.write(" 1. Add to nerve.yaml:\n");
process.stdout.write(" workflows:\n");
process.stdout.write(` ${args.name}:\n`);
process.stdout.write(" concurrency: 1\n");
process.stdout.write(" overflow: drop\n");
process.stdout.write(
` 3. Edit ${join(workflowDir, "roles", "main", "index.ts")} (and optional prompt.md).\n`,
` 2. Edit ${join(workflowDir, "roles", "main", "index.ts")} (and optional prompt.md).\n`,
);
process.stdout.write(
` 4. Adjust moderator routing in ${join(workflowDir, "index.ts")} if you add roles.\n`,
` 3. Adjust moderator routing in ${join(workflowDir, "index.ts")} if you add roles.\n`,
);
process.stdout.write(
` 4. After edits, run \`pnpm run build\` from the workspace root (${nerveRoot}); output is dist/workflows/<name>/index.js.\n`,
);
process.stdout.write(" 5. Run `nerve start` to launch the daemon.\n");
},
@@ -309,26 +279,23 @@ const createSenseCommand = defineCommand({
mkdirSync(join(senseDir, "src"), { recursive: true });
mkdirSync(join(senseDir, "migrations"), { recursive: true });
writeFile(join(senseDir, "package.json"), buildSensePackageJson(args.name));
writeFile(join(senseDir, "src", "index.ts"), buildSenseIndexTs(args.name));
writeFile(join(senseDir, "src", "schema.ts"), buildSenseSchemaTs(args.name));
writeFile(join(senseDir, "migrations", "0001_init.sql"), buildSenseMigrationSql(args.name));
process.stdout.write("✅ Sense scaffolded:\n");
process.stdout.write(` ${join(senseDir, "package.json")}\n`);
process.stdout.write(` ${join(senseDir, "src", "index.ts")}\n`);
process.stdout.write(` ${join(senseDir, "src", "schema.ts")}\n`);
process.stdout.write(` ${join(senseDir, "migrations", "0001_init.sql")}\n`);
process.stdout.write("\nInstalling sense dependencies and building…\n");
process.stdout.write("\nBuilding workspace (senses + workflows)…\n");
try {
await spawnAsync("pnpm", ["install", "--no-cache", "--ignore-workspace"], senseDir);
await spawnAsync("pnpm", ["run", "build"], senseDir);
process.stdout.write("✅ Build complete — index.js ready.\n");
} catch {
await spawnAsync("pnpm", ["run", "build"], nerveRoot);
process.stdout.write(
`⚠️ Build failed. Run manually:\n cd ${senseDir} && pnpm install --no-cache --ignore-workspace && pnpm run build\n`,
` Build complete — ${join("dist", "senses", args.name, "index.js")} ready.\n`,
);
} catch {
process.stdout.write(`⚠️ Build failed. Run manually:\n cd ${nerveRoot} && pnpm run build\n`);
}
process.stdout.write("\n💡 Next steps:\n");
@@ -341,7 +308,9 @@ const createSenseCommand = defineCommand({
process.stdout.write(
` 2. Edit ${join(senseDir, "src", "index.ts")} to implement ${args.name}.\n`,
);
process.stdout.write(` 3. Re-run \`pnpm run build\` in ${senseDir} after edits.\n`);
process.stdout.write(
` 3. Re-run \`pnpm run build\` from the workspace root (${nerveRoot}) after edits.\n`,
);
process.stdout.write(" 4. Run `nerve start` to launch the daemon.\n");
},
});
+131 -42
View File
@@ -17,11 +17,6 @@ senses:
interval: 10s
`;
const PNPM_WORKSPACE_YAML = `packages:
- 'workflows/*'
- 'senses/*'
`;
const BIOME_JSON = `{
"$schema": "https://biomejs.dev/schemas/1.9.0/schema.json",
"formatter": {
@@ -54,17 +49,20 @@ const PACKAGE_JSON = `${JSON.stringify(
private: true,
type: "module",
scripts: {
build: "pnpm -r build",
build: "node scripts/build.mjs",
},
dependencies: {
"@uncaged/nerve-core": "latest",
"@uncaged/nerve-daemon": "latest",
"@uncaged/nerve-skills": "latest",
"drizzle-orm": "latest",
zod: "^4.3.6",
},
devDependencies: {
"@biomejs/biome": "latest",
"@types/node": "^22.0.0",
"drizzle-kit": "latest",
esbuild: "^0.27.0",
typescript: "^5.7.0",
},
pnpm: {
onlyBuiltDependencies: ["esbuild"],
@@ -74,6 +72,54 @@ const PACKAGE_JSON = `${JSON.stringify(
2,
)}\n`;
const BUILD_MJS = `import * as esbuild from "esbuild";
import fs from "node:fs";
import path from "node:path";
import { fileURLToPath } from "node:url";
const root = path.join(path.dirname(fileURLToPath(import.meta.url)), "..");
const dist = path.join(root, "dist");
const opts = {
bundle: true,
platform: "node",
format: "esm",
packages: "external",
};
function listDirs(dir) {
if (!fs.existsSync(dir)) return [];
return fs
.readdirSync(dir)
.filter((name) => !name.startsWith(".") && !name.startsWith("_"))
.map((name) => ({ name, full: path.join(dir, name) }))
.filter(({ full }) => fs.statSync(full).isDirectory());
}
async function main() {
// Clean dist/
fs.rmSync(dist, { recursive: true, force: true });
for (const { name, full } of listDirs(path.join(root, "senses"))) {
const entry = path.join(full, "src", "index.ts");
if (!fs.existsSync(entry)) continue;
const outfile = path.join(dist, "senses", name, "index.js");
fs.mkdirSync(path.dirname(outfile), { recursive: true });
await esbuild.build({ ...opts, entryPoints: [entry], outfile });
}
for (const { name, full } of listDirs(path.join(root, "workflows"))) {
const entry = path.join(full, "index.ts");
if (!fs.existsSync(entry)) continue;
const outfile = path.join(dist, "workflows", name, "index.js");
fs.mkdirSync(path.dirname(outfile), { recursive: true });
await esbuild.build({ ...opts, entryPoints: [entry], outfile });
}
}
await main();
`;
const GITIGNORE = `data/
logs/
nerve.pid
@@ -81,33 +127,92 @@ node_modules/
knowledge.db
`;
/** Generated at workspace root so agents can \`cat AGENT.md\` instead of npm skill paths. */
const AGENT_MD = `# Nerve workspace — agent guide
This file is created by \`nerve init\`. Read it before implementing senses or workflows.
## Directory layout
| Path | Purpose |
|------|---------|
| \`nerve.yaml\` | Senses, workflows, intervals, groups |
| \`package.json\` | Single root package — no per-sense/per-workflow packages |
| \`scripts/build.mjs\` | Root esbuild step; output under \`dist/\` |
| \`senses/<name>/src/index.ts\` | Sense \`compute()\` entry |
| \`senses/<name>/src/schema.ts\` | Drizzle SQLite schema (TypeScript) |
| \`senses/<name>/migrations/*.sql\` | SQL migrations (next to \`src/\`, not inside it) |
| \`workflows/<name>/index.ts\` | Default export: \`WorkflowDefinition\` |
| \`workflows/<name>/roles/<role>.ts\` | One TypeScript file per role |
| \`dist/senses/<name>/index.js\` | Bundled sense (after build) |
| \`dist/workflows/<name>/index.js\` | Bundled workflow (after build) |
There is **no** \`package.json\` or \`tsconfig.json\` inside individual senses or workflows.
## Naming
- **Workflows:** verb-first kebab-case (e.g. \`review-pull-request\`, \`deploy-staging\`). Avoid bare nouns like \`notifications\`.
- **Senses:** kebab-case descriptive nouns (e.g. \`cpu-usage\`).
## Workflow roles — four-tuple pattern
Wire each role with \`createRole\` from \`@uncaged/nerve-workflow-utils\`:
1. **Adapter** — \`AgentFn\` (LLM call)
2. **Prompt builder** — \`async (ctx: ThreadContext) => string\`
3. **Meta schema** — Zod object (routing / structured output from the model)
4. **Extractor config** — how JSON meta is parsed from replies
Keep meta small (often one boolean per role). The **moderator** in \`WorkflowDefinition\` routes between role names.
## Build commands
Always run from the **workspace root**:
\`\`\`bash
pnpm run build
# or: npm run build
\`\`\`
Fix errors until this succeeds. New workflows must appear under \`workflows/<name>/\` and be registered in \`nerve.yaml\`; new senses under \`senses/<name>/\` with matching \`nerve.yaml\` entries.
## Coding style (Nerve conventions)
- Use \`type\`, not \`interface\`; prefer \`function\` over classes (except errors / library requirements).
- **Named exports only** — no \`export default\` (exception: \`workflows/<name>/index.ts\` uses default export for the daemon loader).
- Nullable fields: \`T | null\`, not TypeScript optional \`?:\`.
- No dynamic \`import()\` in workspace code (bundling and tooling assume static imports).
- Use \`async\`/\`await\`; use a \`Result\` type for expected failures instead of control-flow try/catch.
## Extra references (optional)
- \`CONVENTIONS.md\` — project-specific overrides at repo root.
- \`.knowledge/*.md\` — deeper docs when working inside the Nerve monorepo.
- \`.cursor/skills/\` — Cursor Agent Skills (\`SKILL.md\` per skill).
`;
const NERVE_SKILLS_MDC = `---
description: >-
Nerve skills package — where bundled Agent Skills live in this workspace and how to use them
Where Agent Skills live in this Nerve workspace and how to use them with Cursor
alwaysApply: true
---
# Nerve skills (\`@uncaged/nerve-skills\`)
# Nerve Agent Skills
This workspace lists **@uncaged/nerve-skills** in \`package.json\`. It ships **Agent Skills** (one directory per skill, each with a \`SKILL.md\`) for Nerve development and related tasks.
**Agent Skills** are directories that contain a \`SKILL.md\` (with YAML frontmatter). Cursor loads them from **Project Skills** paths (for example \`.cursor/skills/\` or your global skills directory).
## After install
## Getting Nerve-oriented skills
Run your package manager in this workspace (e.g. \`pnpm install\`, \`npm install\` — whatever \`nerve init\` used). Then skills are on disk at:
There is no separate npm package for skills in the default workspace. To align with Nerve CLI, daemon, and monorepo conventions:
- \`node_modules/@uncaged/nerve-skills/<skill-id>/SKILL.md\`
Example (current catalog):
- **nerve-dev** — Nerve architecture, CLI, sense/workflow patterns, \`nerve.yaml\`, and conventions: read \`node_modules/@uncaged/nerve-skills/nerve-dev/SKILL.md\`.
1. Copy or symlink skill folders from the **Nerve** repository (e.g. \`packages/skills/*/\`) into \`.cursor/skills/\`, **or**
2. Follow project documentation and \`CLAUDE.md\` / \`.cursor/rules/\` in this repo.
## How to use in an agent
1. For tasks that match a skill’s **description** (in the \`SKILL.md\` frontmatter), open that \`SKILL.md\` and follow its structure and checklists.
2. Prefer the skill as the **source of truth** for Nerve-specific conventions over generic assumptions.
3. If the catalog grows, new skills appear as new sibling directories under \`node_modules/@uncaged/nerve-skills/\`.
Do not commit \`node_modules\`; the dependency is the supported way to get and update skills to match \`@uncaged/nerve-skills\` on npm.
1. When a task matches a skill’s **description** (in \`SKILL.md\` frontmatter), open that file and follow its steps.
2. Prefer those conventions for sense/workflow layout, \`nerve.yaml\`, and tooling over generic guesses.
3. Keep skills versioned with your dotfiles or project; update them when you upgrade Nerve.
`;
const execFileAsync = promisify(execFile);
@@ -124,6 +229,8 @@ export const cpuUsage = sqliteTable("cpu_usage", {
const CPU_INDEX_TS = `import { cpus } from "node:os";
export { cpuUsage as table } from "./schema.js";
type SenseResult = {
signal: { model: string; loadPercent: number; ts: number };
workflow: null;
@@ -154,24 +261,6 @@ export async function compute(): Promise<SenseResult> {
}
`;
const CPU_SENSE_PACKAGE_JSON = `${JSON.stringify(
{
name: "nerve-sense-cpu-usage",
private: true,
type: "module",
scripts: {
build:
"esbuild src/index.ts --bundle --platform=node --format=esm --outdir=. --out-extension:.js=.js --packages=external",
},
devDependencies: {
esbuild: "^0.27.0",
"drizzle-orm": "*",
},
},
null,
2,
)}\n`;
const CPU_MIGRATION_SQL = `CREATE TABLE IF NOT EXISTS cpu_usage (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ts INTEGER NOT NULL,
@@ -334,10 +423,10 @@ async function runInitWorkspace(force: boolean, skipInstall = false): Promise<vo
writeFile(join(nerveRoot, "nerve.yaml"), NERVE_YAML);
writeFile(join(nerveRoot, "package.json"), PACKAGE_JSON);
writeFile(join(nerveRoot, "pnpm-workspace.yaml"), PNPM_WORKSPACE_YAML);
writeFile(join(nerveRoot, "scripts", "build.mjs"), BUILD_MJS);
writeFile(join(nerveRoot, "biome.json"), BIOME_JSON);
writeFile(join(nerveRoot, ".gitignore"), GITIGNORE);
writeFile(join(nerveRoot, "senses", "cpu-usage", "package.json"), CPU_SENSE_PACKAGE_JSON);
writeFile(join(nerveRoot, "AGENT.md"), AGENT_MD);
writeFile(join(nerveRoot, "senses", "cpu-usage", "src", "index.ts"), CPU_INDEX_TS);
writeFile(join(nerveRoot, "senses", "cpu-usage", "src", "schema.ts"), CPU_SCHEMA_TS);
writeFile(
+1 -1
View File
@@ -1,6 +1,6 @@
import { describe, expect, it } from "vitest";
import { parseNerveConfig } from "../parse-nerve-config.js";
import { parseNerveConfig } from "../config.js";
const VALID_CONFIG = `
senses:
@@ -1,6 +1,6 @@
import { describe, expect, it } from "vitest";
import { parseDaemonIpcRequest } from "../daemon-ipc-protocol.js";
import { parseDaemonIpcRequest } from "../daemon.js";
describe("parseDaemonIpcRequest", () => {
it("parses trigger-workflow", () => {
@@ -1,6 +1,6 @@
import { describe, expect, it } from "vitest";
import { parseKnowledgeYaml } from "../knowledge-config.js";
import { parseKnowledgeYaml } from "../config.js";
describe("parseKnowledgeYaml", () => {
it("parses include and exclude glob lists", () => {
@@ -1,6 +1,6 @@
import { describe, expect, it } from "vitest";
import { parseWorkflowTrigger, routeSenseComputeOutput } from "../sense-workflow-directive.js";
import { parseWorkflowTrigger, routeSenseComputeOutput } from "../sense.js";
describe("parseWorkflowTrigger", () => {
it("accepts a valid trigger object", () => {
@@ -1,6 +1,6 @@
import { describe, expect, it } from "vitest";
import { spawnSafe } from "../spawn-safe.js";
import { spawnSafe } from "../util.js";
describe("spawnSafe", () => {
it("passes argv literally without shell interpretation (injection-safe)", async () => {
-5
View File
@@ -1,5 +0,0 @@
/**
* Agent adapter ids referenced by tooling / docs (RFC-003).
* Workflows import adapter packages directly; echo may be used in tests via a small factory.
*/
export const KNOWN_AGENT_ADAPTER_IDS = ["echo", "cursor", "hermes", "codex"] as const;
@@ -21,3 +21,9 @@ export class ExtractError extends Error {
Object.setPrototypeOf(this, new.target.prototype);
}
}
/**
* Agent adapter ids referenced by tooling / docs (RFC-003).
* Workflows import adapter packages directly; echo may be used in tests via a small factory.
*/
export const KNOWN_AGENT_ADAPTER_IDS = ["echo", "cursor", "hermes", "codex"] as const;
+399 -2
View File
@@ -1,5 +1,7 @@
/** Default max rows kept in each sense's `_signals` SQLite table (see `retention` on `SenseConfig`). */
export const DEFAULT_SENSE_SIGNAL_RETENTION = 10_000;
import { parse } from "yaml";
import { type Result, err, isPlainRecord, ok, parseDurationStringToMs } from "./util.js";
import { DEFAULT_ENGINE_MAX_ROUNDS } from "./workflow.js";
export type SenseConfig = {
group: string;
@@ -75,3 +77,398 @@ export type NerveConfig = {
/** Global extract defaults; `null` when the section is omitted. */
extract: ExtractConfig | null;
};
export type KnowledgeConfig = {
include: ReadonlyArray<string>;
exclude: ReadonlyArray<string>;
};
/** Default max rows kept in each sense's `_signals` SQLite table (see `retention` on `SenseConfig`). */
export const DEFAULT_SENSE_SIGNAL_RETENTION = 10_000;
function isValidGroupName(value: string): boolean {
return /^[a-zA-Z0-9_-]+$/.test(value);
}
function parseRetentionField(name: string, field: unknown): Result<number> {
if (field === undefined || field === null) {
return ok(DEFAULT_SENSE_SIGNAL_RETENTION);
}
if (typeof field !== "number" || !Number.isInteger(field) || field < 1) {
return err(new Error(`senses.${name}.retention: must be a positive integer`));
}
return ok(field);
}
function parseDurationField(field: unknown, label: string): Result<number | null> {
if (field === undefined || field === null) return ok(null);
if (typeof field !== "string") {
return err(
new Error(`${label}: invalid duration "${field}" (expected e.g. "5s", "10m", "1h")`),
);
}
const msResult = parseDurationStringToMs(field);
if (!msResult.ok) {
return err(new Error(`${label}: ${msResult.error.message}`));
}
return ok(msResult.value);
}
function validateSenseConfig(name: string, raw: unknown): Result<SenseConfig> {
if (!isPlainRecord(raw)) {
return err(new Error(`senses.${name}: must be an object`));
}
const obj = raw;
if (typeof obj.group !== "string" || obj.group.trim() === "") {
return err(new Error(`senses.${name}.group: required string`));
}
if (!isValidGroupName(obj.group)) {
return err(
new Error(
`senses.${name}.group: invalid name "${obj.group}" (only alphanumeric, underscore, hyphen allowed)`,
),
);
}
const throttleResult = parseDurationField(obj.throttle, `senses.${name}.throttle`);
if (!throttleResult.ok) return throttleResult;
const timeoutResult = parseDurationField(obj.timeout, `senses.${name}.timeout`);
if (!timeoutResult.ok) return timeoutResult;
const graceResult = parseDurationField(obj.grace_period, `senses.${name}.grace_period`);
if (!graceResult.ok) return graceResult;
const retentionResult = parseRetentionField(name, obj.retention);
if (!retentionResult.ok) return retentionResult;
const intervalResult = parseDurationField(obj.interval, `senses.${name}.interval`);
if (!intervalResult.ok) return intervalResult;
let on: string[] = [];
if (obj.on !== undefined && obj.on !== null) {
if (
!Array.isArray(obj.on) ||
!obj.on.every((item: unknown): item is string => typeof item === "string")
) {
return err(new Error(`senses.${name}.on: must be an array of strings`));
}
on = obj.on;
}
return ok({
group: obj.group,
throttle: throttleResult.value,
timeout: timeoutResult.value,
gracePeriod: graceResult.value,
retention: retentionResult.value,
interval: intervalResult.value,
on,
});
}
function parseEngineMaxRounds(obj: Record<string, unknown>): Result<number> {
if (obj.max_rounds === undefined || obj.max_rounds === null) {
return ok(DEFAULT_ENGINE_MAX_ROUNDS);
}
if (
typeof obj.max_rounds !== "number" ||
!Number.isInteger(obj.max_rounds) ||
obj.max_rounds < 1
) {
return err(new Error("max_rounds: must be a positive integer"));
}
return ok(obj.max_rounds);
}
function validateWorkflowConfig(name: string, raw: unknown): Result<WorkflowConfig> {
if (!isPlainRecord(raw)) {
return err(new Error(`workflows.${name}: must be an object`));
}
const obj = raw;
if (
typeof obj.concurrency !== "number" ||
!Number.isInteger(obj.concurrency) ||
obj.concurrency < 1
) {
return err(new Error(`workflows.${name}.concurrency: must be a positive integer`));
}
if (obj.overflow !== "drop" && obj.overflow !== "queue") {
return err(new Error(`workflows.${name}.overflow: must be "drop" or "queue"`));
}
if (obj.overflow === "drop") {
if (obj.max_queue !== undefined && obj.max_queue !== null) {
return err(new Error(`workflows.${name}: max_queue is not allowed with overflow "drop"`));
}
return ok({
concurrency: obj.concurrency,
overflow: "drop" as const,
});
}
// overflow: "queue"
let maxQueue = 100; // default
if (obj.max_queue !== undefined && obj.max_queue !== null) {
if (
typeof obj.max_queue !== "number" ||
!Number.isInteger(obj.max_queue) ||
obj.max_queue < 1
) {
return err(new Error(`workflows.${name}.max_queue: must be a positive integer`));
}
maxQueue = obj.max_queue;
}
return ok({
concurrency: obj.concurrency,
overflow: "queue" as const,
maxQueue,
});
}
function parseSenses(
obj: Record<string, unknown>,
): Result<{ senses: Record<string, SenseConfig> }> {
if (!isPlainRecord(obj.senses)) {
return err(new Error("senses: required object"));
}
const sensesRaw = obj.senses;
const senses: Record<string, SenseConfig> = {};
for (const [name, senseRaw] of Object.entries(sensesRaw)) {
const result = validateSenseConfig(name, senseRaw);
if (!result.ok) return result;
senses[name] = result.value;
}
return ok({ senses });
}
const DEFAULT_API_BIND_HOST = "127.0.0.1";
/** Hosts that may bind the HTTP API without `api.token` (loopback-only). */
function isLoopbackOnlyApiHost(host: string): boolean {
const h = host.trim();
return h === "127.0.0.1" || h.toLowerCase() === "localhost";
}
function parseApiTokenField(api: Record<string, unknown>): Result<string | null> {
if (api.token === undefined || api.token === null) {
return ok(null);
}
if (typeof api.token !== "string") {
return err(new Error("api.token: must be a string when provided"));
}
if (api.token.length === 0) {
return err(new Error("api.token: must not be empty when provided"));
}
return ok(api.token);
}
function parseApiHostField(api: Record<string, unknown>): Result<string> {
if (api.host === undefined || api.host === null) {
return ok(DEFAULT_API_BIND_HOST);
}
if (typeof api.host !== "string") {
return err(new Error("api.host: must be a string when provided"));
}
if (api.host.length === 0) {
return err(new Error("api.host: must not be empty when provided"));
}
return ok(api.host);
}
function parseApiConfig(obj: Record<string, unknown>): Result<NerveApiConfig> {
if (obj.api === undefined || obj.api === null) {
return ok({ port: null, token: null, host: DEFAULT_API_BIND_HOST });
}
if (!isPlainRecord(obj.api)) {
return err(new Error("api: must be an object if provided"));
}
const api = obj.api;
if (api.port === undefined || api.port === null) {
return ok({ port: null, token: null, host: DEFAULT_API_BIND_HOST });
}
if (
typeof api.port !== "number" ||
!Number.isInteger(api.port) ||
api.port < 1 ||
api.port > 65_535
) {
return err(new Error("api.port: must be an integer between 1 and 65535 if provided"));
}
const tokenResult = parseApiTokenField(api);
if (!tokenResult.ok) return tokenResult;
const hostResult = parseApiHostField(api);
if (!hostResult.ok) return hostResult;
if (!isLoopbackOnlyApiHost(hostResult.value) && tokenResult.value === null) {
return err(
new Error("api.host binds to non-loopback address, api.token is required for security"),
);
}
return ok({ port: api.port, token: tokenResult.value, host: hostResult.value });
}
function parseWorkflows(obj: Record<string, unknown>): Result<Record<string, WorkflowConfig>> {
if (obj.workflows === undefined || obj.workflows === null) return ok({});
if (!isPlainRecord(obj.workflows)) {
return err(new Error("workflows: must be an object if provided"));
}
const workflowsRaw = obj.workflows;
const workflows: Record<string, WorkflowConfig> = {};
for (const [name, wfRaw] of Object.entries(workflowsRaw)) {
const result = validateWorkflowConfig(name, wfRaw);
if (!result.ok) return result;
workflows[name] = result.value;
}
return ok(workflows);
}
function parseExtract(obj: Record<string, unknown>): Result<ExtractConfig | null> {
if (obj.extract === undefined || obj.extract === null) {
return ok(null);
}
if (!isPlainRecord(obj.extract)) {
return err(new Error("extract: must be an object if provided"));
}
const ext = obj.extract;
if (typeof ext.provider !== "string" || ext.provider.trim() === "") {
return err(new Error("extract.provider: required non-empty string"));
}
if (typeof ext.model !== "string" || ext.model.trim() === "") {
return err(new Error("extract.model: required non-empty string"));
}
return ok({ provider: ext.provider, model: ext.model });
}
export function parseNerveConfig(raw: string): Result<NerveConfig> {
let parsed: unknown;
try {
parsed = parse(raw);
} catch (e) {
const message = e instanceof Error ? e.message : String(e);
return err(new Error(`YAML parse error: ${message}`));
}
if (!isPlainRecord(parsed)) {
return err(new Error("Config must be a YAML object"));
}
const obj = parsed;
const sensesResult = parseSenses(obj);
if (!sensesResult.ok) return sensesResult;
const { senses } = sensesResult.value;
// Legacy top-level `reflexes` is rejected; each sense carries `interval` / `on` for the sense scheduler.
if (Object.hasOwn(obj, "reflexes")) {
return err(
new Error(
"reflexes: top-level key is no longer supported; set `interval` and `on` on each sense under `senses.<name>`",
),
);
}
const workflowsResult = parseWorkflows(obj);
if (!workflowsResult.ok) return workflowsResult;
const maxRoundsResult = parseEngineMaxRounds(obj);
if (!maxRoundsResult.ok) return maxRoundsResult;
const apiResult = parseApiConfig(obj);
if (!apiResult.ok) return apiResult;
if (Object.hasOwn(obj, "agents")) {
return err(
new Error(
"agents: key is no longer supported — declare adapters on workflow roles (RFC-003)",
),
);
}
const extractResult = parseExtract(obj);
if (!extractResult.ok) return extractResult;
return ok({
maxRounds: maxRoundsResult.value,
senses,
workflows: workflowsResult.value,
api: apiResult.value,
extract: extractResult.value,
});
}
function parseStringList(field: unknown, label: string): Result<ReadonlyArray<string>> {
if (field === undefined || field === null) {
return ok([]);
}
if (!Array.isArray(field)) {
return err(new Error(`${label}: must be an array of strings`));
}
const out: string[] = [];
for (let i = 0; i < field.length; i++) {
const item = field[i];
if (typeof item !== "string" || item.length === 0) {
return err(new Error(`${label}[${String(i)}]: must be a non-empty string`));
}
out.push(item);
}
return ok(out);
}
/**
* Parse `knowledge.yaml` at the repo root (RFC-003 Knowledge Layer).
* `include` / `exclude` entries are glob patterns resolved against the repo root.
*/
export function parseKnowledgeYaml(raw: string): Result<KnowledgeConfig> {
let parsed: unknown;
try {
parsed = parse(raw);
} catch (e) {
const message = e instanceof Error ? e.message : String(e);
return err(new Error(`YAML parse error: ${message}`));
}
if (parsed === undefined || parsed === null) {
return ok({ include: [], exclude: [] });
}
if (!isPlainRecord(parsed)) {
return err(new Error("knowledge.yaml: root must be a mapping"));
}
const includeResult = parseStringList(parsed.include, "include");
if (!includeResult.ok) {
return includeResult;
}
const excludeResult = parseStringList(parsed.exclude, "exclude");
if (!excludeResult.ok) {
return excludeResult;
}
return ok({
include: includeResult.value,
exclude: excludeResult.value,
});
}
@@ -1,33 +0,0 @@
import type { WorkflowStatus } from "./daemon-ipc-protocol.js";
import { isPlainRecord } from "./is-plain-record.js";
import type { SenseInfo } from "./sense.js";
/** Type guard for JSON {@link SenseInfo} payloads from daemon HTTP/IPC. */
export function isSenseInfo(value: unknown): value is SenseInfo {
if (!isPlainRecord(value)) return false;
return (
typeof value.name === "string" &&
typeof value.group === "string" &&
(value.throttle === null || typeof value.throttle === "number") &&
(value.timeout === null || typeof value.timeout === "number") &&
Array.isArray(value.triggers) &&
value.triggers.every((t: unknown) => typeof t === "string") &&
(value.lastSignalTimestamp === null || typeof value.lastSignalTimestamp === "number")
);
}
/** Type guard for JSON {@link WorkflowStatus} payloads from daemon HTTP/IPC. */
export function isWorkflowStatus(value: unknown): value is WorkflowStatus {
if (!isPlainRecord(value)) return false;
const cfg = value.config;
if (!isPlainRecord(cfg)) return false;
return (
typeof value.name === "string" &&
typeof value.activeThreads === "number" &&
Array.isArray(value.activeRunIds) &&
value.activeRunIds.every((id: unknown) => typeof id === "string") &&
typeof value.queuedThreads === "number" &&
typeof cfg.concurrency === "number" &&
typeof cfg.overflow === "string"
);
}
-28
View File
@@ -1,28 +0,0 @@
import type { HealthInfo, WorkflowStatus } from "./daemon-ipc-protocol.js";
import type { SenseInfo } from "./sense.js";
export type DaemonTransportTriggerResult = { ok: true } | { ok: false; error: string };
export type DaemonTransportWorkflowLaunch = {
prompt: string;
maxRounds: number;
dryRun: boolean;
};
/**
* Abstraction over daemon control plane (Unix socket IPC today, HTTP in Phase 2).
* Implementations live in CLI / tools; the daemon kernel uses shared handler logic.
*/
export type DaemonTransport = {
health(): Promise<HealthInfo>;
listSenses(): Promise<SenseInfo[]>;
listWorkflows(): Promise<WorkflowStatus[]>;
triggerSense(name: string): Promise<DaemonTransportTriggerResult>;
/** When `launch` is null, implementations use engine defaults (empty prompt, default max rounds, dryRun false). */
triggerWorkflow(
name: string,
launch: DaemonTransportWorkflowLaunch | null,
): Promise<DaemonTransportTriggerResult>;
/** Kill a running or queued workflow thread by `runId` (same field as IPC `kill-workflow`). */
killWorkflow(runId: string): Promise<DaemonTransportTriggerResult>;
};
@@ -4,8 +4,8 @@
* one response object per line from the daemon.
*/
import { isPlainRecord } from "./is-plain-record.js";
import type { SenseInfo } from "./sense.js";
import { isPlainRecord } from "./util.js";
/** Runtime status of a registered workflow (for listing / observability). */
export type WorkflowStatus = {
@@ -100,6 +100,32 @@ export type DaemonIpcResponse =
| DaemonIpcListWorkflowsResponse
| DaemonIpcHealthResponse;
export type DaemonTransportTriggerResult = { ok: true } | { ok: false; error: string };
export type DaemonTransportWorkflowLaunch = {
prompt: string;
maxRounds: number;
dryRun: boolean;
};
/**
* Abstraction over daemon control plane (Unix socket IPC today, HTTP in Phase 2).
* Implementations live in CLI / tools; the daemon kernel uses shared handler logic.
*/
export type DaemonTransport = {
health(): Promise<HealthInfo>;
listSenses(): Promise<SenseInfo[]>;
listWorkflows(): Promise<WorkflowStatus[]>;
triggerSense(name: string): Promise<DaemonTransportTriggerResult>;
/** When `launch` is null, implementations use engine defaults (empty prompt, default max rounds, dryRun false). */
triggerWorkflow(
name: string,
launch: DaemonTransportWorkflowLaunch | null,
): Promise<DaemonTransportTriggerResult>;
/** Kill a running or queued workflow thread by `runId` (same field as IPC `kill-workflow`). */
killWorkflow(runId: string): Promise<DaemonTransportTriggerResult>;
};
function parseTriggerWorkflowFields(
req: Record<string, unknown>,
): DaemonIpcTriggerWorkflowRequest | null {
@@ -150,3 +176,33 @@ export function parseDaemonIpcRequest(line: string): DaemonIpcRequest | null {
return null;
}
}
/** Type guard for JSON {@link SenseInfo} payloads from daemon HTTP/IPC. */
export function isSenseInfo(value: unknown): value is SenseInfo {
if (!isPlainRecord(value)) return false;
return (
typeof value.name === "string" &&
typeof value.group === "string" &&
(value.throttle === null || typeof value.throttle === "number") &&
(value.timeout === null || typeof value.timeout === "number") &&
Array.isArray(value.triggers) &&
value.triggers.every((t: unknown) => typeof t === "string") &&
(value.lastSignalTimestamp === null || typeof value.lastSignalTimestamp === "number")
);
}
/** Type guard for JSON {@link WorkflowStatus} payloads from daemon HTTP/IPC. */
export function isWorkflowStatus(value: unknown): value is WorkflowStatus {
if (!isPlainRecord(value)) return false;
const cfg = value.config;
if (!isPlainRecord(cfg)) return false;
return (
typeof value.name === "string" &&
typeof value.activeThreads === "number" &&
Array.isArray(value.activeRunIds) &&
value.activeRunIds.every((id: unknown) => typeof id === "string") &&
typeof value.queuedThreads === "number" &&
typeof cfg.concurrency === "number" &&
typeof cfg.overflow === "string"
);
}
-22
View File
@@ -1,22 +0,0 @@
import type { Result } from "./result.js";
import { err, ok } from "./result.js";
const DURATION_RE = /^(\d+)([smh])$/;
const DURATION_MULTIPLIERS: Record<string, number> = {
s: 1_000,
m: 60_000,
h: 3_600_000,
};
/**
* Parse a duration string such as `5s`, `10m`, `1h` to milliseconds.
* Used by `parseNerveConfig` sense/workflow duration fields.
*/
export function parseDurationStringToMs(value: string): Result<number> {
const match = DURATION_RE.exec(value);
if (!match) {
return err(new Error(`invalid duration "${value}" (expected e.g. "5s", "10m", "1h")`));
}
return ok(Number(match[1]) * DURATION_MULTIPLIERS[match[2]]);
}
+19 -19
View File
@@ -12,8 +12,8 @@ export type {
ComputeResult,
} from "./config.js";
export type { Signal, SenseInfo } from "./sense.js";
export type { SenseComputeFn, SenseModule } from "./sense-contract.js";
export { labelSenseTrigger, senseTriggerLabels } from "./sense-trigger-labels.js";
export type { SenseComputeFn, SenseModule } from "./sense.js";
export { labelSenseTrigger, senseTriggerLabels } from "./sense.js";
export type {
WorkflowMessage,
RoleResult,
@@ -29,11 +29,11 @@ export type {
WorkflowDefinition,
} from "./workflow.js";
export { START, END, DEFAULT_ENGINE_MAX_ROUNDS } from "./workflow.js";
export { parseDurationStringToMs } from "./duration.js";
export type { Schema, ExtractFn } from "./extract-layer.js";
export { ExtractError } from "./extract-layer.js";
export type { Result } from "./result.js";
export { ok, err } from "./result.js";
export { parseDurationStringToMs } from "./util.js";
export type { Schema, ExtractFn } from "./agent.js";
export { ExtractError } from "./agent.js";
export type { Result } from "./util.js";
export { ok, err } from "./util.js";
export {
nerveCommandEnv,
spawnSafe,
@@ -41,17 +41,17 @@ export {
type SpawnError,
type SpawnResult,
type SpawnSafeOptions,
} from "./spawn-safe.js";
export { parseNerveConfig } from "./parse-nerve-config.js";
export type { KnowledgeConfig } from "./knowledge-config.js";
export { parseKnowledgeYaml } from "./knowledge-config.js";
export { isPlainRecord } from "./is-plain-record.js";
export { KNOWN_AGENT_ADAPTER_IDS } from "./agent-adapter-ids.js";
} from "./util.js";
export { parseNerveConfig } from "./config.js";
export type { KnowledgeConfig } from "./config.js";
export { parseKnowledgeYaml } from "./config.js";
export { isPlainRecord } from "./util.js";
export { KNOWN_AGENT_ADAPTER_IDS } from "./agent.js";
export type { RoutedSenseOutput } from "./sense-workflow-directive.js";
export { parseWorkflowTrigger, routeSenseComputeOutput } from "./sense-workflow-directive.js";
export type { RoutedSenseOutput } from "./sense.js";
export { parseWorkflowTrigger, routeSenseComputeOutput } from "./sense.js";
export { isSenseInfo, isWorkflowStatus } from "./daemon-payload-guards.js";
export { isSenseInfo, isWorkflowStatus } from "./daemon.js";
export type {
WorkflowStatus,
HealthInfo,
@@ -69,10 +69,10 @@ export type {
DaemonIpcListWorkflowsResponse,
DaemonIpcHealthResponse,
DaemonIpcResponse,
} from "./daemon-ipc-protocol.js";
export { parseDaemonIpcRequest } from "./daemon-ipc-protocol.js";
} from "./daemon.js";
export { parseDaemonIpcRequest } from "./daemon.js";
export type {
DaemonTransport,
DaemonTransportTriggerResult,
DaemonTransportWorkflowLaunch,
} from "./daemon-transport.js";
} from "./daemon.js";
-7
View File
@@ -1,7 +0,0 @@
/**
* Narrows `unknown` to a plain JSON-style object (not null, not array).
* Use after `JSON.parse` / YAML / IPC when validating structure field-by-field.
*/
export function isPlainRecord(value: unknown): value is Record<string, unknown> {
return typeof value === "object" && value !== null && !Array.isArray(value);
}
-64
View File
@@ -1,64 +0,0 @@
import { parse } from "yaml";
import { isPlainRecord } from "./is-plain-record.js";
import type { Result } from "./result.js";
import { err, ok } from "./result.js";
export type KnowledgeConfig = {
include: ReadonlyArray<string>;
exclude: ReadonlyArray<string>;
};
function parseStringList(field: unknown, label: string): Result<ReadonlyArray<string>> {
if (field === undefined || field === null) {
return ok([]);
}
if (!Array.isArray(field)) {
return err(new Error(`${label}: must be an array of strings`));
}
const out: string[] = [];
for (let i = 0; i < field.length; i++) {
const item = field[i];
if (typeof item !== "string" || item.length === 0) {
return err(new Error(`${label}[${String(i)}]: must be a non-empty string`));
}
out.push(item);
}
return ok(out);
}
/**
* Parse `knowledge.yaml` at the repo root (RFC-003 Knowledge Layer).
* `include` / `exclude` entries are glob patterns resolved against the repo root.
*/
export function parseKnowledgeYaml(raw: string): Result<KnowledgeConfig> {
let parsed: unknown;
try {
parsed = parse(raw);
} catch (e) {
const message = e instanceof Error ? e.message : String(e);
return err(new Error(`YAML parse error: ${message}`));
}
if (parsed === undefined || parsed === null) {
return ok({ include: [], exclude: [] });
}
if (!isPlainRecord(parsed)) {
return err(new Error("knowledge.yaml: root must be a mapping"));
}
const includeResult = parseStringList(parsed.include, "include");
if (!includeResult.ok) {
return includeResult;
}
const excludeResult = parseStringList(parsed.exclude, "exclude");
if (!excludeResult.ok) {
return excludeResult;
}
return ok({
include: includeResult.value,
exclude: excludeResult.value,
});
}
-348
View File
@@ -1,348 +0,0 @@
import { parse } from "yaml";
import {
DEFAULT_SENSE_SIGNAL_RETENTION,
type ExtractConfig,
type NerveApiConfig,
type NerveConfig,
type SenseConfig,
type WorkflowConfig,
} from "./config.js";
import { parseDurationStringToMs } from "./duration.js";
import { isPlainRecord } from "./is-plain-record.js";
import type { Result } from "./result.js";
import { err, ok } from "./result.js";
import { DEFAULT_ENGINE_MAX_ROUNDS } from "./workflow.js";
function isValidGroupName(value: string): boolean {
return /^[a-zA-Z0-9_-]+$/.test(value);
}
function parseRetentionField(name: string, field: unknown): Result<number> {
if (field === undefined || field === null) {
return ok(DEFAULT_SENSE_SIGNAL_RETENTION);
}
if (typeof field !== "number" || !Number.isInteger(field) || field < 1) {
return err(new Error(`senses.${name}.retention: must be a positive integer`));
}
return ok(field);
}
function parseDurationField(field: unknown, label: string): Result<number | null> {
if (field === undefined || field === null) return ok(null);
if (typeof field !== "string") {
return err(
new Error(`${label}: invalid duration "${field}" (expected e.g. "5s", "10m", "1h")`),
);
}
const msResult = parseDurationStringToMs(field);
if (!msResult.ok) {
return err(new Error(`${label}: ${msResult.error.message}`));
}
return ok(msResult.value);
}
function validateSenseConfig(name: string, raw: unknown): Result<SenseConfig> {
if (!isPlainRecord(raw)) {
return err(new Error(`senses.${name}: must be an object`));
}
const obj = raw;
if (typeof obj.group !== "string" || obj.group.trim() === "") {
return err(new Error(`senses.${name}.group: required string`));
}
if (!isValidGroupName(obj.group)) {
return err(
new Error(
`senses.${name}.group: invalid name "${obj.group}" (only alphanumeric, underscore, hyphen allowed)`,
),
);
}
const throttleResult = parseDurationField(obj.throttle, `senses.${name}.throttle`);
if (!throttleResult.ok) return throttleResult;
const timeoutResult = parseDurationField(obj.timeout, `senses.${name}.timeout`);
if (!timeoutResult.ok) return timeoutResult;
const graceResult = parseDurationField(obj.grace_period, `senses.${name}.grace_period`);
if (!graceResult.ok) return graceResult;
const retentionResult = parseRetentionField(name, obj.retention);
if (!retentionResult.ok) return retentionResult;
const intervalResult = parseDurationField(obj.interval, `senses.${name}.interval`);
if (!intervalResult.ok) return intervalResult;
let on: string[] = [];
if (obj.on !== undefined && obj.on !== null) {
if (
!Array.isArray(obj.on) ||
!obj.on.every((item: unknown): item is string => typeof item === "string")
) {
return err(new Error(`senses.${name}.on: must be an array of strings`));
}
on = obj.on;
}
return ok({
group: obj.group,
throttle: throttleResult.value,
timeout: timeoutResult.value,
gracePeriod: graceResult.value,
retention: retentionResult.value,
interval: intervalResult.value,
on,
});
}
function parseEngineMaxRounds(obj: Record<string, unknown>): Result<number> {
if (obj.max_rounds === undefined || obj.max_rounds === null) {
return ok(DEFAULT_ENGINE_MAX_ROUNDS);
}
if (
typeof obj.max_rounds !== "number" ||
!Number.isInteger(obj.max_rounds) ||
obj.max_rounds < 1
) {
return err(new Error("max_rounds: must be a positive integer"));
}
return ok(obj.max_rounds);
}
function validateWorkflowConfig(name: string, raw: unknown): Result<WorkflowConfig> {
if (!isPlainRecord(raw)) {
return err(new Error(`workflows.${name}: must be an object`));
}
const obj = raw;
if (
typeof obj.concurrency !== "number" ||
!Number.isInteger(obj.concurrency) ||
obj.concurrency < 1
) {
return err(new Error(`workflows.${name}.concurrency: must be a positive integer`));
}
if (obj.overflow !== "drop" && obj.overflow !== "queue") {
return err(new Error(`workflows.${name}.overflow: must be "drop" or "queue"`));
}
if (obj.overflow === "drop") {
if (obj.max_queue !== undefined && obj.max_queue !== null) {
return err(new Error(`workflows.${name}: max_queue is not allowed with overflow "drop"`));
}
return ok({
concurrency: obj.concurrency,
overflow: "drop" as const,
});
}
// overflow: "queue"
let maxQueue = 100; // default
if (obj.max_queue !== undefined && obj.max_queue !== null) {
if (
typeof obj.max_queue !== "number" ||
!Number.isInteger(obj.max_queue) ||
obj.max_queue < 1
) {
return err(new Error(`workflows.${name}.max_queue: must be a positive integer`));
}
maxQueue = obj.max_queue;
}
return ok({
concurrency: obj.concurrency,
overflow: "queue" as const,
maxQueue,
});
}
function parseSenses(
obj: Record<string, unknown>,
): Result<{ senses: Record<string, SenseConfig> }> {
if (!isPlainRecord(obj.senses)) {
return err(new Error("senses: required object"));
}
const sensesRaw = obj.senses;
const senses: Record<string, SenseConfig> = {};
for (const [name, senseRaw] of Object.entries(sensesRaw)) {
const result = validateSenseConfig(name, senseRaw);
if (!result.ok) return result;
senses[name] = result.value;
}
return ok({ senses });
}
const DEFAULT_API_BIND_HOST = "127.0.0.1";
/** Hosts that may bind the HTTP API without `api.token` (loopback-only). */
function isLoopbackOnlyApiHost(host: string): boolean {
const h = host.trim();
return h === "127.0.0.1" || h.toLowerCase() === "localhost";
}
function parseApiTokenField(api: Record<string, unknown>): Result<string | null> {
if (api.token === undefined || api.token === null) {
return ok(null);
}
if (typeof api.token !== "string") {
return err(new Error("api.token: must be a string when provided"));
}
if (api.token.length === 0) {
return err(new Error("api.token: must not be empty when provided"));
}
return ok(api.token);
}
function parseApiHostField(api: Record<string, unknown>): Result<string> {
if (api.host === undefined || api.host === null) {
return ok(DEFAULT_API_BIND_HOST);
}
if (typeof api.host !== "string") {
return err(new Error("api.host: must be a string when provided"));
}
if (api.host.length === 0) {
return err(new Error("api.host: must not be empty when provided"));
}
return ok(api.host);
}
function parseApiConfig(obj: Record<string, unknown>): Result<NerveApiConfig> {
if (obj.api === undefined || obj.api === null) {
return ok({ port: null, token: null, host: DEFAULT_API_BIND_HOST });
}
if (!isPlainRecord(obj.api)) {
return err(new Error("api: must be an object if provided"));
}
const api = obj.api;
if (api.port === undefined || api.port === null) {
return ok({ port: null, token: null, host: DEFAULT_API_BIND_HOST });
}
if (
typeof api.port !== "number" ||
!Number.isInteger(api.port) ||
api.port < 1 ||
api.port > 65_535
) {
return err(new Error("api.port: must be an integer between 1 and 65535 if provided"));
}
const tokenResult = parseApiTokenField(api);
if (!tokenResult.ok) return tokenResult;
const hostResult = parseApiHostField(api);
if (!hostResult.ok) return hostResult;
if (!isLoopbackOnlyApiHost(hostResult.value) && tokenResult.value === null) {
return err(
new Error("api.host binds to non-loopback address, api.token is required for security"),
);
}
return ok({ port: api.port, token: tokenResult.value, host: hostResult.value });
}
function parseWorkflows(obj: Record<string, unknown>): Result<Record<string, WorkflowConfig>> {
if (obj.workflows === undefined || obj.workflows === null) return ok({});
if (!isPlainRecord(obj.workflows)) {
return err(new Error("workflows: must be an object if provided"));
}
const workflowsRaw = obj.workflows;
const workflows: Record<string, WorkflowConfig> = {};
for (const [name, wfRaw] of Object.entries(workflowsRaw)) {
const result = validateWorkflowConfig(name, wfRaw);
if (!result.ok) return result;
workflows[name] = result.value;
}
return ok(workflows);
}
function parseExtract(obj: Record<string, unknown>): Result<ExtractConfig | null> {
if (obj.extract === undefined || obj.extract === null) {
return ok(null);
}
if (!isPlainRecord(obj.extract)) {
return err(new Error("extract: must be an object if provided"));
}
const ext = obj.extract;
if (typeof ext.provider !== "string" || ext.provider.trim() === "") {
return err(new Error("extract.provider: required non-empty string"));
}
if (typeof ext.model !== "string" || ext.model.trim() === "") {
return err(new Error("extract.model: required non-empty string"));
}
return ok({ provider: ext.provider, model: ext.model });
}
export function parseNerveConfig(raw: string): Result<NerveConfig> {
let parsed: unknown;
try {
parsed = parse(raw);
} catch (e) {
const message = e instanceof Error ? e.message : String(e);
return err(new Error(`YAML parse error: ${message}`));
}
if (!isPlainRecord(parsed)) {
return err(new Error("Config must be a YAML object"));
}
const obj = parsed;
const sensesResult = parseSenses(obj);
if (!sensesResult.ok) return sensesResult;
const { senses } = sensesResult.value;
// Legacy top-level `reflexes` is rejected; each sense carries `interval` / `on` for the sense scheduler.
if (Object.hasOwn(obj, "reflexes")) {
return err(
new Error(
"reflexes: top-level key is no longer supported; set `interval` and `on` on each sense under `senses.<name>`",
),
);
}
const workflowsResult = parseWorkflows(obj);
if (!workflowsResult.ok) return workflowsResult;
const maxRoundsResult = parseEngineMaxRounds(obj);
if (!maxRoundsResult.ok) return maxRoundsResult;
const apiResult = parseApiConfig(obj);
if (!apiResult.ok) return apiResult;
if (Object.hasOwn(obj, "agents")) {
return err(
new Error(
"agents: key is no longer supported — declare adapters on workflow roles (RFC-003)",
),
);
}
const extractResult = parseExtract(obj);
if (!extractResult.ok) return extractResult;
return ok({
maxRounds: maxRoundsResult.value,
senses,
workflows: workflowsResult.value,
api: apiResult.value,
extract: extractResult.value,
});
}
-9
View File
@@ -1,9 +0,0 @@
export type Result<T, E = Error> = { ok: true; value: T } | { ok: false; error: E };
export function ok<T>(value: T): Result<T, never> {
return { ok: true, value };
}
export function err<E = Error>(error: E): Result<never, E> {
return { ok: false, error };
}
-22
View File
@@ -1,22 +0,0 @@
import type { SQLiteTable } from "drizzle-orm/sqlite-core";
import type { ComputeResult } from "./config.js";
/**
* The function signature every sense `src/index.ts` must export as a named
* `compute` export.
*
* Pure: no DB, no peers.
* Return `null` to stay silent, or `{ signal, workflow }` to emit a Signal
* (and optionally trigger a Workflow).
* The runtime handles persistence via `db.insert(table).values(result.signal)`.
*/
export type SenseComputeFn<T = unknown> = () => Promise<ComputeResult<T>>;
/**
* The full shape a sense module (`src/index.ts`) must export.
* `compute` provides the data; `table` tells the runtime where to persist it.
*/
export type SenseModule<T = unknown> = {
compute: SenseComputeFn<T>;
table: SQLiteTable;
};
-40
View File
@@ -1,40 +0,0 @@
import type { SenseConfig } from "./config.js";
function formatIntervalMs(ms: number): string {
const totalSeconds = Math.floor(ms / 1000);
if (totalSeconds < 60) return `${totalSeconds}s`;
const minutes = Math.floor(totalSeconds / 60);
if (minutes < 60) return `${minutes}m`;
const hours = Math.floor(minutes / 60);
const remainingMinutes = minutes % 60;
return `${hours}h ${remainingMinutes}m`;
}
/** Human-readable label for a sense schedule (`interval` and/or `on`). */
export function labelSenseTrigger(slice: Pick<SenseConfig, "interval" | "on">): string {
const parts: string[] = [];
if (slice.interval !== null) {
parts.push(`every ${formatIntervalMs(slice.interval)}`);
}
if (slice.on.length > 0) {
parts.push(`on: ${slice.on.join(", ")}`);
}
if (parts.length === 0) {
return "trigger (no interval or on)";
}
return parts.join(" · ");
}
/**
* Human-readable trigger labels for a sense from its `SenseConfig.interval` / `.on`.
* Returns an empty array when the sense is missing or has no schedule.
*/
export function senseTriggerLabels(
senseName: string,
senses: Record<string, SenseConfig>,
): string[] {
const sc = senses[senseName];
if (sc === undefined) return [];
if (sc.interval === null && sc.on.length === 0) return [];
return [labelSenseTrigger({ interval: sc.interval, on: sc.on })];
}
@@ -1,56 +0,0 @@
import type { WorkflowTrigger } from "./config.js";
import { isPlainRecord } from "./is-plain-record.js";
import type { Result } from "./result.js";
import { err, ok } from "./result.js";
/** Normalized non-null compute output for the kernel (unknown signal payload). */
export type RoutedSenseOutput = {
signal: unknown;
workflow: WorkflowTrigger | null;
};
/**
* Validates a structured workflow trigger object from Sense compute or IPC.
*/
export function parseWorkflowTrigger(value: unknown): Result<WorkflowTrigger> {
if (!isPlainRecord(value)) {
return err(new Error("workflow trigger must be a plain object"));
}
const nameRaw = value.name;
if (typeof nameRaw !== "string" || nameRaw.trim().length === 0) {
return err(new Error('workflow trigger: "name" must be a non-empty string'));
}
const maxRounds = value.maxRounds;
if (typeof maxRounds !== "number" || !Number.isInteger(maxRounds) || maxRounds < 1) {
return err(new Error('workflow trigger: "maxRounds" must be an integer >= 1'));
}
const prompt = value.prompt;
if (typeof prompt !== "string") {
return err(new Error('workflow trigger: "prompt" must be a string'));
}
const dryRun = value.dryRun;
if (typeof dryRun !== "boolean") {
return err(new Error('workflow trigger: "dryRun" must be a boolean'));
}
return ok({ name: nameRaw.trim(), maxRounds, prompt, dryRun });
}
/**
* Interprets a Sense compute non-null return value for the engine.
* - Explicit `{ signal, workflow }` (workflow may be null): validates `workflow` when non-null.
* - Any other value: treated as `{ signal: payload, workflow: null }` (shorthand).
*/
export function routeSenseComputeOutput(payload: unknown): Result<RoutedSenseOutput> {
if (isPlainRecord(payload) && Object.hasOwn(payload, "signal")) {
const wfRaw = Object.hasOwn(payload, "workflow") ? payload.workflow : null;
if (wfRaw === null) {
return ok({ signal: payload.signal, workflow: null });
}
const parsed = parseWorkflowTrigger(wfRaw);
if (!parsed.ok) {
return ok({ signal: payload.signal, workflow: null });
}
return ok({ signal: payload.signal, workflow: parsed.value });
}
return ok({ signal: payload, workflow: null });
}
+116
View File
@@ -1,3 +1,8 @@
import type { SQLiteTable } from "drizzle-orm/sqlite-core";
import type { ComputeResult, SenseConfig, WorkflowTrigger } from "./config.js";
import { type Result, err, isPlainRecord, ok } from "./util.js";
export type Signal = {
id: number;
senseId: string;
@@ -15,3 +20,114 @@ export type SenseInfo = {
triggers: string[];
lastSignalTimestamp: number | null;
};
/**
* The function signature every sense `src/index.ts` must export as a named
* `compute` export.
*
* Pure: no DB, no peers.
* Return `null` to stay silent, or `{ signal, workflow }` to emit a Signal
* (and optionally trigger a Workflow).
* The runtime handles persistence via `db.insert(table).values(result.signal)`.
*/
export type SenseComputeFn<T = unknown> = () => Promise<ComputeResult<T>>;
/**
* The full shape a sense module (`src/index.ts`) must export.
* `compute` provides the data; `table` tells the runtime where to persist it.
*/
export type SenseModule<T = unknown> = {
compute: SenseComputeFn<T>;
table: SQLiteTable;
};
/** Normalized non-null compute output for the kernel (unknown signal payload). */
export type RoutedSenseOutput = {
signal: unknown;
workflow: WorkflowTrigger | null;
};
function formatIntervalMs(ms: number): string {
const totalSeconds = Math.floor(ms / 1000);
if (totalSeconds < 60) return `${totalSeconds}s`;
const minutes = Math.floor(totalSeconds / 60);
if (minutes < 60) return `${minutes}m`;
const hours = Math.floor(minutes / 60);
const remainingMinutes = minutes % 60;
return `${hours}h ${remainingMinutes}m`;
}
/** Human-readable label for a sense schedule (`interval` and/or `on`). */
export function labelSenseTrigger(slice: Pick<SenseConfig, "interval" | "on">): string {
const parts: string[] = [];
if (slice.interval !== null) {
parts.push(`every ${formatIntervalMs(slice.interval)}`);
}
if (slice.on.length > 0) {
parts.push(`on: ${slice.on.join(", ")}`);
}
if (parts.length === 0) {
return "trigger (no interval or on)";
}
return parts.join(" · ");
}
/**
* Human-readable trigger labels for a sense from its `SenseConfig.interval` / `.on`.
* Returns an empty array when the sense is missing or has no schedule.
*/
export function senseTriggerLabels(
senseName: string,
senses: Record<string, SenseConfig>,
): string[] {
const sc = senses[senseName];
if (sc === undefined) return [];
if (sc.interval === null && sc.on.length === 0) return [];
return [labelSenseTrigger({ interval: sc.interval, on: sc.on })];
}
/**
* Validates a structured workflow trigger object from Sense compute or IPC.
*/
export function parseWorkflowTrigger(value: unknown): Result<WorkflowTrigger> {
if (!isPlainRecord(value)) {
return err(new Error("workflow trigger must be a plain object"));
}
const nameRaw = value.name;
if (typeof nameRaw !== "string" || nameRaw.trim().length === 0) {
return err(new Error('workflow trigger: "name" must be a non-empty string'));
}
const maxRounds = value.maxRounds;
if (typeof maxRounds !== "number" || !Number.isInteger(maxRounds) || maxRounds < 1) {
return err(new Error('workflow trigger: "maxRounds" must be an integer >= 1'));
}
const prompt = value.prompt;
if (typeof prompt !== "string") {
return err(new Error('workflow trigger: "prompt" must be a string'));
}
const dryRun = value.dryRun;
if (typeof dryRun !== "boolean") {
return err(new Error('workflow trigger: "dryRun" must be a boolean'));
}
return ok({ name: nameRaw.trim(), maxRounds, prompt, dryRun });
}
/**
* Interprets a Sense compute non-null return value for the engine.
* - Explicit `{ signal, workflow }` (workflow may be null): validates `workflow` when non-null.
* - Any other value: treated as `{ signal: payload, workflow: null }` (shorthand).
*/
export function routeSenseComputeOutput(payload: unknown): Result<RoutedSenseOutput> {
if (isPlainRecord(payload) && Object.hasOwn(payload, "signal")) {
const wfRaw = Object.hasOwn(payload, "workflow") ? payload.workflow : null;
if (wfRaw === null) {
return ok({ signal: payload.signal, workflow: null });
}
const parsed = parseWorkflowTrigger(wfRaw);
if (!parsed.ok) {
return ok({ signal: payload.signal, workflow: null });
}
return ok({ signal: payload.signal, workflow: parsed.value });
}
return ok({ signal: payload, workflow: null });
}
@@ -1,7 +1,8 @@
import { spawn } from "node:child_process";
import { homedir } from "node:os";
import { join } from "node:path";
import { type Result, err, ok } from "./result.js";
export type Result<T, E = Error> = { ok: true; value: T } | { ok: false; error: E };
/** Compatible with `process.env` for `child_process.spawn`. */
export type SpawnEnv = Record<string, string | undefined>;
@@ -40,6 +41,42 @@ type SpawnSafeOptionsInput = SpawnSafeOptions | Omit<SpawnSafeOptions, "dryRun">
const DEFAULT_TIMEOUT_MS = 300_000;
export function ok<T>(value: T): Result<T, never> {
return { ok: true, value };
}
export function err<E = Error>(error: E): Result<never, E> {
return { ok: false, error };
}
/**
* Narrows `unknown` to a plain JSON-style object (not null, not array).
* Use after `JSON.parse` / YAML / IPC when validating structure field-by-field.
*/
export function isPlainRecord(value: unknown): value is Record<string, unknown> {
return typeof value === "object" && value !== null && !Array.isArray(value);
}
const DURATION_RE = /^(\d+)([smh])$/;
const DURATION_MULTIPLIERS: Record<string, number> = {
s: 1_000,
m: 60_000,
h: 3_600_000,
};
/**
* Parse a duration string such as `5s`, `10m`, `1h` to milliseconds.
* Used by `parseNerveConfig` sense/workflow duration fields.
*/
export function parseDurationStringToMs(value: string): Result<number> {
const match = DURATION_RE.exec(value);
if (!match) {
return err(new Error(`invalid duration "${value}" (expected e.g. "5s", "10m", "1h")`));
}
return ok(Number(match[1]) * DURATION_MULTIPLIERS[match[2]]);
}
/**
* PATH and PNPM_HOME for running `pnpm` and `nerve` from workflow roles.
* Uses the pnpm store home only (no npm user bin); binaries must resolve via PATH.
@@ -28,6 +28,11 @@ function makeMockChild(pid = 1): MockChild {
child.connected = true;
child.exitCode = null;
child.pid = pid;
setImmediate(() => {
if (child.connected) {
child.emit("message", { type: "ready" });
}
});
child.send = vi.fn((msg: unknown) => {
if (
msg !== null &&
@@ -132,6 +137,7 @@ describe("WorkflowManager — crash recovery (Phase 3)", () => {
mgr.startWorkflow("my-wf", { prompt: "test 1", maxRounds: 10, dryRun: false });
mgr.startWorkflow("my-wf", { prompt: "test 2", maxRounds: 10, dryRun: false });
await vi.runAllTimersAsync();
expect(mgr.activeCount("my-wf")).toBe(2);
// Simulate unexpected exit (not shutdown)
@@ -159,6 +165,7 @@ describe("WorkflowManager — crash recovery (Phase 3)", () => {
mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false });
mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false });
await vi.runAllTimersAsync();
expect(mgr.activeCount("my-wf")).toBe(2);
const child = mockChildren[0];
@@ -183,6 +190,7 @@ describe("WorkflowManager — crash recovery (Phase 3)", () => {
const mgr = createWorkflowManager("/nerve-root", config, logStore);
mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false });
await vi.runAllTimersAsync();
expect(mockChildren).toHaveLength(1);
const child = mockChildren[0];
@@ -216,6 +224,7 @@ describe("WorkflowManager — crash recovery (Phase 3)", () => {
const mgr = createWorkflowManager("/nerve-root", config, logStore);
mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false });
await vi.runAllTimersAsync();
const firstChild = mockChildren[0];
firstChild.exitCode = 1;
firstChild.connected = false;
@@ -260,6 +269,7 @@ describe("WorkflowManager — crash recovery (Phase 3)", () => {
// Start one thread to fill the concurrency slot (so queued run stays queued on respawn)
mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false });
await vi.runAllTimersAsync();
const firstChild = mockChildren[0];
firstChild.exitCode = 1;
firstChild.connected = false;
@@ -285,6 +295,7 @@ describe("WorkflowManager — crash recovery (Phase 3)", () => {
const mgr = createWorkflowManager("/nerve-root", config, logStore);
mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false });
await vi.runAllTimersAsync();
const child = mockChildren[0];
const startCall = (child.send as ReturnType<typeof vi.fn>).mock.calls[0];
@@ -322,6 +333,7 @@ describe("WorkflowManager — crash recovery (Phase 3)", () => {
const launch = { prompt: "build-docker for myrepo", maxRounds: 10, dryRun: false };
mgr.startWorkflow("my-wf", launch);
await vi.runAllTimersAsync();
const startedCall = logStore.upsertWorkflowRun.mock.calls.find(
(args: any[]) => (args[0] as { type: string }).type === "started",
@@ -357,6 +369,7 @@ describe("WorkflowManager — crash recovery (Phase 3)", () => {
// Start one thread to fill the concurrency slot
mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false });
await vi.runAllTimersAsync();
const firstChild = mockChildren[0];
// Crash once → respawn → crash again → second respawn
@@ -398,6 +411,7 @@ describe("WorkflowManager — crash recovery (Phase 3)", () => {
const mgr = createWorkflowManager("/nerve-root", config, logStore);
mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false });
await vi.runAllTimersAsync();
const firstChild = mockChildren[0];
firstChild.exitCode = 1;
firstChild.connected = false;
@@ -428,6 +442,7 @@ describe("WorkflowManager — crash recovery (Phase 3)", () => {
const mgr = createWorkflowManager("/nerve-root", config, logStore);
mgr.startWorkflow("crash-wf", { prompt: "test", maxRounds: 10, dryRun: false });
await vi.runAllTimersAsync();
// Crash the worker 6 times in rapid succession (within CRASH_WINDOW_MS = 60s)
for (let i = 0; i < 6; i++) {
@@ -0,0 +1,9 @@
// Ready then crashes on a timer; still echoes IPC so parent tests can send after respawn
process.on("message", (msg) => {
if (msg && msg.type === "shutdown") {
process.exit(0);
}
process.send({ type: "echo", payload: msg });
});
process.send({ type: "ready" });
setTimeout(() => process.exit(1), 50);
@@ -0,0 +1,9 @@
// Simple test worker: sends ready, echoes messages, handles shutdown
process.on("message", (msg) => {
if (msg && msg.type === "shutdown") {
process.exit(0);
}
// Echo back with 'echo' type
process.send({ type: "echo", payload: msg });
});
process.send({ type: "ready" });
@@ -0,0 +1,9 @@
// Like echo-worker but writes stderr for tail diagnostics
console.error("stderr-marker");
process.on("message", (msg) => {
if (msg && msg.type === "shutdown") {
process.exit(0);
}
process.send({ type: "echo", payload: msg });
});
process.send({ type: "ready" });
@@ -33,6 +33,11 @@ function makeMockChild(pid = 1): MockChild {
child.connected = true;
child.exitCode = null;
child.pid = pid;
setImmediate(() => {
if (child.connected) {
child.emit("message", { type: "ready" });
}
});
child.send = vi.fn((msg: unknown) => {
if (
msg !== null &&
@@ -114,6 +119,7 @@ describe("WorkflowManager — drainAndRespawn (Phase 3 hot reload)", () => {
const mgr = createWorkflowManager("/nerve-root", config, logStore);
mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false });
await vi.runAllTimersAsync();
expect(mockChildren).toHaveLength(1);
// Remove workflow from config before drain completes
@@ -134,6 +140,7 @@ describe("WorkflowManager — drainAndRespawn (Phase 3 hot reload)", () => {
mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false });
mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false });
await vi.runAllTimersAsync();
expect(mgr.activeCount("my-wf")).toBe(2);
const drainPromise = mgr.drainAndRespawn("my-wf", 5000);
@@ -165,6 +172,7 @@ describe("WorkflowManager — drainAndRespawn (Phase 3 hot reload)", () => {
const mgr = createWorkflowManager("/nerve-root", config, logStore);
mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false });
await vi.runAllTimersAsync();
expect(mockChildren).toHaveLength(1);
const drainPromise = mgr.drainAndRespawn("my-wf", 5000);
@@ -181,6 +189,7 @@ describe("WorkflowManager — drainAndRespawn (Phase 3 hot reload)", () => {
const mgr = createWorkflowManager("/nerve-root", config, logStore);
mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false });
await vi.runAllTimersAsync();
expect(mockChildren).toHaveLength(1);
const drainPromise = mgr.drainAndRespawn("my-wf", 5000);
@@ -198,6 +207,7 @@ describe("WorkflowManager — drainAndRespawn (Phase 3 hot reload)", () => {
const mgr = createWorkflowManager("/nerve-root", config, logStore);
mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false });
await vi.runAllTimersAsync();
const drainPromise = mgr.drainAndRespawn("my-wf", 5000);
await vi.runAllTimersAsync();
@@ -223,6 +233,7 @@ describe("WorkflowManager — drainAndRespawn (Phase 3 hot reload)", () => {
const mgr = createWorkflowManager("/nerve-root", config, logStore);
mgr.startWorkflow("my-wf", { prompt: "first", maxRounds: 10, dryRun: false });
await vi.runAllTimersAsync();
const drainPromise = mgr.drainAndRespawn("my-wf", 5000);
await vi.runAllTimersAsync();
@@ -230,6 +241,7 @@ describe("WorkflowManager — drainAndRespawn (Phase 3 hot reload)", () => {
// Start a new thread on the fresh worker
mgr.startWorkflow("my-wf", { prompt: "second", maxRounds: 10, dryRun: false });
await vi.runAllTimersAsync();
const newChild = mockChildren[1];
const startCalls = (newChild.send as ReturnType<typeof vi.fn>).mock.calls.filter(
@@ -257,12 +269,13 @@ describe("WorkflowManager — drainWhenIdle (hot reload without interrupting in-
vi.clearAllMocks();
});
it("does not send shutdown while a thread is still active", () => {
it("does not send shutdown while a thread is still active", async () => {
const logStore = makeLogStore();
const config = makeWfConfig({ "my-wf": { concurrency: 1, overflow: "drop" } });
const mgr = createWorkflowManager("/nerve-root", config, logStore);
mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false });
await vi.runAllTimersAsync();
const child = mockChildren[0];
mgr.drainWhenIdle("my-wf");
@@ -282,6 +295,7 @@ describe("WorkflowManager — drainWhenIdle (hot reload without interrupting in-
const mgr = createWorkflowManager("/nerve-root", config, logStore);
mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false });
await vi.runAllTimersAsync();
const child = mockChildren[0];
const runId = (child.send as ReturnType<typeof vi.fn>).mock.calls[0][0] as { runId: string };
@@ -311,6 +325,7 @@ describe("WorkflowManager — drainWhenIdle (hot reload without interrupting in-
mgr.startWorkflow("my-wf", { prompt: "a", maxRounds: 10, dryRun: false });
mgr.startWorkflow("my-wf", { prompt: "b", maxRounds: 10, dryRun: false });
await vi.runAllTimersAsync();
const child = mockChildren[0];
const sendMock = child.send as ReturnType<typeof vi.fn>;
const runIdA = (sendMock.mock.calls[0][0] as { runId: string }).runId;
@@ -355,6 +370,7 @@ describe("WorkflowManager — drainWhenIdle (hot reload without interrupting in-
const mgr = createWorkflowManager("/nerve-root", config, logStore);
mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false });
await vi.runAllTimersAsync();
const child = mockChildren[0];
const runId = (child.send as ReturnType<typeof vi.fn>).mock.calls[0][0] as { runId: string };
@@ -388,6 +404,7 @@ describe("WorkflowManager — drainWhenIdle (hot reload without interrupting in-
const mgr = createWorkflowManager("/nerve-root", config, logStore);
mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false });
await vi.runAllTimersAsync();
const child = mockChildren[0];
const runId = (child.send as ReturnType<typeof vi.fn>).mock.calls[0][0] as { runId: string };
@@ -414,6 +431,7 @@ describe("WorkflowManager — drainWhenIdle (hot reload without interrupting in-
const mgr = createWorkflowManager("/nerve-root", config, logStore);
mgr.startWorkflow("my-wf", { prompt: "once", maxRounds: 10, dryRun: false });
await vi.runAllTimersAsync();
const firstChild = mockChildren[0];
const runId = (firstChild.send as ReturnType<typeof vi.fn>).mock.calls[0][0] as {
runId: string;
@@ -471,6 +489,7 @@ describe("Kernel — workflow hot reload via file-watcher (Phase 3)", () => {
// Trigger a workflow thread so a worker is spawned
kernel.workflowManager.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false });
await vi.runAllTimersAsync();
// Manually call drainAndRespawn (simulating what kernel does on workflow file change)
const drainPromise = kernel.workflowManager.drainAndRespawn("my-wf", 1000);
@@ -511,6 +530,7 @@ describe("Kernel — workflow hot reload via file-watcher (Phase 3)", () => {
maxRounds: 10,
dryRun: false,
});
await vi.runAllTimersAsync();
expect(mockChildren).toHaveLength(1);
// Reload config without old-wf
@@ -551,6 +571,7 @@ describe("Kernel — workflow hot reload via file-watcher (Phase 3)", () => {
});
kernel.workflowManager.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false });
await vi.runAllTimersAsync();
const workersBefore = mockChildren.length;
// Reload with updated concurrency — should NOT spawn a new workflow worker
@@ -573,6 +594,7 @@ describe("Kernel — workflow hot reload via file-watcher (Phase 3)", () => {
// Can now start up to 5 concurrent threads (previously only 1)
kernel.workflowManager.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false });
kernel.workflowManager.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false });
await vi.runAllTimersAsync();
expect(kernel.workflowManager.activeCount("my-wf")).toBe(3);
const stopPromise = kernel.stop();
@@ -70,6 +70,14 @@ const { createKernel } = await import("../kernel.js");
// Helpers
// ---------------------------------------------------------------------------
/** Sense worker `fork` runs on the next microtask per scheduled `start`. */
async function flushSenseWorkerForkMicrotasks(kernel: { groups: Set<string> }): Promise<void> {
const n = kernel.groups.size;
for (let i = 0; i < n; i++) {
await Promise.resolve();
}
}
function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
return {
senses: {
@@ -142,6 +150,8 @@ describe("kernel — getHealth", () => {
},
});
const kernel = createKernel(config, nerveRoot);
await flushSenseWorkerForkMicrotasks(kernel);
await vi.runAllTimersAsync();
const health = kernel.getHealth();
expect(health.activeSenses).toBe(3);
@@ -171,6 +181,8 @@ describe("kernel — restartGroup", () => {
it("sends shutdown to old worker and spawns new one", async () => {
const config = makeConfig();
const kernel = createKernel(config, nerveRoot);
await flushSenseWorkerForkMicrotasks(kernel);
await vi.runAllTimersAsync();
expect(mockChildren.length).toBe(1);
const oldChild = mockChildren[0];
@@ -178,6 +190,7 @@ describe("kernel — restartGroup", () => {
const restartPromise = kernel.restartGroup("system");
// The shutdown message triggers exit in the mock
await restartPromise;
await vi.runAllTimersAsync();
// A new child should have been spawned
expect(mockChildren.length).toBe(2);
@@ -191,6 +204,8 @@ describe("kernel — restartGroup", () => {
it("restartGroup on unknown group does nothing", async () => {
const config = makeConfig();
const kernel = createKernel(config, nerveRoot);
await flushSenseWorkerForkMicrotasks(kernel);
await vi.runAllTimersAsync();
expect(mockChildren.length).toBe(1);
await kernel.restartGroup("nonexistent");
@@ -218,6 +233,8 @@ describe("kernel — reloadConfig", () => {
it("adds new group worker when new sense group appears", async () => {
const config = makeConfig();
const kernel = createKernel(config, nerveRoot);
await flushSenseWorkerForkMicrotasks(kernel);
await vi.runAllTimersAsync();
expect(mockChildren.length).toBe(1); // only system group
expect(kernel.groups.has("network")).toBe(false);
@@ -249,6 +266,9 @@ describe("kernel — reloadConfig", () => {
api: { port: null, token: null, host: "127.0.0.1" },
});
await Promise.resolve();
await vi.runAllTimersAsync();
expect(kernel.groups.has("network")).toBe(true);
expect(mockChildren.length).toBe(2); // system + network
@@ -283,6 +303,8 @@ describe("kernel — reloadConfig", () => {
api: { port: null, token: null, host: "127.0.0.1" },
};
const kernel = createKernel(config, nerveRoot);
await flushSenseWorkerForkMicrotasks(kernel);
await vi.runAllTimersAsync();
expect(mockChildren.length).toBe(2);
expect(kernel.groups.has("network")).toBe(true);
@@ -308,6 +330,7 @@ describe("kernel — reloadConfig", () => {
});
expect(kernel.groups.has("network")).toBe(false);
await vi.runAllTimersAsync();
// Network child should have received shutdown
expect(networkChild.send).toHaveBeenCalledWith(expect.objectContaining({ type: "shutdown" }));
@@ -317,6 +340,8 @@ describe("kernel — reloadConfig", () => {
it("health reflects updated sense count after reloadConfig", async () => {
const config = makeConfig();
const kernel = createKernel(config, nerveRoot);
await flushSenseWorkerForkMicrotasks(kernel);
await vi.runAllTimersAsync();
expect(kernel.getHealth().activeSenses).toBe(1);
@@ -29,6 +29,9 @@ type MockChild = EventEmitter & {
function makeMockChild(pid = 1): MockChild {
const child = new EventEmitter() as MockChild;
child.connected = true;
setImmediate(() => {
child.emit("message", { type: "ready" });
});
child.send = vi.fn((msg: unknown) => {
if (
msg !== null &&
@@ -136,6 +139,7 @@ describe("kernel.triggerSense()", () => {
logStore: makeMockLogStore() as never,
});
await vi.runAllTimersAsync();
expect(() => kernel.triggerSense("no-such-sense")).toThrow(/Unknown sense/);
await kernel.stop();
@@ -169,6 +173,7 @@ describe("kernel.triggerSense()", () => {
logStore: makeMockLogStore() as never,
});
await vi.runAllTimersAsync();
// Two groups → two workers
expect(mockChildren.length).toBe(2);
@@ -214,6 +219,7 @@ describe("kernel.triggerSense()", () => {
logStore: makeMockLogStore() as never,
});
await vi.runAllTimersAsync();
// Both senses share the "system" group → one worker only
expect(mockChildren.length).toBe(1);
const worker = mockChildren[0];
@@ -237,6 +243,7 @@ describe("kernel.triggerSense()", () => {
logStore: makeMockLogStore() as never,
});
await new Promise<void>((resolve) => setImmediate(resolve));
const worker = mockChildren[0];
worker.connected = false;
@@ -102,6 +102,13 @@ function makeLogStore() {
};
}
async function flushSenseWorkerForkMicrotasks(kernel: { groups: Set<string> }): Promise<void> {
const n = kernel.groups.size;
for (let i = 0; i < n; i++) {
await Promise.resolve();
}
}
function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
return {
senses: {
@@ -164,6 +171,8 @@ describe("kernel + workflowManager integration", () => {
workerScript: "fake-worker.js",
logStore,
});
await flushSenseWorkerForkMicrotasks(kernel);
await vi.runAllTimersAsync();
// Simulate a sense worker sending a signal with workflow launch payload
// The kernel's handleWorkerMessage processes "signal" type messages
@@ -185,6 +194,8 @@ describe("kernel + workflowManager integration", () => {
});
}
await vi.runAllTimersAsync();
// A workflow worker should be spawned and a start-thread message sent
const workflowWorker = mockChildren.find((c) =>
(c.send as ReturnType<typeof vi.fn>).mock.calls.some(
@@ -222,6 +233,8 @@ describe("kernel + workflowManager integration", () => {
workerScript: "fake-worker.js",
logStore,
});
await flushSenseWorkerForkMicrotasks(kernel);
await vi.runAllTimersAsync();
// Simulate sense worker returning a signal plus workflow launch
const workerPool = mockChildren[0];
@@ -241,6 +254,8 @@ describe("kernel + workflowManager integration", () => {
});
}
await vi.runAllTimersAsync();
// Find the start-thread call and verify triggerPayload
const startThreadCall = mockChildren
.flatMap((c) => (c.send as ReturnType<typeof vi.fn>).mock.calls as [unknown][])
@@ -275,6 +290,8 @@ describe("kernel + workflowManager integration", () => {
workerScript: "fake-worker.js",
logStore,
});
await flushSenseWorkerForkMicrotasks(kernel);
await vi.runAllTimersAsync();
const workerPool = mockChildren[0];
if (workerPool) {
@@ -293,6 +310,8 @@ describe("kernel + workflowManager integration", () => {
});
}
await vi.runAllTimersAsync();
const senseEntries = logStore.append.mock.calls
.map((c) => c[0] as { source: string; type: string; refId: string | null })
.filter((e) => e.source === "sense" && e.refId === "cpu-usage");
@@ -337,6 +356,8 @@ describe("kernel + workflowManager integration", () => {
workerScript: "fake-worker.js",
logStore,
});
await flushSenseWorkerForkMicrotasks(kernel);
await vi.runAllTimersAsync();
// Emit a regular signal (shorthand payload) — should NOT trigger any workflow
const workerPool = mockChildren[0];
@@ -387,6 +408,8 @@ describe("kernel + workflowManager integration", () => {
workerScript: "fake-worker.js",
logStore,
});
await flushSenseWorkerForkMicrotasks(kernel);
await vi.runAllTimersAsync();
// Simulate sense compute returning a signal plus workflow launch
const workerPool = mockChildren[0];
@@ -406,6 +429,8 @@ describe("kernel + workflowManager integration", () => {
});
}
await vi.runAllTimersAsync();
expect(logStore.upsertWorkflowRun).toHaveBeenCalledWith(
expect.objectContaining({ source: "workflow", type: "started" }),
expect.objectContaining({ workflow: "log-test-workflow", status: "started" }),
@@ -440,6 +465,8 @@ describe("kernel + workflowManager integration", () => {
workerScript: "fake-worker.js",
logStore,
});
await flushSenseWorkerForkMicrotasks(kernel);
await vi.runAllTimersAsync();
// Reload with a workflow added
const newConfig: NerveConfig = {
@@ -479,6 +506,8 @@ describe("kernel + workflowManager integration", () => {
});
}
await vi.runAllTimersAsync();
const startThreadCall = mockChildren
.flatMap((c) => (c.send as ReturnType<typeof vi.fn>).mock.calls as [unknown][])
.find(
@@ -517,6 +546,8 @@ describe("kernel + workflowManager integration", () => {
workerScript: "fake-worker.js",
logStore,
});
await flushSenseWorkerForkMicrotasks(kernel);
await vi.runAllTimersAsync();
// Reload with the workflow removed
const newConfig: NerveConfig = {
@@ -561,6 +592,8 @@ describe("kernel + workflowManager integration", () => {
});
}
await vi.runAllTimersAsync();
const startThreadCall = mockChildren
.flatMap((c) => (c.send as ReturnType<typeof vi.fn>).mock.calls as [unknown][])
.find(
@@ -600,6 +633,8 @@ describe("kernel + workflowManager integration", () => {
workerScript: "fake-worker.js",
logStore,
});
await flushSenseWorkerForkMicrotasks(kernel);
await vi.runAllTimersAsync();
// Trigger a workflow via sense compute return value
const workerPool = mockChildren[0];
@@ -619,6 +654,8 @@ describe("kernel + workflowManager integration", () => {
});
}
await vi.runAllTimersAsync();
const stopPromise = kernel.stop();
await vi.runAllTimersAsync();
await expect(stopPromise).resolves.toBeUndefined();
@@ -664,6 +701,8 @@ describe("kernel + workflowManager integration", () => {
workerScript: "fake-worker.js",
logStore,
});
await flushSenseWorkerForkMicrotasks(kernel);
await vi.runAllTimersAsync();
const health = kernel.getHealth();
expect(health).toHaveProperty("activeWorkflows");
+22 -2
View File
@@ -16,10 +16,12 @@ type MockChild = EventEmitter & {
send: ReturnType<typeof vi.fn>;
kill: ReturnType<typeof vi.fn>;
pid: number;
connected: boolean;
};
function makeMockChild(pid = 1): MockChild {
const child = new EventEmitter() as MockChild;
child.connected = true;
setImmediate(() => {
child.emit("message", { type: "ready" });
});
@@ -27,7 +29,10 @@ function makeMockChild(pid = 1): MockChild {
if (msg === null || typeof msg !== "object") return;
const m = msg as Record<string, unknown>;
if (m.type === "shutdown") {
setImmediate(() => child.emit("exit", 0, null));
setImmediate(() => {
child.connected = false;
child.emit("exit", 0, null);
});
return;
}
if (m.type === "compute" && typeof m.sense === "string") {
@@ -37,6 +42,7 @@ function makeMockChild(pid = 1): MockChild {
}
});
child.kill = vi.fn((_signal?: string) => {
child.connected = false;
child.emit("exit", null, _signal ?? "SIGKILL");
});
child.pid = pid;
@@ -59,6 +65,14 @@ const { createLogStore } = await import("@uncaged/nerve-store");
// Helpers
// ---------------------------------------------------------------------------
/** `WorkerRuntime.start` schedules `fork` on the next microtask — flush one tick per initial group. */
async function flushSenseWorkerForkMicrotasks(kernel: { groups: Set<string> }): Promise<void> {
const n = kernel.groups.size;
for (let i = 0; i < n; i++) {
await Promise.resolve();
}
}
function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
return {
senses: {
@@ -173,6 +187,7 @@ describe("kernel — message routing", () => {
},
});
const kernel = createKernel(config, nerveRoot);
await flushSenseWorkerForkMicrotasks(kernel);
const child = mockChildren[0];
child.emit("message", { type: "error", sense: "cpu-usage", error: "compute failed" });
@@ -201,6 +216,7 @@ describe("kernel — message routing", () => {
},
});
const kernel = createKernel(config, nerveRoot);
await flushSenseWorkerForkMicrotasks(kernel);
const child = mockChildren[0];
const callsBefore = stderrSpy.mock.calls.length;
@@ -228,6 +244,7 @@ describe("kernel — message routing", () => {
},
});
const kernel = createKernel(config, nerveRoot);
await flushSenseWorkerForkMicrotasks(kernel);
const child = mockChildren[0];
expect(() => child.emit("message", { type: "unknown-type" })).not.toThrow();
@@ -290,6 +307,7 @@ describe("kernel — groupForSense mapping", () => {
api: { port: null, token: null, host: "127.0.0.1" },
};
const kernel = createKernel(config, nerveRoot);
await flushSenseWorkerForkMicrotasks(kernel);
// system and network = 2 unique groups
expect(mockChildren.length).toBe(2);
@@ -311,8 +329,10 @@ describe("kernel — groupForSense mapping", () => {
},
});
const kernel = createKernel(config, nerveRoot);
await flushSenseWorkerForkMicrotasks(kernel);
const child = mockChildren[0];
child.emit("message", { type: "ready" });
vi.advanceTimersByTime(500);
expect(child.send).toHaveBeenCalledWith(
@@ -50,6 +50,7 @@ async function startWorkerWithReady(
group: string,
): Promise<void> {
const pr = pool.startWorker(group);
await Promise.resolve();
const child = mockChildren[mockChildren.length - 1];
child.emit("message", { type: "ready" });
await pr;
@@ -137,6 +138,7 @@ describe("createSenseWorkerPool", () => {
expect(pool.activeGroupCount()).toBe(1);
pool.evictGroup("x");
expect(pool.hasWorkerForGroup("x")).toBe(false);
await Promise.resolve();
expect(mockChildren[0].send).toHaveBeenCalledWith(
expect.objectContaining({ type: "shutdown" }),
);
@@ -159,6 +161,7 @@ describe("createSenseWorkerPool", () => {
const p = pool.restartGroup("g");
expect(onBeforeGroupRestart).toHaveBeenCalledWith("g");
await Promise.resolve();
expect(mockChildren[0].send).toHaveBeenCalledWith(
expect.objectContaining({ type: "shutdown" }),
);
@@ -171,7 +174,7 @@ describe("createSenseWorkerPool", () => {
});
it("onWorkerCrashed runs and schedules respawn after non-zero exit", async () => {
vi.useFakeTimers({ shouldAdvanceTime: true });
vi.useFakeTimers();
const onWorkerCrashed = vi.fn();
const pool = createSenseWorkerPool({
nerveRoot: "/tmp/n",
@@ -0,0 +1,181 @@
import { dirname, join } from "node:path";
import { fileURLToPath } from "node:url";
import { afterEach, describe, expect, it, vi } from "vitest";
import { createWorkerRuntime } from "../worker-runtime.js";
const fixturesDir = join(dirname(fileURLToPath(import.meta.url)), "fixtures");
const echoWorkerPath = join(fixturesDir, "echo-worker.js");
const crashWorkerPath = join(fixturesDir, "crash-worker.js");
const stderrWorkerPath = join(fixturesDir, "stderr-worker.js");
function baseConfig(script: string) {
return {
script,
argsForKey: () => [],
forwardStderr: true,
onMessage: vi.fn(),
onReady: vi.fn(),
onExit: vi.fn(),
onCrashLimitReached: null,
respawn: {
enabled: true,
maxCrashes: 6,
windowMs: 60_000,
delayMs: 80,
allowRespawn: null,
},
shutdownTimeoutMs: 5000,
};
}
describe("createWorkerRuntime", () => {
const runtimes: Array<{ shutdown: () => Promise<void> }> = [];
afterEach(async () => {
await Promise.all(runtimes.splice(0).map((r) => r.shutdown()));
});
function track<R extends { shutdown: () => Promise<void> }>(r: R): R {
runtimes.push(r);
return r;
}
it("start + send message + receive echo", async () => {
const incoming: unknown[] = [];
const rt = track(
createWorkerRuntime({
...baseConfig(echoWorkerPath),
onMessage: (_key, msg) => {
incoming.push(msg);
},
}),
);
await rt.start("a");
expect(rt.has("a")).toBe(true);
await rt.send("a", { type: "ping", n: 1 });
await vi.waitFor(() => {
expect(incoming.some((m) => isEchoOf(m, { type: "ping", n: 1 }))).toBe(true);
});
await rt.shutdown();
});
it("cold start on send (no explicit start)", async () => {
const incoming: unknown[] = [];
const rt = track(
createWorkerRuntime({
...baseConfig(echoWorkerPath),
onMessage: (_key, msg) => {
incoming.push(msg);
},
}),
);
expect(rt.has("x")).toBe(false);
await rt.send("x", { type: "hi" });
await vi.waitFor(() => {
expect(rt.has("x")).toBe(true);
expect(incoming.some((m) => isEchoOf(m, { type: "hi" }))).toBe(true);
});
await rt.shutdown();
});
it("evict stops worker; has() is false", async () => {
const rt = track(createWorkerRuntime(baseConfig(echoWorkerPath)));
await rt.start("k");
expect(rt.has("k")).toBe(true);
await rt.evict("k", null);
expect(rt.has("k")).toBe(false);
await rt.shutdown();
});
it("drain stops and respawns (new pid)", async () => {
const rt = track(createWorkerRuntime(baseConfig(echoWorkerPath)));
await rt.start("k");
const before = rt.pid("k");
expect(before).not.toBeNull();
await rt.drain("k", null);
const after = rt.pid("k");
expect(after).not.toBeNull();
expect(after).not.toBe(before);
await rt.shutdown();
});
it("crash triggers auto-respawn", async () => {
const incoming: unknown[] = [];
const onExit = vi.fn();
const rt = track(
createWorkerRuntime({
...baseConfig(crashWorkerPath),
onExit,
onMessage: (_key, msg) => {
incoming.push(msg);
},
}),
);
await rt.start("c");
await vi.waitFor(() => expect(onExit.mock.calls.length).toBeGreaterThanOrEqual(1), {
timeout: 3000,
});
await vi.waitFor(() => expect(rt.has("c")).toBe(true), { timeout: 3000 });
await rt.send("c", { type: "after-crash" });
await vi.waitFor(() => {
expect(incoming.some((m) => isEchoOf(m, { type: "after-crash" }))).toBe(true);
});
await rt.shutdown();
});
it("crash limit reached → no more automatic respawns", async () => {
const rt = track(
createWorkerRuntime({
...baseConfig(crashWorkerPath),
respawn: {
enabled: true,
maxCrashes: 2,
windowMs: 60_000,
delayMs: 50,
allowRespawn: null,
},
}),
);
await rt.start("z");
await vi.waitFor(() => expect(rt.has("z")).toBe(false), { timeout: 8000 });
await rt.shutdown();
});
it("shutdown stops all workers", async () => {
const rt = track(createWorkerRuntime(baseConfig(echoWorkerPath)));
await rt.start("a");
await rt.start("b");
expect(rt.keys().sort()).toEqual(["a", "b"].sort());
await rt.shutdown();
expect(rt.keys()).toEqual([]);
expect(rt.has("a")).toBe(false);
expect(rt.has("b")).toBe(false);
});
it("stderrTail captures stderr output", async () => {
const rt = track(createWorkerRuntime(baseConfig(stderrWorkerPath)));
await rt.start("s");
await vi.waitFor(() => {
expect(rt.stderrTail("s")).toContain("stderr-marker");
});
await rt.shutdown();
});
});
function isEchoOf(msg: unknown, payload: unknown): boolean {
return (
typeof msg === "object" &&
msg !== null &&
(msg as Record<string, unknown>).type === "echo" &&
JSON.stringify((msg as Record<string, unknown>).payload) === JSON.stringify(payload)
);
}
@@ -26,6 +26,11 @@ function makeMockChild(pid = 1): MockChild {
child.connected = true;
child.exitCode = null;
child.pid = pid;
setImmediate(() => {
if (child.connected) {
child.emit("message", { type: "ready" });
}
});
child.send = vi.fn((msg: unknown) => {
if (
msg !== null &&
@@ -110,7 +115,7 @@ describe("WorkflowManager", () => {
});
describe("startWorkflow under concurrency limit dispatches thread", () => {
it("forks a worker and sends start-thread when active < concurrency", () => {
it("forks a worker and sends start-thread when active < concurrency", async () => {
const logStore = makeLogStore();
const config = makeConfig({
"my-workflow": { concurrency: 2, overflow: "drop" },
@@ -118,6 +123,7 @@ describe("WorkflowManager", () => {
const mgr = createWorkflowManager("/nerve-root", config, logStore);
mgr.startWorkflow("my-workflow", { prompt: "test", maxRounds: 10, dryRun: false });
await vi.runAllTimersAsync();
expect(mockChildren).toHaveLength(1);
expect(mockChildren[0].send).toHaveBeenCalledWith(
@@ -126,7 +132,7 @@ describe("WorkflowManager", () => {
expect(mgr.activeCount("my-workflow")).toBe(1);
});
it("reuses the same worker for a second thread under the limit", () => {
it("reuses the same worker for a second thread under the limit", async () => {
const logStore = makeLogStore();
const config = makeConfig({
"my-workflow": { concurrency: 3, overflow: "drop" },
@@ -135,6 +141,7 @@ describe("WorkflowManager", () => {
mgr.startWorkflow("my-workflow", { prompt: "test 1", maxRounds: 10, dryRun: false });
mgr.startWorkflow("my-workflow", { prompt: "test 2", maxRounds: 10, dryRun: false });
await vi.runAllTimersAsync();
// Only one forked child — worker is reused
expect(mockChildren).toHaveLength(1);
@@ -142,7 +149,7 @@ describe("WorkflowManager", () => {
expect(mgr.activeCount("my-workflow")).toBe(2);
});
it("logs a 'started' event for each dispatched thread", () => {
it("logs a 'started' event for each dispatched thread", async () => {
const logStore = makeLogStore();
const config = makeConfig({
"my-workflow": { concurrency: 2, overflow: "drop" },
@@ -150,6 +157,7 @@ describe("WorkflowManager", () => {
const mgr = createWorkflowManager("/nerve-root", config, logStore);
mgr.startWorkflow("my-workflow", { prompt: "test", maxRounds: 10, dryRun: false });
await vi.runAllTimersAsync();
expect(logStore.upsertWorkflowRun).toHaveBeenCalledWith(
expect.objectContaining({ source: "workflow", type: "started" }),
@@ -159,7 +167,7 @@ describe("WorkflowManager", () => {
});
describe("startWorkflow at limit with drop overflow drops the request", () => {
it("does NOT send start-thread when at concurrency limit with overflow=drop", () => {
it("does NOT send start-thread when at concurrency limit with overflow=drop", async () => {
const logStore = makeLogStore();
const config = makeConfig({
"drop-wf": { concurrency: 1, overflow: "drop" },
@@ -169,6 +177,7 @@ describe("WorkflowManager", () => {
mgr.startWorkflow("drop-wf", { prompt: "first", maxRounds: 10, dryRun: false });
// now at limit — second call should be dropped
mgr.startWorkflow("drop-wf", { prompt: "second", maxRounds: 10, dryRun: false });
await vi.runAllTimersAsync();
expect(mgr.activeCount("drop-wf")).toBe(1);
expect(mgr.queueLength("drop-wf")).toBe(0);
@@ -254,7 +263,7 @@ describe("WorkflowManager", () => {
});
describe("completing a thread dequeues the next one", () => {
it("dispatches the next queued thread when the active thread sends completed", () => {
it("dispatches the next queued thread when the active thread sends completed", async () => {
const logStore = makeLogStore();
const config = makeConfig({
"queue-wf": { concurrency: 1, overflow: "queue", maxQueue: 5 },
@@ -263,6 +272,7 @@ describe("WorkflowManager", () => {
mgr.startWorkflow("queue-wf", { prompt: "first", maxRounds: 10, dryRun: false });
mgr.startWorkflow("queue-wf", { prompt: "second", maxRounds: 10, dryRun: false });
await vi.runAllTimersAsync();
expect(mgr.activeCount("queue-wf")).toBe(1);
expect(mgr.queueLength("queue-wf")).toBe(1);
@@ -289,7 +299,7 @@ describe("WorkflowManager", () => {
);
});
it("dispatches next queued thread when active thread sends failed", () => {
it("dispatches next queued thread when active thread sends failed", async () => {
const logStore = makeLogStore();
const config = makeConfig({
"queue-wf": { concurrency: 1, overflow: "queue", maxQueue: 5 },
@@ -298,6 +308,7 @@ describe("WorkflowManager", () => {
mgr.startWorkflow("queue-wf", { prompt: "first", maxRounds: 10, dryRun: false });
mgr.startWorkflow("queue-wf", { prompt: "second", maxRounds: 10, dryRun: false });
await vi.runAllTimersAsync();
const child = mockChildren[0];
const firstRunId = (child.send as ReturnType<typeof vi.fn>).mock.calls[0][0].runId as string;
@@ -325,6 +336,7 @@ describe("WorkflowManager", () => {
mgr.startWorkflow("wf-a", { prompt: "test", maxRounds: 10, dryRun: false });
mgr.startWorkflow("wf-b", { prompt: "test", maxRounds: 10, dryRun: false });
await vi.runAllTimersAsync();
// Two distinct workers should have been forked
expect(mockChildren).toHaveLength(2);
+6 -12
View File
@@ -7,7 +7,7 @@
* IPC event loop.
*
* Layout assumptions (nerve user config at `~/.uncaged-nerve/`):
* senses/<name>/index.js ← compiled compute
* dist/senses/<name>/index.js ← bundled compute (esbuild)
* senses/<name>/migrations/ ← SQL migration files
* data/senses/<name>.db ← SQLite data file
* nerve.yaml ← config
@@ -19,13 +19,13 @@ import { readFileSync } from "node:fs";
import { join, resolve } from "node:path";
import { parseNerveConfig } from "@uncaged/nerve-core";
import type { NerveConfig, WorkflowTrigger } from "@uncaged/nerve-core";
import type { NerveConfig } from "@uncaged/nerve-core";
import type { WorkerToParentMessage } from "./ipc.js";
import { parseParentMessage } from "./ipc.js";
import { executeCompute, loadSenseModule, openSenseDb } from "./sense-runtime.js";
import type { SenseRuntime } from "./sense-runtime.js";
import { ignoreSessionBroadcastSignals } from "./worker-fork-support.js";
import { ignoreSessionBroadcastSignals } from "./worker-signals.js";
// ---------------------------------------------------------------------------
// IPC helpers
@@ -49,10 +49,6 @@ function sendError(sense: string, error: string): void {
send({ type: "error", sense, error });
}
function sendWorkflowTrigger(sense: string, workflow: WorkflowTrigger): void {
send({ type: "sense-workflow-trigger", sense, workflow });
}
// ---------------------------------------------------------------------------
// Initialisation helpers
// ---------------------------------------------------------------------------
@@ -83,7 +79,7 @@ async function initSense(
): Promise<SenseRuntime> {
const dbPath = join(nerveRoot, "data", "senses", `${senseName}.db`);
const migrationsDir = join(nerveRoot, "senses", senseName, "migrations");
const senseIndexPath = resolve(join(nerveRoot, "senses", senseName, "index.js"));
const senseIndexPath = resolve(join(nerveRoot, "dist", "senses", senseName, "index.js"));
const dbResult = openSenseDb(dbPath, migrationsDir, retention);
if (!dbResult.ok) {
@@ -154,10 +150,8 @@ async function runCompute(
}
clearGracePeriodTimer(senseName);
if (result.value != null) {
sendSignal(senseName, result.value.signal);
if (result.value.workflow !== null) {
sendWorkflowTrigger(senseName, result.value.workflow);
}
// Single IPC message: kernel uses routeSenseComputeOutput(payload) for signal + optional workflow.
sendSignal(senseName, result.value);
}
} catch (e: unknown) {
const errMsg = e instanceof Error ? e.message : String(e);
@@ -1,45 +0,0 @@
import type { ChildProcess } from "node:child_process";
const STDERR_TAIL_MAX_CHARS = 16_384;
/**
* Forked workers inherit the parent's process group. In foreground `nerve dev`,
* terminal-driven SIGINT/SIGTERM is delivered to the whole group, so workers can exit
* on the default handler before the kernel sends `{ type: "shutdown" }` over IPC.
* Swallow these in worker processes so the parent coordinates shutdown (issue #55).
* Only call when `process.send` is defined (fork IPC); standalone `node …-worker.js` keeps default Ctrl+C behaviour.
*/
export function ignoreSessionBroadcastSignals(): void {
const swallow = (): void => {};
process.on("SIGINT", swallow);
process.on("SIGTERM", swallow);
}
export function teeCapturedStderr(child: ChildProcess, tail: { value: string }): void {
const stream = child.stderr;
if (stream === null || stream === undefined) return;
stream.setEncoding("utf8");
stream.on("data", (chunk: string | Buffer) => {
const text = typeof chunk === "string" ? chunk : chunk.toString("utf8");
process.stderr.write(text);
tail.value = (tail.value + text).slice(-STDERR_TAIL_MAX_CHARS);
});
}
export function formatChildExitSummary(code: number | null, signal: NodeJS.Signals | null): string {
const codeStr = code === null || code === undefined ? "null" : String(code);
if (signal) {
return `code=${codeStr} signal=${signal}`;
}
return `code=${codeStr}`;
}
export function formatCapturedStderrTail(tail: string, maxChars = 800): string {
const trimmed = tail.trim();
if (trimmed.length === 0) return "";
const normalized = trimmed.replace(/\r?\n/g, "\\n");
if (normalized.length <= maxChars) {
return ` worker_stderr=${normalized}`;
}
return ` worker_stderr=…${normalized.slice(-maxChars)}`;
}
+74 -119
View File
@@ -1,19 +1,16 @@
/**
* Sense worker pool — forked child processes per sense group (IPC lifecycle).
* Sense worker pool — thin wrapper around WorkerRuntime (RFC-006): one fork per sense group.
*/
import { fork } from "node:child_process";
import type { ChildProcess } from "node:child_process";
import { dirname, join } from "node:path";
import { fileURLToPath } from "node:url";
import type { ComputeMessage, ShutdownMessage } from "./ipc.js";
import { parseWorkerMessage } from "./ipc.js";
import type { ComputeMessage } from "./ipc.js";
import {
createWorkerRuntime,
formatCapturedStderrTail,
formatChildExitSummary,
teeCapturedStderr,
} from "./worker-fork-support.js";
} from "./worker-runtime.js";
export function resolveWorkerScript(): string {
const __filename = fileURLToPath(import.meta.url);
@@ -21,17 +18,12 @@ export function resolveWorkerScript(): string {
return join(__dir, "sense-worker.js");
}
type WorkerEntry = {
group: string;
process: ChildProcess;
};
export type SenseWorkerPoolOptions = {
nerveRoot: string;
workerScript: string;
/** Invoked for every IPC message from a worker (including ready / signal / error). */
onWorkerMessage: (raw: unknown) => void;
/** Sense names in a group — used when clearing scheduler state on crash or restart. */
/** Sense names in a group — reserved for scheduler-aligned cleanup (kernel passes current config). */
sensesForGroup: (group: string) => string[];
/**
* Called when a worker exits with non-zero code before scheduling a respawn
@@ -58,144 +50,107 @@ export type SenseWorkerPool = {
activeGroupCount: () => number;
};
function spawnWorker(
nerveRoot: string,
group: string,
workerScript: string,
stderrTail: { value: string },
): ChildProcess {
const child = fork(workerScript, ["--group", group, "--root", nerveRoot], {
stdio: ["ignore", "inherit", "pipe", "ipc"],
});
teeCapturedStderr(child, stderrTail);
child.on("error", (err) => {
if ((err as NodeJS.ErrnoException).code !== "EPIPE") {
console.error("[worker] error:", err.message);
}
});
return child;
}
function sendComputeToProcess(worker: ChildProcess, senseName: string): void {
if (worker.connected === false) return;
const msg: ComputeMessage = { type: "compute", sense: senseName };
try {
worker.send(msg);
} catch {
// IPC channel closed between connected check and send
}
}
function sendShutdownToProcess(worker: ChildProcess): void {
if (worker.connected === false) return;
const msg: ShutdownMessage = { type: "shutdown" };
try {
worker.send(msg);
} catch {
// IPC channel closed between connected check and send
}
}
function waitForExit(child: ChildProcess, timeoutMs: number): Promise<void> {
return new Promise((resolve) => {
const timer = setTimeout(() => {
child.kill("SIGKILL");
resolve();
}, timeoutMs);
child.once("exit", () => {
clearTimeout(timer);
resolve();
});
});
}
/** Matches legacy pool: long crash window, 1s respawn delay, practical unlimited respawns. */
const SENSE_WORKER_RESPAWN = {
enabled: true,
maxCrashes: 100_000,
windowMs: 86_400_000,
delayMs: 1000,
} as const;
export function createSenseWorkerPool(options: SenseWorkerPoolOptions): SenseWorkerPool {
const workers = new Map<string, WorkerEntry>();
function startWorker(group: string): Promise<void> {
const stderrTail = { value: "" };
const child = spawnWorker(options.nerveRoot, group, options.workerScript, stderrTail);
let workerReadyResolve: (() => void) | undefined;
const workerReady = new Promise<void>((resolve) => {
workerReadyResolve = resolve;
});
child.on("message", (raw: unknown) => {
const result = parseWorkerMessage(raw);
if (result.ok && result.value.type === "ready") {
workerReadyResolve?.();
}
const runtime = createWorkerRuntime<string>({
script: options.workerScript,
argsForKey: (group) => ["--group", group, "--root", options.nerveRoot],
forwardStderr: true,
onMessage: (_key, raw) => {
options.onWorkerMessage(raw);
});
child.on("exit", (code, signal) => {
const summary = formatChildExitSummary(code, signal ?? null);
},
onReady: (_key, msg) => {
options.onWorkerMessage(msg);
},
onCrashLimitReached: null,
onExit: (group, code, signal) => {
const sig =
signal === null || signal === undefined || signal === ""
? null
: (signal as NodeJS.Signals);
const summary = formatChildExitSummary(code, sig);
process.stderr.write(
`[kernel] worker for group "${group}" exited (${summary})${formatCapturedStderrTail(stderrTail.value)}\n`,
`[kernel] worker for group "${group}" exited (${summary})${formatCapturedStderrTail(runtime.stderrTail(group))}\n`,
);
workerReadyResolve?.();
if (!options.isStopped() && code !== 0) {
process.stderr.write(`[kernel] respawning worker for group "${group}" in 1s\n`);
options.onWorkerCrashed(group);
setTimeout(() => {
if (!options.isStopped()) {
startWorker(group);
}
}, 1000);
}
});
},
respawn: {
...SENSE_WORKER_RESPAWN,
allowRespawn: (_group) => !options.isStopped(),
},
shutdownTimeoutMs: 5000,
});
workers.set(group, { group, process: child });
return workerReady;
/** Groups we have ever started — mirrors legacy Map presence for `restartGroup` no-op when unknown. */
const trackedGroups = new Set<string>();
/** Marks groups mid-evict so `hasWorkerForGroup` drops immediately (legacy synchronous eviction). */
const evicting = new Set<string>();
async function startWorker(group: string): Promise<void> {
trackedGroups.add(group);
await runtime.start(group);
}
async function restartGroup(group: string): Promise<void> {
const entry = workers.get(group);
if (entry === undefined) return;
options.onBeforeGroupRestart(group);
sendShutdownToProcess(entry.process);
await waitForExit(entry.process, 5000);
if (!options.isStopped()) {
await startWorker(group);
if (!trackedGroups.has(group)) {
return;
}
options.onBeforeGroupRestart(group);
await runtime.drain(group, null);
}
function evictGroup(group: string): void {
const entry = workers.get(group);
if (entry === undefined) return;
sendShutdownToProcess(entry.process);
workers.delete(group);
trackedGroups.delete(group);
evicting.add(group);
void runtime.evict(group, null).finally(() => {
evicting.delete(group);
});
}
async function shutdownAll(): Promise<void> {
const exitPromises: Promise<void>[] = [];
for (const entry of workers.values()) {
sendShutdownToProcess(entry.process);
exitPromises.push(waitForExit(entry.process, 5000));
}
await Promise.all(exitPromises);
await runtime.shutdown();
trackedGroups.clear();
evicting.clear();
}
function sendCompute(group: string, senseName: string): void {
const entry = workers.get(group);
if (entry === undefined) return;
sendComputeToProcess(entry.process, senseName);
if (!trackedGroups.has(group) || evicting.has(group)) {
return;
}
// Legacy pool: `child.send` no-op when IPC is closed (still allow cold start: child === null).
if (runtime.hasDisconnectedChild(group)) {
return;
}
const msg: ComputeMessage = { type: "compute", sense: senseName };
if (!runtime.trySendSync(group, msg)) {
void runtime.send(group, msg).catch(() => {
// IPC channel may close between scheduling and send — same as legacy try/catch on child.send
});
}
}
function getWorkerPid(group: string): number | null {
return workers.get(group)?.process.pid ?? null;
return runtime.pid(group);
}
/** True once `startWorker` has been called for the group and it is not mid-evict (matches legacy Map key). */
function hasWorkerForGroup(group: string): boolean {
return workers.has(group);
return trackedGroups.has(group) && !evicting.has(group);
}
/** Count of sense groups with a worker slot (includes not-yet-ready), excluding evicted keys. */
function activeGroupCount(): number {
return workers.size;
return trackedGroups.size;
}
return {
+440
View File
@@ -0,0 +1,440 @@
/**
* Generic message-routed worker process manager (RFC-006).
* One forked Node child per key; cold start, crash respawn, drain/evict, shutdown.
*/
import { type ChildProcess, type Serializable, fork } from "node:child_process";
import { isPlainRecord } from "@uncaged/nerve-core";
const STDERR_TAIL_MAX_CHARS = 2048;
export function formatChildExitSummary(code: number | null, signal: NodeJS.Signals | null): string {
const codeStr = code === null || code === undefined ? "null" : String(code);
if (signal) {
return `code=${codeStr} signal=${signal}`;
}
return `code=${codeStr}`;
}
export function formatCapturedStderrTail(tail: string, maxChars = 800): string {
const trimmed = tail.trim();
if (trimmed.length === 0) return "";
const normalized = trimmed.replace(/\r?\n/g, "\\n");
if (normalized.length <= maxChars) {
return ` worker_stderr=${normalized}`;
}
return ` worker_stderr=…${normalized.slice(-maxChars)}`;
}
export type WorkerDrainOpts = {
shutdownTimeoutMs: number | null;
};
export type WorkerRuntimeConfig<K extends string> = {
script: string;
argsForKey: (key: K) => string[];
/** When false, stderr is not captured into `stderrTail` (e.g. tests without a pipe). */
forwardStderr: boolean;
onMessage: (key: K, msg: unknown) => void;
onReady: (key: K, msg: unknown) => void;
onExit: (key: K, code: number | null, signal: string | null) => void;
/** Invoked when automatic respawn is skipped because `maxCrashes` was exceeded in `windowMs`. */
onCrashLimitReached: ((key: K) => void) | null;
respawn: {
enabled: boolean;
maxCrashes: number;
windowMs: number;
delayMs: number;
/** When non-null, return false to skip automatic respawn after an unexpected exit. */
allowRespawn: ((key: K) => boolean) | null;
};
shutdownTimeoutMs: number;
};
export type WorkerRuntime<K extends string> = {
send: (key: K, msg: unknown) => Promise<void>;
/** When the worker is already ready and IPC-connected, sends synchronously (returns true). Otherwise false — caller may fall back to `send`. */
trySendSync: (key: K, msg: unknown) => boolean;
start: (key: K) => Promise<void>;
evict: (key: K, opts: WorkerDrainOpts | null) => Promise<void>;
drain: (key: K, opts: WorkerDrainOpts | null) => Promise<void>;
shutdown: () => Promise<void>;
has: (key: K) => boolean;
/** True when a child exists but IPC is disconnected (legacy pool skipped sends in this case). */
hasDisconnectedChild: (key: K) => boolean;
pid: (key: K) => number | null;
keys: () => K[];
stderrTail: (key: K) => string;
};
type WorkerMachineState = "stopped" | "starting" | "ready" | "draining";
type ReadyWaiter = {
resolve: () => void;
reject: (err: Error) => void;
};
/** Internal: one forked process slot (ManagedWorker). */
type WorkerSlot<K extends string> = {
key: K;
state: WorkerMachineState;
child: ChildProcess | null;
pid: number | null;
stderrTail: string;
crashTimestamps: number[];
expectExit: boolean;
readyWaiters: ReadyWaiter[];
opChain: Promise<void>;
};
function isReadyIpcMessage(raw: unknown): boolean {
return isPlainRecord(raw) && raw.type === "ready";
}
function signalToString(signal: NodeJS.Signals | null): string | null {
if (signal === null) {
return null;
}
return String(signal);
}
function attachStderrTail<K extends string>(child: ChildProcess, slot: WorkerSlot<K>): void {
const stream = child.stderr;
if (stream == null) {
return;
}
stream.setEncoding("utf8");
stream.on("data", (chunk: string | Buffer) => {
const text = typeof chunk === "string" ? chunk : chunk.toString("utf8");
slot.stderrTail = (slot.stderrTail + text).slice(-STDERR_TAIL_MAX_CHARS);
});
}
function enqueueOp<K extends string>(slot: WorkerSlot<K>, fn: () => Promise<void>): Promise<void> {
const run = slot.opChain.then(fn, fn);
slot.opChain = run.then(
() => {},
() => {},
);
return run;
}
function resolveReadyWaiters<K extends string>(slot: WorkerSlot<K>): void {
const waiters = slot.readyWaiters;
slot.readyWaiters = [];
for (const w of waiters) {
w.resolve();
}
}
function rejectReadyWaiters<K extends string>(slot: WorkerSlot<K>, err: Error): void {
const waiters = slot.readyWaiters;
slot.readyWaiters = [];
for (const w of waiters) {
w.reject(err);
}
}
function waitForReady<K extends string>(
slot: WorkerSlot<K>,
shutdownTimeoutMs: number,
): Promise<void> {
if (slot.state === "ready" && slot.child !== null && slot.child.connected) {
return Promise.resolve();
}
return new Promise((resolve, reject) => {
let settled = false;
const timer = setTimeout(() => {
if (!settled) {
settled = true;
reject(new Error(`Worker "${String(slot.key)}" ready timeout`));
}
}, shutdownTimeoutMs);
slot.readyWaiters.push({
resolve: () => {
if (settled) {
return;
}
settled = true;
clearTimeout(timer);
resolve();
},
reject: (err: Error) => {
if (settled) {
return;
}
settled = true;
clearTimeout(timer);
reject(err);
},
});
});
}
async function waitForChildExit(child: ChildProcess, timeoutMs: number): Promise<void> {
await new Promise<void>((resolve) => {
const timer = setTimeout(() => {
child.kill("SIGKILL");
}, timeoutMs);
child.once("exit", () => {
clearTimeout(timer);
resolve();
});
});
}
export function createWorkerRuntime<K extends string>(
config: WorkerRuntimeConfig<K>,
): WorkerRuntime<K> {
const workers = new Map<K, WorkerSlot<K>>();
function getOrCreateSlot(key: K): WorkerSlot<K> {
let slot = workers.get(key);
if (slot === undefined) {
slot = {
key,
state: "stopped",
child: null,
pid: null,
stderrTail: "",
crashTimestamps: [],
expectExit: false,
readyWaiters: [],
opChain: Promise.resolve(),
};
workers.set(key, slot);
}
return slot;
}
function handleWorkerMessage(slot: WorkerSlot<K>, msg: unknown): void {
if (isReadyIpcMessage(msg)) {
if (slot.state === "starting") {
slot.state = "ready";
config.onReady(slot.key, msg);
resolveReadyWaiters(slot);
}
return;
}
config.onMessage(slot.key, msg);
}
function onChildExit(
slot: WorkerSlot<K>,
code: number | null,
signal: NodeJS.Signals | null,
): void {
config.onExit(slot.key, code, signalToString(signal));
if (slot.child !== null) {
slot.child.removeAllListeners("message");
slot.child.removeAllListeners("exit");
}
const wasExpect = slot.expectExit;
slot.expectExit = false;
slot.child = null;
slot.pid = null;
if (wasExpect) {
slot.state = "stopped";
return;
}
rejectReadyWaiters(slot, new Error(`Worker "${String(slot.key)}" exited unexpectedly`));
slot.state = "stopped";
void enqueueOp(slot, async () => {
await handleUnexpectedCrashRecovery(slot);
});
}
function registerChild(slot: WorkerSlot<K>, child: ChildProcess): void {
slot.child = child;
slot.pid = child.pid ?? null;
if (config.forwardStderr) {
attachStderrTail(child, slot);
}
child.on("message", (msg: unknown) => {
handleWorkerMessage(slot, msg);
});
child.on("exit", (code, sig) => {
onChildExit(slot, code, sig ?? null);
});
}
async function forkAndWaitReady(slot: WorkerSlot<K>): Promise<void> {
if (slot.state === "ready" && slot.child !== null && slot.child.connected) {
return;
}
slot.state = "starting";
let child: ChildProcess;
try {
child = fork(config.script, config.argsForKey(slot.key), {
stdio: ["ignore", "inherit", "pipe", "ipc"],
env: process.env,
});
} catch (e) {
slot.state = "stopped";
const err = e instanceof Error ? e : new Error(String(e));
rejectReadyWaiters(slot, err);
throw err;
}
registerChild(slot, child);
await waitForReady(slot, config.shutdownTimeoutMs);
}
function resolveShutdownTimeoutMs(opts: WorkerDrainOpts | null): number {
if (opts !== null && opts.shutdownTimeoutMs !== null) {
return opts.shutdownTimeoutMs;
}
return config.shutdownTimeoutMs;
}
async function gracefulStop(slot: WorkerSlot<K>, shutdownTimeoutMs: number): Promise<void> {
if (slot.child === null) {
return;
}
slot.expectExit = true;
slot.state = "draining";
const child = slot.child;
try {
child.send({ type: "shutdown" });
} catch {
// IPC channel may have closed between null-check and send
}
await waitForChildExit(child, shutdownTimeoutMs);
}
async function handleUnexpectedCrashRecovery(slot: WorkerSlot<K>): Promise<void> {
if (!config.respawn.enabled) {
return;
}
if (config.respawn.allowRespawn !== null && !config.respawn.allowRespawn(slot.key)) {
return;
}
const now = Date.now();
slot.crashTimestamps.push(now);
slot.crashTimestamps = slot.crashTimestamps.filter((t) => now - t <= config.respawn.windowMs);
if (slot.crashTimestamps.length >= config.respawn.maxCrashes) {
console.error(
`[WorkerRuntime] worker "${String(slot.key)}" exceeded crash limit (${String(config.respawn.maxCrashes)} in ${String(config.respawn.windowMs)}ms); not respawning`,
);
if (config.onCrashLimitReached !== null) {
config.onCrashLimitReached(slot.key);
}
return;
}
await new Promise<void>((resolve) => setTimeout(resolve, config.respawn.delayMs));
await forkAndWaitReady(slot);
}
async function shutdownWorker(slot: WorkerSlot<K>): Promise<void> {
await gracefulStop(slot, config.shutdownTimeoutMs);
workers.delete(slot.key);
}
function isActive(slot: WorkerSlot<K>): boolean {
return slot.state === "ready" && slot.child !== null && slot.child.connected;
}
return {
send: async (key: K, msg: unknown) => {
const slot = getOrCreateSlot(key);
await enqueueOp(slot, async () => {
await forkAndWaitReady(slot);
const child = slot.child;
if (child === null || !child.connected) {
throw new Error(`Worker "${String(key)}" is not connected`);
}
child.send(msg as Serializable);
});
},
trySendSync: (key: K, msg: unknown): boolean => {
const slot = workers.get(key);
if (slot === undefined || !isActive(slot)) {
return false;
}
const child = slot.child;
if (child === null || !child.connected) {
return false;
}
try {
child.send(msg as Serializable);
return true;
} catch {
return false;
}
},
start: async (key: K) => {
const slot = getOrCreateSlot(key);
await enqueueOp(slot, async () => {
await forkAndWaitReady(slot);
});
},
evict: async (key: K, opts: WorkerDrainOpts | null) => {
const slot = getOrCreateSlot(key);
const shutdownMs = resolveShutdownTimeoutMs(opts);
await enqueueOp(slot, async () => {
await gracefulStop(slot, shutdownMs);
workers.delete(key);
});
},
drain: async (key: K, opts: WorkerDrainOpts | null) => {
const slot = getOrCreateSlot(key);
const shutdownMs = resolveShutdownTimeoutMs(opts);
await enqueueOp(slot, async () => {
if (slot.child === null) {
await forkAndWaitReady(slot);
return;
}
await gracefulStop(slot, shutdownMs);
await forkAndWaitReady(slot);
});
},
shutdown: async () => {
const snapshot = [...workers.values()];
await Promise.all(snapshot.map((slot) => enqueueOp(slot, () => shutdownWorker(slot))));
},
has: (key: K) => {
const slot = workers.get(key);
return slot !== undefined && isActive(slot);
},
hasDisconnectedChild: (key: K): boolean => {
const slot = workers.get(key);
if (slot === undefined || slot.child === null) {
return false;
}
return !slot.child.connected;
},
pid: (key: K) => {
const slot = workers.get(key);
if (slot === undefined || !isActive(slot) || slot.pid === null) {
return null;
}
return slot.pid;
},
keys: () => [...workers.values()].filter((slot) => isActive(slot)).map((slot) => slot.key),
stderrTail: (key: K) => {
const slot = workers.get(key);
return slot === undefined ? "" : slot.stderrTail;
},
};
}
+17
View File
@@ -0,0 +1,17 @@
/**
* Worker-process signal handling (fork IPC children only).
* Worker entrypoints import this module — not worker-runtime.ts (parent/kernel code).
*/
/**
* Forked workers inherit the parent's process group. In foreground `nerve dev`,
* terminal-driven SIGINT/SIGTERM is delivered to the whole group, so workers can exit
* on the default handler before the kernel sends `{ type: "shutdown" }` over IPC.
* Swallow these in worker processes so the parent coordinates shutdown (issue #55).
* Only call when `process.send` is defined (fork IPC); standalone `node …-worker.js` keeps default Ctrl+C behaviour.
*/
export function ignoreSessionBroadcastSignals(): void {
const swallow = (): void => {};
process.on("SIGINT", swallow);
process.on("SIGTERM", swallow);
}
@@ -0,0 +1,256 @@
/**
* Pure helpers and IPC branching for workflow-manager (keeps workflow-manager.ts lean).
*/
import { dirname, join } from "node:path";
import { fileURLToPath } from "node:url";
import type { WorkflowMessage } from "@uncaged/nerve-core";
import { START, isPlainRecord } from "@uncaged/nerve-core";
import type { LogStore, WorkflowRunStatus } from "@uncaged/nerve-store";
import type { ResumeThreadMessage, ThreadEventMessage } from "./ipc.js";
import type { WorkerToParentMessage } from "./ipc.js";
export type PendingThread = {
runId: string;
prompt: string;
maxRounds: number;
dryRun: boolean;
};
export type WorkflowState = {
active: Set<string>;
queue: PendingThread[];
};
/** Matches legacy manager: 6 crashes within 60s stops respawn (was `length > 5`). */
export const WORKFLOW_WORKER_RESPAWN = {
enabled: true,
maxCrashes: 6,
windowMs: 60_000,
delayMs: 0,
} as const;
/**
* Worker shutdown timeout — must stay in sync with SHUTDOWN_TIMEOUT_MS in workflow-worker.ts.
* The drain timeout passed to drainAndRespawn must be >= this value so the worker has
* enough time to finish in-flight threads before the parent force-kills it.
*/
export const WORKER_SHUTDOWN_TIMEOUT_MS = 10_000;
export const DEFAULT_MAX_QUEUE = 100;
export function readLaunchFromTriggerPayload(
raw: unknown,
engineDefaultMaxRounds: number,
): { prompt: string; maxRounds: number; dryRun: boolean } {
if (isPlainRecord(raw)) {
const o = raw;
if (typeof o.prompt === "string" && typeof o.maxRounds === "number") {
const dryRun = typeof o.dryRun === "boolean" ? o.dryRun : false;
return { prompt: o.prompt, maxRounds: o.maxRounds, dryRun };
}
}
return { prompt: "", maxRounds: engineDefaultMaxRounds, dryRun: false };
}
export function ensureThreadMessagesWithStart(
messages: Array<{ role: string; content: string; meta: unknown; timestamp: number }>,
threadId: string,
fallbackPrompt: string,
fallbackMaxRounds: number,
): WorkflowMessage[] {
const mapped: WorkflowMessage[] = messages.map((m) => ({
role: m.role,
content: m.content,
meta: m.meta,
timestamp: m.timestamp,
}));
if (mapped.length > 0 && mapped[0].role === START) {
return mapped;
}
const start: WorkflowMessage = {
role: START,
content: fallbackPrompt,
meta: { maxRounds: fallbackMaxRounds, threadId },
timestamp: Date.now(),
};
return [start, ...mapped];
}
export function resolveWorkflowWorkerScript(): string {
const __filename = fileURLToPath(import.meta.url);
const __dir = dirname(__filename);
return join(__dir, "workflow-worker.js");
}
export function mapWorkflowRunStatus(eventType: string): WorkflowRunStatus | null {
const map: Record<string, WorkflowRunStatus> = {
started: "started",
queued: "queued",
completed: "completed",
failed: "failed",
crashed: "crashed",
dropped: "dropped",
interrupted: "interrupted",
killed: "killed",
};
return map[eventType] ?? null;
}
export function extractExitCodeFromPayload(payload: unknown): number | null {
if (isPlainRecord(payload) && typeof payload.exitCode === "number") {
return payload.exitCode;
}
return null;
}
export function appendWorkflowRunLog(
logStore: LogStore,
workflowName: string,
runId: string,
eventType: string,
payload: unknown | undefined,
exitCode: number | null,
): void {
const timestamp = Date.now();
const serialised = payload !== undefined ? JSON.stringify(payload) : null;
const status = mapWorkflowRunStatus(eventType);
if (status !== null) {
logStore.upsertWorkflowRun(
{
source: "workflow",
type: eventType,
refId: runId,
payload: serialised,
timestamp,
},
{ runId, workflow: workflowName, status, timestamp, exitCode },
);
} else {
logStore.append({
source: "workflow",
type: eventType,
refId: runId,
payload: serialised,
timestamp,
});
}
}
export function recoverQueuedRun(
workflowName: string,
runId: string,
state: WorkflowState,
logStore: LogStore,
engineMaxRounds: number,
): void {
if (state.queue.some((q) => q.runId === runId)) return;
const launch = readLaunchFromTriggerPayload(logStore.getTriggerPayload(runId), engineMaxRounds);
state.queue.push({
runId,
prompt: launch.prompt,
maxRounds: launch.maxRounds,
dryRun: launch.dryRun,
});
process.stderr.write(
`[workflow-manager] crash-recovery: re-queued thread "${runId}" for "${workflowName}"\n`,
);
}
export function recoverStartedRun(
workflowName: string,
runId: string,
state: WorkflowState,
logStore: LogStore,
engineMaxRounds: number,
sendResume: (wf: string, msg: ResumeThreadMessage) => void,
): void {
if (state.active.has(runId)) return;
const rawMessages = logStore.getThreadMessages(runId);
const launch = readLaunchFromTriggerPayload(logStore.getTriggerPayload(runId), engineMaxRounds);
const messages = ensureThreadMessagesWithStart(
rawMessages,
runId,
launch.prompt,
launch.maxRounds,
);
state.active.add(runId);
const msg: ResumeThreadMessage = {
type: "resume-thread",
runId,
messages,
maxRounds: launch.maxRounds,
dryRun: launch.dryRun,
};
sendResume(workflowName, msg);
process.stderr.write(
`[workflow-manager] crash-recovery: resuming thread "${runId}" for "${workflowName}" (${String(messages.length)} messages)\n`,
);
}
export function recoverThreadsFromStore(
workflowName: string,
logStore: LogStore,
engineMaxRounds: number,
getOrCreateState: (name: string) => WorkflowState,
sendResume: (wf: string, msg: ResumeThreadMessage) => void,
): void {
const activeRuns = logStore.getActiveWorkflowRuns(workflowName);
const state = getOrCreateState(workflowName);
for (const run of activeRuns) {
if (run.status === "queued") {
recoverQueuedRun(workflowName, run.runId, state, logStore, engineMaxRounds);
} else if (run.status === "started") {
recoverStartedRun(workflowName, run.runId, state, logStore, engineMaxRounds, sendResume);
}
}
}
export type WorkflowManagerMessageDeps = {
logStore: LogStore;
handleThreadEvent: (workflowName: string, msg: ThreadEventMessage) => void;
onWorkflowRoleError: (
workflowName: string,
runId: string,
error: string,
exitCode: number,
) => void;
};
export function dispatchWorkflowWorkerMessage(
workflowName: string,
msg: WorkerToParentMessage,
deps: WorkflowManagerMessageDeps,
): void {
if (msg.type === "thread-event") {
deps.handleThreadEvent(workflowName, msg);
return;
}
if (msg.type === "thread-workflow-message") {
deps.logStore.append({
source: "workflow",
type: "thread_workflow_message",
refId: msg.runId,
payload: JSON.stringify(msg.message),
timestamp: Date.now(),
});
return;
}
if (msg.type === "workflow-error") {
process.stderr.write(
`[workflow-manager] workflow-error for runId "${msg.runId}" in "${workflowName}": ${msg.error}\n`,
);
deps.onWorkflowRoleError(workflowName, msg.runId, msg.error, msg.exitCode);
return;
}
if (msg.type === "error") {
process.stderr.write(`[workflow-manager] error from "${workflowName}" worker: ${msg.error}\n`);
}
}
+176 -446
View File
@@ -6,33 +6,27 @@
* Concurrency and overflow (drop/queue) are enforced here in the parent process.
*/
import { fork } from "node:child_process";
import type { ChildProcess } from "node:child_process";
import { dirname, join } from "node:path";
import { fileURLToPath } from "node:url";
import type { NerveConfig, WorkflowConfig, WorkflowStatus } from "@uncaged/nerve-core";
import type {
NerveConfig,
WorkflowConfig,
WorkflowMessage,
WorkflowStatus,
} from "@uncaged/nerve-core";
import { START, isPlainRecord } from "@uncaged/nerve-core";
import type { LogStore, WorkflowRunStatus } from "@uncaged/nerve-store";
import type {
KillThreadMessage,
ResumeThreadMessage,
ShutdownMessage,
StartThreadMessage,
ThreadEventMessage,
} from "./ipc.js";
import type { LogStore } from "@uncaged/nerve-store";
import type { KillThreadMessage, StartThreadMessage, ThreadEventMessage } from "./ipc.js";
import { parseWorkerMessage } from "./ipc.js";
import {
createWorkerRuntime,
formatCapturedStderrTail,
formatChildExitSummary,
teeCapturedStderr,
} from "./worker-fork-support.js";
} from "./worker-runtime.js";
import {
DEFAULT_MAX_QUEUE,
WORKER_SHUTDOWN_TIMEOUT_MS,
WORKFLOW_WORKER_RESPAWN,
type WorkflowState,
appendWorkflowRunLog,
dispatchWorkflowWorkerMessage,
extractExitCodeFromPayload,
recoverThreadsFromStore,
resolveWorkflowWorkerScript,
} from "./workflow-manager-support.js";
export type WorkflowLaunchParams = {
prompt: string;
@@ -60,7 +54,7 @@ export type WorkflowManager = {
updateConfig: (newConfig: NerveConfig) => void;
/**
* Drain active threads for a workflow, then respawn its worker process.
* Used for hot reload when bundled workflow output under workflows/<name>/dist/ changes.
* Used for hot reload when bundled workflow output under dist/workflows/<name>/ changes.
* Waits up to `drainTimeoutMs` for threads to complete before force-killing.
*/
drainAndRespawn: (workflowName: string, drainTimeoutMs?: number) => Promise<void>;
@@ -74,169 +68,109 @@ export type WorkflowManager = {
stop: () => Promise<void>;
};
type PendingThread = {
runId: string;
prompt: string;
maxRounds: number;
dryRun: boolean;
};
type WorkflowState = {
active: Set<string>;
queue: PendingThread[];
};
type WorkerEntry = {
workflowName: string;
process: ChildProcess;
stopping: boolean;
/** When set, the worker is draining before a hot-reload respawn. */
draining: boolean;
stderrTail: { value: string };
};
// Crash respawn backoff: track crash timestamps per workflow.
const MAX_CRASHES_IN_WINDOW = 5;
const CRASH_WINDOW_MS = 60_000;
/**
* Worker shutdown timeout — must stay in sync with SHUTDOWN_TIMEOUT_MS in workflow-worker.ts.
* The drain timeout passed to drainAndRespawn must be >= this value so the worker has
* enough time to finish in-flight threads before the parent force-kills it.
*/
const WORKER_SHUTDOWN_TIMEOUT_MS = 10_000;
const DEFAULT_MAX_QUEUE = 100;
function readLaunchFromTriggerPayload(
raw: unknown,
engineDefaultMaxRounds: number,
): { prompt: string; maxRounds: number; dryRun: boolean } {
if (isPlainRecord(raw)) {
const o = raw;
if (typeof o.prompt === "string" && typeof o.maxRounds === "number") {
const dryRun = typeof o.dryRun === "boolean" ? o.dryRun : false;
return { prompt: o.prompt, maxRounds: o.maxRounds, dryRun };
}
}
return { prompt: "", maxRounds: engineDefaultMaxRounds, dryRun: false };
}
function ensureThreadMessagesWithStart(
messages: Array<{ role: string; content: string; meta: unknown; timestamp: number }>,
threadId: string,
fallbackPrompt: string,
fallbackMaxRounds: number,
): WorkflowMessage[] {
const mapped: WorkflowMessage[] = messages.map((m) => ({
role: m.role,
content: m.content,
meta: m.meta,
timestamp: m.timestamp,
}));
if (mapped.length > 0 && mapped[0].role === START) {
return mapped;
}
const start: WorkflowMessage = {
role: START,
content: fallbackPrompt,
meta: { maxRounds: fallbackMaxRounds, threadId },
timestamp: Date.now(),
};
return [start, ...mapped];
}
function resolveWorkerScript(): string {
const __filename = fileURLToPath(import.meta.url);
const __dir = dirname(__filename);
return join(__dir, "workflow-worker.js");
}
function spawnWorkflowWorker(
nerveRoot: string,
workflowName: string,
workerScript: string,
stderrTail: { value: string },
): ChildProcess {
const child = fork(workerScript, ["--workflow", workflowName, "--root", nerveRoot], {
stdio: ["ignore", "inherit", "pipe", "ipc"],
});
teeCapturedStderr(child, stderrTail);
// Prevent unhandled EPIPE when writing to a child whose IPC channel closed
child.on("error", (err) => {
if ((err as NodeJS.ErrnoException).code !== "EPIPE") {
console.error("[worker] error:", err.message);
}
});
return child;
}
function sendStartThread(worker: ChildProcess, msg: StartThreadMessage): void {
if (worker.connected === false) return;
try {
worker.send(msg);
} catch {
// IPC channel closed between connected check and send
}
}
function sendShutdown(worker: ChildProcess, entry: WorkerEntry): void {
entry.stopping = true;
if (worker.connected === false) return;
const msg: ShutdownMessage = { type: "shutdown" };
try {
worker.send(msg);
} catch {
// IPC channel closed between connected check and send
}
}
function sendResumeThread(worker: ChildProcess, msg: ResumeThreadMessage): void {
if (worker.connected === false) return;
try {
worker.send(msg);
} catch {
// IPC channel closed between connected check and send
}
}
function sendKillThread(worker: ChildProcess, runId: string): void {
if (worker.connected === false) return;
const msg: KillThreadMessage = { type: "kill-thread", runId };
try {
worker.send(msg);
} catch {
// IPC channel closed between connected check and send
}
}
function waitForExit(child: ChildProcess, timeoutMs: number): Promise<void> {
return new Promise((resolve) => {
const timer = setTimeout(() => {
child.kill("SIGKILL");
resolve();
}, timeoutMs);
child.once("exit", () => {
clearTimeout(timer);
resolve();
});
});
}
export function createWorkflowManager(
nerveRoot: string,
initialConfig: NerveConfig,
logStore: LogStore,
): WorkflowManager {
const workerScript = resolveWorkerScript();
const workerScript = resolveWorkflowWorkerScript();
/**
* Default drain timeout must be at least WORKER_SHUTDOWN_TIMEOUT_MS so the worker
* has enough time to finish in-flight threads before the parent force-kills it.
*/
const DEFAULT_DRAIN_TIMEOUT_MS = Math.max(30_000, WORKER_SHUTDOWN_TIMEOUT_MS + 5_000);
const states = new Map<string, WorkflowState>();
const workers = new Map<string, WorkerEntry>();
const crashTimestamps = new Map<string, number[]>();
const trackedWorkflows = new Set<string>();
const hotReloadEvicting = new Set<string>();
const crashRecoveryPending = new Set<string>();
const crashLimitBlocked = new Set<string>();
let stopped = false;
let config = initialConfig;
const pendingDrains = new Set<string>();
function logWorkflowEvent(
workflowName: string,
runId: string,
eventType: string,
payload: unknown | null = null,
exitCode: number | null = null,
): void {
appendWorkflowRunLog(logStore, workflowName, runId, eventType, payload, exitCode);
}
const runtime = createWorkerRuntime<string>({
script: workerScript,
argsForKey: (workflowName) => ["--workflow", workflowName, "--root", nerveRoot],
forwardStderr: true,
onMessage: (workflowName, raw) => {
handleWorkerMessage(workflowName, raw);
},
onReady: (workflowName, _msg) => {
if (crashRecoveryPending.has(workflowName)) {
crashRecoveryPending.delete(workflowName);
recoverThreadsFromStore(
workflowName,
logStore,
config.maxRounds,
getOrCreateState,
(wf, msg) => {
sendToWorker(wf, msg);
},
);
}
},
onExit: (workflowName, code, signalStr) => {
const sig =
signalStr === null || signalStr === undefined || signalStr === ""
? null
: (signalStr as NodeJS.Signals);
if (hotReloadEvicting.has(workflowName)) {
hotReloadEvicting.delete(workflowName);
markActiveRunsInterrupted(workflowName);
if (!stopped && workflowConfig(workflowName) !== null) {
process.stderr.write(
`[workflow-manager] worker for "${workflowName}" drained, respawning\n`,
);
}
return;
}
if (stopped) {
const state = states.get(workflowName);
if (state !== undefined) {
state.active.clear();
}
crashRecoveryPending.delete(workflowName);
return;
}
const summary = formatChildExitSummary(code, sig);
const stderrExtra = formatCapturedStderrTail(runtime.stderrTail(workflowName));
process.stderr.write(
`[workflow-manager] worker for "${workflowName}" exited (${summary})${stderrExtra}\n`,
);
cleanupAfterUnexpectedWorkerExit(workflowName);
crashRecoveryPending.add(workflowName);
},
onCrashLimitReached: (workflowName) => {
crashRecoveryPending.delete(workflowName);
trackedWorkflows.delete(workflowName);
crashLimitBlocked.add(workflowName);
process.stderr.write(
`[workflow-manager] worker for "${workflowName}" exceeded crash limit (${String(WORKFLOW_WORKER_RESPAWN.maxCrashes)} in ${String(WORKFLOW_WORKER_RESPAWN.windowMs)}ms) — stopping respawn\n`,
);
},
respawn: {
...WORKFLOW_WORKER_RESPAWN,
allowRespawn: (_wf) => !stopped,
},
shutdownTimeoutMs: DEFAULT_DRAIN_TIMEOUT_MS,
});
function getOrCreateState(workflowName: string): WorkflowState {
let state = states.get(workflowName);
if (state === undefined) {
@@ -250,60 +184,38 @@ export function createWorkflowManager(
return config.workflows[workflowName] ?? null;
}
function toWorkflowRunStatus(eventType: string): WorkflowRunStatus | null {
const map: Record<string, WorkflowRunStatus> = {
started: "started",
queued: "queued",
completed: "completed",
failed: "failed",
crashed: "crashed",
dropped: "dropped",
interrupted: "interrupted",
killed: "killed",
};
return map[eventType] ?? null;
}
function extractExitCode(payload: unknown): number | null {
if (isPlainRecord(payload) && typeof payload.exitCode === "number") {
return payload.exitCode;
/** IPC send — matches legacy pool: no-op when IPC is disconnected; cold-start via WorkerRuntime.send. */
function sendToWorker(workflowName: string, msg: unknown): void {
if (crashLimitBlocked.has(workflowName)) {
return;
}
return null;
}
function logWorkflowEvent(
workflowName: string,
runId: string,
eventType: string,
payload?: unknown,
exitCode: number | null = null,
): void {
const timestamp = Date.now();
const serialised = payload !== undefined ? JSON.stringify(payload) : null;
const status = toWorkflowRunStatus(eventType);
if (status !== null) {
logStore.upsertWorkflowRun(
{
source: "workflow",
type: eventType,
refId: runId,
payload: serialised,
timestamp,
},
{ runId, workflow: workflowName, status, timestamp, exitCode },
);
} else {
logStore.append({
source: "workflow",
type: eventType,
refId: runId,
payload: serialised,
timestamp,
trackedWorkflows.add(workflowName);
if (runtime.hasDisconnectedChild(workflowName)) {
return;
}
if (!runtime.trySendSync(workflowName, msg)) {
void runtime.send(workflowName, msg).catch(() => {
// IPC channel closed — mark any thread from this message as failed
if (isStartThreadMsg(msg)) {
const state = states.get(workflowName);
if (state?.active.has(msg.runId)) {
state.active.delete(msg.runId);
logWorkflowEvent(workflowName, msg.runId, "failed", { error: "IPC channel closed" }, 1);
dequeueNext(workflowName);
}
}
});
}
}
function isStartThreadMsg(msg: unknown): msg is StartThreadMessage {
return (
msg !== null &&
typeof msg === "object" &&
(msg as Record<string, unknown>).type === "start-thread"
);
}
function dispatchThread(
workflowName: string,
runId: string,
@@ -314,7 +226,6 @@ export function createWorkflowManager(
const state = getOrCreateState(workflowName);
state.active.add(runId);
const worker = getOrSpawnWorker(workflowName);
const msg: StartThreadMessage = {
type: "start-thread",
runId,
@@ -323,7 +234,7 @@ export function createWorkflowManager(
maxRounds,
dryRun,
};
sendStartThread(worker.process, msg);
sendToWorker(workflowName, msg);
logWorkflowEvent(workflowName, runId, "started", { prompt, maxRounds, dryRun });
}
@@ -367,92 +278,20 @@ export function createWorkflowManager(
if (msg.eventType === "completed" || msg.eventType === "failed" || msg.eventType === "killed") {
state.active.delete(msg.runId);
dequeueNext(workflowName);
const exitCode = extractExitCode(msg.payload);
const exitCode = extractExitCodeFromPayload(msg.payload);
logWorkflowEvent(workflowName, msg.runId, msg.eventType, msg.payload, exitCode);
maybeDeferredHotReloadDrain(workflowName);
}
}
function recoverQueuedRun(workflowName: string, runId: string, state: WorkflowState): void {
if (state.queue.some((q) => q.runId === runId)) return;
const launch = readLaunchFromTriggerPayload(
logStore.getTriggerPayload(runId),
config.maxRounds,
);
state.queue.push({
runId,
prompt: launch.prompt,
maxRounds: launch.maxRounds,
dryRun: launch.dryRun,
});
process.stderr.write(
`[workflow-manager] crash-recovery: re-queued thread "${runId}" for "${workflowName}"\n`,
);
}
function recoverStartedRun(
workflowName: string,
runId: string,
state: WorkflowState,
worker: WorkerEntry,
): void {
if (state.active.has(runId)) return;
const rawMessages = logStore.getThreadMessages(runId);
const launch = readLaunchFromTriggerPayload(
logStore.getTriggerPayload(runId),
config.maxRounds,
);
const messages = ensureThreadMessagesWithStart(
rawMessages,
runId,
launch.prompt,
launch.maxRounds,
);
state.active.add(runId);
const msg: ResumeThreadMessage = {
type: "resume-thread",
runId,
messages,
maxRounds: launch.maxRounds,
dryRun: launch.dryRun,
};
sendResumeThread(worker.process, msg);
process.stderr.write(
`[workflow-manager] crash-recovery: resuming thread "${runId}" for "${workflowName}" (${messages.length} messages)\n`,
);
}
function recoverThreadsForWorker(workflowName: string, worker: WorkerEntry): void {
const activeRuns = logStore.getActiveWorkflowRuns(workflowName);
const state = getOrCreateState(workflowName);
for (const run of activeRuns) {
if (run.status === "queued") {
recoverQueuedRun(workflowName, run.runId, state);
} else if (run.status === "started") {
recoverStartedRun(workflowName, run.runId, state, worker);
}
}
}
function recordCrashAndCheckLimit(workflowName: string): boolean {
const now = Date.now();
const timestamps = (crashTimestamps.get(workflowName) ?? []).filter(
(t) => now - t < CRASH_WINDOW_MS,
);
timestamps.push(now);
crashTimestamps.set(workflowName, timestamps);
return timestamps.length > MAX_CRASHES_IN_WINDOW;
}
function handleWorkerCrash(workflowName: string): void {
function cleanupAfterUnexpectedWorkerExit(workflowName: string): void {
const state = states.get(workflowName);
if (state === undefined) return;
const crashedCount = state.active.size;
if (crashedCount > 0) {
process.stderr.write(
`[workflow-manager] worker for "${workflowName}" crashed with ${crashedCount} active thread(s)\n`,
`[workflow-manager] worker for "${workflowName}" crashed with ${String(crashedCount)} active thread(s)\n`,
);
for (const runId of state.active) {
logWorkflowEvent(workflowName, runId, "crashed", undefined, 255);
@@ -460,26 +299,13 @@ export function createWorkflowManager(
}
state.active.clear();
workers.delete(workflowName);
pendingDrains.delete(workflowName);
if (stopped || workflowConfig(workflowName) === null) return;
if (recordCrashAndCheckLimit(workflowName)) {
const count = crashTimestamps.get(workflowName)?.length ?? 0;
if (!stopped && !crashLimitBlocked.has(workflowName) && workflowConfig(workflowName) !== null) {
process.stderr.write(
`[workflow-manager] worker for "${workflowName}" crashed ${count} times in ${CRASH_WINDOW_MS}ms — stopping respawn\n`,
`[workflow-manager] respawning worker for "${workflowName}" after crash\n`,
);
return;
}
process.stderr.write(
`[workflow-manager] respawning worker for "${workflowName}" after crash\n`,
);
const newWorker = getOrSpawnWorker(workflowName);
setImmediate(() => {
recoverThreadsForWorker(workflowName, newWorker);
});
}
function handleWorkerMessage(workflowName: string, raw: unknown): void {
@@ -490,43 +316,19 @@ export function createWorkflowManager(
);
return;
}
const msg = result.value;
if (msg.type === "thread-event") {
handleThreadEvent(workflowName, msg);
return;
}
if (msg.type === "thread-workflow-message") {
logStore.append({
source: "workflow",
type: "thread_workflow_message",
refId: msg.runId,
payload: JSON.stringify(msg.message),
timestamp: Date.now(),
});
return;
}
if (msg.type === "workflow-error") {
process.stderr.write(
`[workflow-manager] workflow-error for runId "${msg.runId}" in "${workflowName}": ${msg.error}\n`,
);
const state = states.get(workflowName);
if (state !== undefined) {
state.active.delete(msg.runId);
dequeueNext(workflowName);
}
logWorkflowEvent(workflowName, msg.runId, "failed", { error: msg.error }, msg.exitCode);
maybeDeferredHotReloadDrain(workflowName);
return;
}
if (msg.type === "error") {
process.stderr.write(
`[workflow-manager] error from "${workflowName}" worker: ${msg.error}\n`,
);
}
dispatchWorkflowWorkerMessage(workflowName, result.value, {
logStore,
handleThreadEvent,
onWorkflowRoleError: (wf, runId, error, exitCode) => {
const state = states.get(wf);
if (state !== undefined) {
state.active.delete(runId);
dequeueNext(wf);
}
logWorkflowEvent(wf, runId, "failed", { error }, exitCode);
maybeDeferredHotReloadDrain(wf);
},
});
}
function markActiveRunsInterrupted(workflowName: string): void {
@@ -538,67 +340,6 @@ export function createWorkflowManager(
state.active.clear();
}
function handleWorkerExit(
workflowName: string,
code: number | null,
signal: NodeJS.Signals | null,
): void {
const entry = workers.get(workflowName);
if (entry?.draining) {
workers.delete(workflowName);
markActiveRunsInterrupted(workflowName);
if (!stopped && workflowConfig(workflowName) !== null) {
process.stderr.write(
`[workflow-manager] worker for "${workflowName}" drained, respawning\n`,
);
getOrSpawnWorker(workflowName);
}
return;
}
if (entry?.stopping) {
workers.delete(workflowName);
const state = states.get(workflowName);
if (state !== undefined) {
state.active.clear();
}
return;
}
const summary = formatChildExitSummary(code, signal);
const stderrExtra = entry !== undefined ? formatCapturedStderrTail(entry.stderrTail.value) : "";
process.stderr.write(
`[workflow-manager] worker for "${workflowName}" exited (${summary})${stderrExtra}\n`,
);
handleWorkerCrash(workflowName);
}
function getOrSpawnWorker(workflowName: string): WorkerEntry {
const existing = workers.get(workflowName);
if (existing !== undefined && existing.process.exitCode === null) {
return existing;
}
const stderrTail = { value: "" };
const child = spawnWorkflowWorker(nerveRoot, workflowName, workerScript, stderrTail);
child.on("message", (raw: unknown) => {
handleWorkerMessage(workflowName, raw);
});
child.on("exit", (code, signal) => {
handleWorkerExit(workflowName, code, signal ?? null);
});
const entry: WorkerEntry = {
workflowName,
process: child,
stopping: false,
draining: false,
stderrTail,
};
workers.set(workflowName, entry);
return entry;
}
function killThread(runId: string): boolean {
for (const [workflowName, state] of states) {
const queueIdx = state.queue.findIndex((q) => q.runId === runId);
@@ -609,10 +350,8 @@ export function createWorkflowManager(
}
if (state.active.has(runId)) {
const workerEntry = workers.get(workflowName);
if (workerEntry !== undefined) {
sendKillThread(workerEntry.process, runId);
}
const msg: KillThreadMessage = { type: "kill-thread", runId };
sendToWorker(workflowName, msg);
return true;
}
}
@@ -663,7 +402,7 @@ export function createWorkflowManager(
state.queue.push({ runId, prompt, maxRounds, dryRun });
logWorkflowEvent(workflowName, runId, "queued");
process.stderr.write(
`[workflow-manager] queued thread for "${workflowName}" runId "${runId}" (queue length: ${state.queue.length})\n`,
`[workflow-manager] queued thread for "${workflowName}" runId "${runId}" (queue length: ${String(state.queue.length)})\n`,
);
}
@@ -707,35 +446,29 @@ export function createWorkflowManager(
config = newConfig;
}
/**
* Default drain timeout must be at least WORKER_SHUTDOWN_TIMEOUT_MS so the worker
* has enough time to finish in-flight threads before the parent force-kills it.
*/
const DEFAULT_DRAIN_TIMEOUT_MS = Math.max(30_000, WORKER_SHUTDOWN_TIMEOUT_MS + 5_000);
async function drainAndRespawn(
workflowName: string,
drainTimeoutMs: number = DEFAULT_DRAIN_TIMEOUT_MS,
): Promise<void> {
const entry = workers.get(workflowName);
if (entry === undefined) {
// No active worker — nothing to drain
if (!trackedWorkflows.has(workflowName)) {
return;
}
entry.draining = true;
// Send shutdown without setting stopping=true (so the exit handler uses the draining branch)
if (entry.process.connected) {
const msg: ShutdownMessage = { type: "shutdown" };
try {
entry.process.send(msg);
} catch {
// IPC closed
const shutdownMs = Math.max(drainTimeoutMs, WORKER_SHUTDOWN_TIMEOUT_MS);
hotReloadEvicting.add(workflowName);
try {
await runtime.evict(workflowName, { shutdownTimeoutMs: shutdownMs });
trackedWorkflows.delete(workflowName);
if (!stopped && workflowConfig(workflowName) !== null) {
trackedWorkflows.add(workflowName);
await runtime.start(workflowName);
}
} finally {
hotReloadEvicting.delete(workflowName);
}
await waitForExit(entry.process, drainTimeoutMs);
// The exit handler (draining branch) will respawn the worker automatically
}
function drainWhenIdle(workflowName: string): void {
const state = states.get(workflowName);
const hasActiveRuns = state !== undefined && state.active.size > 0;
@@ -761,20 +494,17 @@ export function createWorkflowManager(
pendingDrains.add(workflowName);
process.stderr.write(
`[workflow-manager] deferring hot-reload for "${workflowName}" until ${state.active.size} active run(s) complete\n`,
`[workflow-manager] deferring hot-reload for "${workflowName}" until ${String(state.active.size)} active run(s) complete\n`,
);
}
async function stop(): Promise<void> {
stopped = true;
pendingDrains.clear();
const exitPromises: Promise<void>[] = [];
for (const entry of workers.values()) {
sendShutdown(entry.process, entry);
exitPromises.push(waitForExit(entry.process, 5000));
}
await Promise.all(exitPromises);
workers.clear();
hotReloadEvicting.clear();
crashRecoveryPending.clear();
await runtime.shutdown();
trackedWorkflows.clear();
}
return {
+2 -2
View File
@@ -30,7 +30,7 @@ import type {
WorkerToParentMessage,
} from "./ipc.js";
import { parseParentMessage } from "./ipc.js";
import { ignoreSessionBroadcastSignals } from "./worker-fork-support.js";
import { ignoreSessionBroadcastSignals } from "./worker-signals.js";
// ---------------------------------------------------------------------------
// IPC helpers
@@ -303,7 +303,7 @@ async function loadWorkflowDefinition(
nerveRoot: string,
workflowName: string,
): Promise<WorkflowDefinition<RoleMeta>> {
const indexPath = resolve(join(nerveRoot, "workflows", workflowName, "dist", "index.js"));
const indexPath = resolve(join(nerveRoot, "dist", "workflows", workflowName, "index.js"));
if (!existsSync(indexPath)) {
throw new Error(
`Workflow definition not found for "${workflowName}". Expected:\n ${indexPath}`,
@@ -10,7 +10,7 @@ export type CoderMeta = z.infer<typeof coderMetaSchema>;
export function coderPrompt({ threadId }: { threadId: string }): string {
return `Read the workflow thread for the planner's sense design and any tester feedback: \`nerve thread ${threadId}\`
Read the nerve-dev skill for sense file structure and conventions: \`cat node_modules/@uncaged/nerve-skills/nerve-dev/SKILL.md\`
Read \`cat AGENT.md\` from the repository root, then \`CONVENTIONS.md\` and \`.knowledge/sense.md\` if present.
## Your task
@@ -20,21 +20,21 @@ Implement (or fix) the sense the planner designed. If there is tester feedback i
You do NOT need to finish everything in one pass. You may return \`done: false\` to continue in the next iteration.
## File structure for each sense
## File structure for each sense (flat workspace)
- \`senses/<name>/src/index.ts\` — TypeScript compute source; import schema as \`./schema.ts\`
The workspace has **one root** \`package.json\` and root \`scripts/build.mjs\` (or equivalent) that bundles all senses. There is **no** per-sense \`package.json\`. Bundled output is \`dist/senses/<name>/index.js\` after a root build.
- \`senses/<name>/src/index.ts\` — compute entry; import schema as \`./schema.ts\`
- \`senses/<name>/src/schema.ts\` — Drizzle schema (TypeScript)
- \`senses/<name>/migrations/\`Drizzle migration files (at sense root, not inside src/)
- \`senses/<name>/package.json\` — with esbuild build script
- \`senses/<name>/index.js\` — bundled output generated by \`pnpm build\` (do NOT edit by hand)
- \`senses/<name>/migrations/\`SQL migration files (at sense root, not inside \`src/\`)
Look at existing senses for the package.json template and patterns.
Look at existing senses for patterns.
## When to return done: true
Return \`done: true\` ONLY when ALL of the following are true:
- All required files are created
- \`pnpm install --no-cache && pnpm build\` succeeds (run it!)
- From the **workspace root**, \`pnpm run build\` or \`npm run build\` succeeds (run it!) and \`dist/senses/<name>/index.js\` exists
- \`nerve.yaml\` is updated with the sense config
Return \`done: false\` if you made progress but there is still work to do.`;
@@ -12,7 +12,7 @@ export function plannerPrompt({ threadId }: { threadId: string }): string {
return `You are planning a new Nerve sense.
Read the workflow thread for the user's request: \`nerve thread ${threadId}\`
Read the nerve-dev skill for sense conventions: \`cat node_modules/@uncaged/nerve-skills/nerve-dev/SKILL.md\`
Read the workspace guide: \`cat AGENT.md\` from the repository root (created by \`nerve init\`). Also read \`CONVENTIONS.md\` and \`.knowledge/sense.md\` if present. Optional skills live under \`.cursor/skills/\`.
Also look at existing senses in the \`senses/\` directory for patterns.
Pick a good kebab-case name for this sense. Produce a PLAN (not code) in markdown:
@@ -17,21 +17,20 @@ export function testerPrompt({
**IMPORTANT: The Nerve workspace is at \`${nerveRoot}\`. All paths below are relative to this directory. Always \`cd ${nerveRoot}\` first.**
Read the workflow thread for context: \`nerve thread ${threadId}\`
Read the nerve-dev skill for expected file structure: \`cat ${nerveRoot}/node_modules/@uncaged/nerve-skills/nerve-dev/SKILL.md\`
Read \`cat ${nerveRoot}/AGENT.md\`, then \`${nerveRoot}/CONVENTIONS.md\` and \`${nerveRoot}/.knowledge/sense.md\` if they exist.
Verify the full lifecycle in this order:
1. **File check** — all required sense files exist:
1. **File check** — all required sense files exist (no per-sense \`package.json\`):
- \`senses/<name>/src/index.ts\`
- \`senses/<name>/src/schema.ts\`
- \`senses/<name>/migrations/\`
- \`senses/<name>/package.json\`
2. **Build** — run inside the sense directory:
2. **Build** — from the workspace root:
\`\`\`
cd ${nerveRoot}/senses/<name> && pnpm install --no-cache && pnpm build
cd ${nerveRoot} && pnpm run build
\`\`\`
Must produce \`index.js\` at sense root without errors.
(or \`npm run build\` per root \`package.json\`.) Must produce \`${nerveRoot}/dist/senses/<name>/index.js\` without errors.
3. **Config check** — \`nerve validate\` passes, confirming nerve.yaml is valid.
@@ -10,7 +10,7 @@ export type CoderMeta = z.infer<typeof coderMetaSchema>;
export function coderPrompt({ threadId }: { threadId: string }): string {
return `Read the workflow thread to get the planner's design and any reviewer/tester/committer feedback: \`nerve thread ${threadId}\`
Read the nerve-dev skill for workflow file structure and conventions: \`cat node_modules/@uncaged/nerve-skills/nerve-dev/SKILL.md\`
Read \`cat AGENT.md\` from the repository root, then \`CONVENTIONS.md\` and \`.knowledge/workflow.md\` if present. Optional skills live under \`.cursor/skills/\`.
Also look at existing workflows in the \`workflows/\` directory for patterns.
## Your task
@@ -29,15 +29,13 @@ You do NOT need to finish everything in one pass. You may return \`done: false\`
2. Second pass: implement role logic
3. Third pass: fix build/lint errors
## Workflow file structure
## Workflow file structure (flat workspace)
The workspace has **one root** \`package.json\` and **one** root build (\`pnpm run build\` or \`npm run build\`), implemented by \`scripts/build.mjs\`, which emits bundles under \`dist/workflows/<name>/index.js\`. There is **no** per-workflow \`package.json\` or \`tsconfig.json\`.
Each workflow must have:
- \`workflows/<name>/index.ts\`WorkflowDefinition default export
- \`workflows/<name>/build.ts\` — factory function
- \`workflows/<name>/moderator.ts\` — moderator + meta types
- \`workflows/<name>/roles/<role>.ts\` — meta schema and prompt function per role
- \`workflows/<name>/package.json\` — with esbuild build script
- \`workflows/<name>/tsconfig.json\` — TypeScript config
- \`workflows/<name>/index.ts\`default export \`WorkflowDefinition\` (moderator and meta types typically live here or are imported from co-located modules)
- \`workflows/<name>/roles/<role>.ts\` — one TypeScript file per role (schemas, prompts, \`createRole\` wiring, or plain async role functions)
For **new workflows**, also update \`nerve.yaml\` with \`workflows.<name>\`.
@@ -53,7 +51,7 @@ For **new workflows**, also update \`nerve.yaml\` with \`workflows.<name>\`.
Return \`done: true\` ONLY when ALL of the following are true:
- All changes from the plan are implemented
- \`cd workflows/<name> && pnpm install --no-cache && pnpm build\` succeeds (run it!)
- From the **workspace root**, \`pnpm run build\` or \`npm run build\` succeeds (run it!) so \`dist/workflows/<name>/index.js\` is produced
- No lint or type errors remain
Return \`done: false\` if you made progress but there is still work to do, or if build/lint has errors you plan to fix in the next iteration.`;
@@ -12,18 +12,18 @@ export function plannerPrompt({ threadId }: { threadId: string }): string {
return `You are a Nerve workflow planner. You can **create new workflows** or **modify existing ones**.
Read the workflow thread for the user's request: \`nerve thread ${threadId}\`
Read the nerve-dev skill for workflow conventions: \`cat node_modules/@uncaged/nerve-skills/nerve-dev/SKILL.md\`
Read the workspace guide: \`cat AGENT.md\` from the repository root (created by \`nerve init\`). Also read \`CONVENTIONS.md\` if it exists; if \`.knowledge/workflow.md\` exists (e.g. Nerve monorepo), read it for layout and engine behavior. Optional Cursor skills live under \`.cursor/skills/\`.
List existing workflows: \`ls workflows/\`
## Determine the task type
1. If the user wants to **modify an existing workflow** — read its current code (\`cat workflows/<name>/moderator.ts\`, \`cat workflows/<name>/build.ts\`, \`ls workflows/<name>/roles/\`, etc.) and understand its current structure before planning changes.
1. If the user wants to **modify an existing workflow** — read its current code (\`cat workflows/<name>/index.ts\`, \`ls workflows/<name>/roles/\`, \`cat workflows/<name>/roles/<role>.ts\`, etc.) and understand its current structure before planning changes.
2. If the user wants to **create a new workflow** — look at existing workflows in \`workflows/\` for patterns to follow.
## Produce a PLAN (not code) in markdown
For **new workflows**:
- Workflow name (kebab-case)
- Workflow name — **verb-first** kebab-case phrase (e.g. \`review-pull-request\`, \`deploy-staging\`), not a bare noun
- Roles list (name, purpose, tool)
- Flow transitions / moderator routing logic
- Validation loops design
@@ -17,24 +17,22 @@ export function testerPrompt({
**IMPORTANT: The Nerve workspace is at \`${nerveRoot}\`. All paths below are relative to this directory. Always \`cd ${nerveRoot}\` first.**
Read the workflow thread for context: \`nerve thread ${threadId}\`
Read the nerve-dev skill for expected file structure: \`cat ${nerveRoot}/node_modules/@uncaged/nerve-skills/nerve-dev/SKILL.md\`
Read \`cat ${nerveRoot}/AGENT.md\`, then \`${nerveRoot}/CONVENTIONS.md\` and \`${nerveRoot}/.knowledge/workflow.md\` if they exist.
Get the workflow name from the thread (the planner's output).
Verify the full lifecycle in this order:
1. **File check** all required workflow files exist (under \`${nerveRoot}/\`):
1. **File check** all required workflow sources exist (under \`${nerveRoot}/\`):
- \`workflows/<name>/index.ts\`
- \`workflows/<name>/build.ts\`
- \`workflows/<name>/moderator.ts\`
- \`workflows/<name>/roles/\` with one \`.ts\` file per role
- \`workflows/<name>/package.json\`
- \`workflows/<name>/roles/\` with one \`.ts\` file per role (flat files, not per-role packages)
- **No** \`workflows/<name>/package.json\` or \`tsconfig.json\` expected
2. **Build** run inside the workflow directory:
2. **Build** from the workspace root:
\`\`\`
cd ${nerveRoot}/workflows/<name> && pnpm install --no-cache && pnpm build
cd ${nerveRoot} && pnpm run build
\`\`\`
Must produce \`dist/index.js\` without errors.
(or \`npm run build\` if that is what the root \`package.json\` defines.) Must produce \`${nerveRoot}/dist/workflows/<name>/index.js\` without errors.
3. **Config check** \`cd ${nerveRoot} && nerve validate\` passes, confirming nerve.yaml is valid.
@@ -0,0 +1,54 @@
import { afterEach, describe, expect, it, vi } from "vitest";
import { START, type ThreadContext } from "@uncaged/nerve-core";
import { createLlmAdapter } from "../create-llm-adapter.js";
function makeCtx(threadId: string, userContent: string): ThreadContext {
return {
threadId,
start: {
role: START,
content: userContent,
meta: { maxRounds: 10, threadId },
timestamp: 1,
},
steps: [],
};
}
describe("createLlmAdapter", () => {
afterEach(() => {
vi.unstubAllGlobals();
vi.restoreAllMocks();
});
it("posts system + user (start.content) and returns assistant text", async () => {
const fetchMock = vi.fn().mockResolvedValue({
ok: true,
status: 200,
text: async () =>
JSON.stringify({
choices: [{ message: { content: "model reply" } }],
}),
});
vi.stubGlobal("fetch", fetchMock);
const provider = { baseUrl: "https://api.example/v1", apiKey: "k", model: "m" };
const adapter = createLlmAdapter(provider);
const out = await adapter(makeCtx("t1", "trigger text"), "system instructions");
expect(out).toBe("model reply");
expect(fetchMock).toHaveBeenCalledTimes(1);
const [, init] = fetchMock.mock.calls[0] as [string, RequestInit];
const body = JSON.parse(init.body as string) as {
model: string;
messages: Array<{ role: string; content: string }>;
};
expect(body.model).toBe("m");
expect(body.messages).toEqual([
{ role: "system", content: "system instructions" },
{ role: "user", content: "trigger text" },
]);
});
});
@@ -0,0 +1,22 @@
import type { AgentFn, ThreadContext } from "@uncaged/nerve-core";
import { formatLlmError } from "./shared/format-error.js";
import { chatCompletionText } from "./shared/llm-chat.js";
import type { LlmProvider } from "./shared/llm-extract.js";
/** Single-turn chat adapter: system comes from `createRole` prompt; user is the thread start frame. */
export function createLlmAdapter(provider: LlmProvider): AgentFn {
return async (ctx: ThreadContext, systemPrompt: string) => {
const result = await chatCompletionText({
provider,
messages: [
{ role: "system", content: systemPrompt },
{ role: "user", content: ctx.start.content },
],
});
if (!result.ok) {
throw new Error(`llm: ${formatLlmError(result.error)}`);
}
return result.value;
};
}
+2 -19
View File
@@ -1,9 +1,6 @@
// Primary API — role factory templates
export { createLlmAdapter } from "./create-llm-adapter.js";
export { createRole, type LlmExtractorConfig } from "./create-role.js";
export { createCursorRole } from "./role-cursor.js";
export { createHermesRole } from "./role-hermes.js";
export { createLlmRole } from "./role-llm.js";
export { createReActRole } from "./role-react.js";
export { llmExtract, llmExtractWithRetry } from "./shared/llm-extract.js";
export { mergeExtractConfig, type ExtractConfigLayer } from "./shared/merge-extract-config.js";
export {
@@ -37,19 +34,5 @@ export {
} from "@uncaged/nerve-core";
export type { LlmError, LlmProvider } from "./shared/llm-extract.js";
export { isDryRun } from "./role-types.js";
export type {
CliPromptFn,
CursorRoleDefaults,
CursorRoleRequired,
HermesRoleDefaults,
HermesRoleRequired,
LlmMessage,
LlmPromptFn,
LlmRoleDefaults,
LlmRoleRequired,
MetaExtractConfig,
ReActRoleDefaults,
ReActRoleRequired,
ReActTool,
} from "./role-types.js";
export type { LlmMessage, MetaExtractConfig } from "./role-types.js";
export type { LlmChatError } from "./shared/llm-chat.js";