Compare commits
8 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 0455f928f5 | |||
| dc4454d23e | |||
| 082d2e72f2 | |||
| fbf63e0266 | |||
| 7d89e8ab61 | |||
| 06b1e3d785 | |||
| f828ebc28b | |||
| 809a11afe3 |
+22
-1
@@ -28,8 +28,29 @@ For long-running or incremental agent outputs:
|
||||
|---------|---------|------|
|
||||
| `@uncaged/nerve-adapter-cursor` | `cursorAdapter` / `createCursorAdapter()` | cursor-agent CLI |
|
||||
| `@uncaged/nerve-adapter-hermes` | `hermesAdapter` / `createHermesAdapter()` | hermes chat CLI |
|
||||
| `@uncaged/nerve-workflow-utils` | `createLlmAdapter(provider)` | OpenAI-compatible HTTP chat (single-turn) |
|
||||
|
||||
Each exports a **default instance** (sensible defaults) and a **factory** for custom config.
|
||||
The Cursor and Hermes adapter packages each export a **default instance** (sensible defaults) and a **factory** for custom config. `createLlmAdapter` is a factory on `@uncaged/nerve-workflow-utils` only.
|
||||
|
||||
## createLlmAdapter
|
||||
|
||||
`createLlmAdapter` builds an `AgentFn` from an `LlmProvider` (`baseUrl`, `apiKey`, `model`). One chat completion per role step: **system** = the string passed by `createRole` (your prompt); **user** = `ctx.start.content` (the thread’s start frame). On failure it throws with a formatted LLM error.
|
||||
|
||||
```ts
|
||||
import { createLlmAdapter, createRole } from "@uncaged/nerve-workflow-utils";
|
||||
import { z } from "zod";
|
||||
|
||||
const metaSchema = z.object({ ok: z.boolean() });
|
||||
|
||||
const planner = createRole(
|
||||
createLlmAdapter({ baseUrl: "https://api.example.com/v1", apiKey: "…", model: "gpt-4o-mini" }),
|
||||
"You are a planner…",
|
||||
metaSchema,
|
||||
extractConfig,
|
||||
);
|
||||
```
|
||||
|
||||
Use this when you want a role backed by an HTTP LLM instead of a subprocess CLI adapter.
|
||||
|
||||
## Usage in Workflows
|
||||
|
||||
|
||||
@@ -2,6 +2,20 @@
|
||||
|
||||
Stateful multi-step execution driven by Roles and a Moderator.
|
||||
|
||||
## Workspace Layout (authoring)
|
||||
|
||||
User Nerve workspaces use a **flat** build: one root `package.json`, one root bundle script (typically `scripts/build.mjs` wired from `scripts.build`), and **no** per-workflow `package.json` or `tsconfig.json`.
|
||||
|
||||
| Location | Purpose |
|
||||
|----------|---------|
|
||||
| `workflows/<name>/index.ts` | Default export: `WorkflowDefinition` (moderator + role map). |
|
||||
| `workflows/<name>/roles/<role>.ts` | One module per role — schemas, prompts, `createRole` factories, or hand-written async role functions. |
|
||||
| `dist/workflows/<name>/index.js` | Emit of the root build; this is what the daemon loads. |
|
||||
|
||||
**Naming:** Workflow ids should be **verb-first** kebab-case phrases (e.g. `deploy-staging`, `scan-dependencies`), not opaque nouns alone.
|
||||
|
||||
Senses follow the same flat pattern: `senses/<name>/src/*.ts`, `migrations/`, root build → `dist/senses/<name>/index.js`. See `.knowledge/sense.md`.
|
||||
|
||||
## Core Concepts
|
||||
|
||||
- **Workflow** — definition with concurrency strategy
|
||||
|
||||
@@ -0,0 +1,505 @@
|
||||
---
|
||||
name: nerve
|
||||
version: 0.5.0
|
||||
description: >
|
||||
Nerve — AI agent 观测引擎。掌握 nerve 的核心概念、CLI 操作、sense/workflow 开发。
|
||||
加载此 skill 后你可以:查看系统状态、监控 sense、触发 workflow、开发新 sense 和 workflow。
|
||||
metadata:
|
||||
hermes:
|
||||
tags: [nerve, sense, workflow, monitoring, agent-kernel]
|
||||
homepage: https://git.shazhou.work/uncaged/nerve
|
||||
---
|
||||
|
||||
# Nerve — AI Agent 观测引擎
|
||||
|
||||
Nerve 是一个轻量级观测引擎守护进程。它持续观测外部状态,通过声明式规则响应变化,编排多步骤工作流。
|
||||
|
||||
## 核心架构
|
||||
|
||||
```
|
||||
External World → Sense → Signal → Workflow → Log
|
||||
```
|
||||
|
||||
| 概念 | 说明 |
|
||||
|------|------|
|
||||
| **Sense** | 观测函数,`compute()` 采样或推导数据。返回非 null 则发出 Signal,可选触发 Workflow。每个 Sense 有独立 SQLite 数据库。 |
|
||||
| **Signal** | Sense 返回非 null 时发出的通知。纯事实,无意图。通过内存 Signal Bus 分发,不持久化。 |
|
||||
| **Workflow** | 有状态的多步骤执行。包含 Role(有副作用的执行者)和 Moderator(纯路由器)。每个实例是一个 Thread,有唯一 runId。 |
|
||||
| **Log** | 不可变审计日志。记录执行、状态转换、错误。不能触发 Sense(防止反馈循环)。 |
|
||||
| **Engine** | 内核,持有 Signal Bus、Process Manager、Workflow Manager。不直接加载用户代码。 |
|
||||
| **Daemon** | 引擎运行时,作为后台进程运行。 |
|
||||
|
||||
**关键规则:**
|
||||
- 因果链单向:External → Sense → Signal → Workflow + Log
|
||||
- 进程隔离:每个 Sense group 一个 worker(长期),每个 Workflow 类型一个 worker(按需)
|
||||
- 两个扩展点:Sense(观测什么 + 何时)、Workflow(做什么)
|
||||
|
||||
## 工作区结构
|
||||
|
||||
```
|
||||
~/.uncaged-nerve/ # 默认工作区(nerve init 创建)
|
||||
├── nerve.yaml # 核心配置
|
||||
├── senses/
|
||||
│ └── <name>/
|
||||
│ ├── src/index.ts # exports compute() + table
|
||||
│ ├── src/schema.ts # drizzle 表定义
|
||||
│ └── migrations/ # SQL 迁移
|
||||
├── workflows/
|
||||
│ └── <name>/
|
||||
│ ├── index.ts # exports WorkflowDefinition
|
||||
│ └── roles/<role>/
|
||||
│ ├── index.ts # role 实现
|
||||
│ └── prompt.md # 可选 system prompt
|
||||
└── data/ # 运行时数据(SQLite、blobs)
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## CLI 完整参考
|
||||
|
||||
全局选项:`--host <host:port>`(连接远程 daemon)、`--api-token <secret>`(Bearer 认证)
|
||||
|
||||
### 初始化与脚手架
|
||||
|
||||
```bash
|
||||
nerve init # 初始化工作区
|
||||
nerve init --from <git-url> # 从 git 仓库克隆工作区
|
||||
nerve init workspace # 只初始化工作区结构
|
||||
|
||||
nerve create sense <name> # 创建 sense 脚手架
|
||||
nerve create sense <name> --force # 覆盖已有
|
||||
nerve create workflow <name> # 创建 workflow 脚手架
|
||||
nerve create workflow <name> --force
|
||||
|
||||
nerve validate # 验证 nerve.yaml 配置
|
||||
```
|
||||
|
||||
### Daemon 管理
|
||||
|
||||
```bash
|
||||
nerve daemon start # 启动后台 daemon
|
||||
nerve daemon start --port 3000 # 指定 HTTP API 端口
|
||||
nerve daemon stop # 停止 daemon
|
||||
nerve daemon restart # 重启
|
||||
nerve daemon status # 查看状态
|
||||
nerve daemon logs # 查看日志
|
||||
nerve daemon logs --follow # 实时日志
|
||||
nerve daemon logs --n 50 # 最近 50 行
|
||||
|
||||
nerve dev # 前台开发模式(不 fork daemon)
|
||||
nerve dev --port 3000 # 指定端口
|
||||
```
|
||||
|
||||
### Sense 操作
|
||||
|
||||
```bash
|
||||
nerve sense list # 列出所有注册的 sense
|
||||
nerve sense trigger <name> # 手动触发 sense 计算
|
||||
nerve sense schema <name> # 查看 sense 数据库表结构
|
||||
nerve sense schema <name> --json # JSON 格式
|
||||
nerve sense query <name> <sql> # 对 sense 数据库执行只读 SQL
|
||||
nerve sense query <name> "SELECT * FROM samples ORDER BY ts DESC LIMIT 10" --json
|
||||
```
|
||||
|
||||
### Workflow 操作
|
||||
|
||||
```bash
|
||||
nerve workflow list # 列出 nerve.yaml 中定义的 workflow
|
||||
nerve workflow status # 查看运行中的 workflow 状态
|
||||
nerve workflow trigger <name> # 触发 workflow
|
||||
nerve workflow trigger <name> --prompt "检查生产环境"
|
||||
nerve workflow trigger <name> --maxRounds 50
|
||||
nerve workflow trigger <name> --dryRun # 干跑模式
|
||||
```
|
||||
|
||||
### Thread(Workflow 执行记录)
|
||||
|
||||
```bash
|
||||
nerve thread list # 列出最近的 workflow 执行
|
||||
nerve thread list --all # 包含已完成/失败的
|
||||
nerve thread list --workflow <name> # 按 workflow 过滤
|
||||
nerve thread list --limit 50 # 最多 50 条
|
||||
|
||||
nerve thread show <runId> # 查看 role 对话轮次
|
||||
nerve thread show <runId> --budget 16000 # 增大输出预算(默认 8000 字符)
|
||||
|
||||
nerve thread inspect <runId> # 查看详情和事件
|
||||
|
||||
nerve thread kill <runId> # 终止运行中/排队中的 thread
|
||||
```
|
||||
|
||||
### Store(日志归档)
|
||||
|
||||
```bash
|
||||
nerve store archive # 导出旧日志到 JSONL 归档
|
||||
nerve store archive --vacuum # 归档后 VACUUM 数据库
|
||||
```
|
||||
|
||||
### Knowledge(知识库)
|
||||
|
||||
```bash
|
||||
nerve knowledge sync # 从 knowledge.yaml 重建索引
|
||||
nerve knowledge query "搜索内容" # 搜索知识库
|
||||
nerve knowledge query "内容" --limit 5
|
||||
nerve knowledge query "内容" -g # 搜索所有注册仓库
|
||||
```
|
||||
|
||||
### Remote(远程 daemon)
|
||||
|
||||
```bash
|
||||
nerve remote add <name> <host:port> --token <secret>
|
||||
nerve remote list
|
||||
nerve remote show <name>
|
||||
nerve remote set-url <name> <host>
|
||||
nerve remote set-token <name> <token>
|
||||
nerve remote remove <name>
|
||||
nerve remote default <name> # 设为默认远程
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## nerve.yaml 配置参考
|
||||
|
||||
```yaml
|
||||
# 引擎全局配置
|
||||
max_rounds: 100 # moderator 最大轮次(默认 100)
|
||||
|
||||
# Sense 配置
|
||||
senses:
|
||||
cpu-usage:
|
||||
group: system # 必填,同 group 的 sense 共享 worker
|
||||
interval: 10s # 轮询间隔(duration: 5s, 10m, 1h)
|
||||
throttle: 5s # 最小计算间隔
|
||||
timeout: 10s # compute 超时
|
||||
grace_period: null # 优雅关闭等待
|
||||
retention: 10000 # _signals 表最大行数(默认 10000)
|
||||
|
||||
system-health:
|
||||
group: derived
|
||||
on: [cpu-usage, disk-usage] # 响应式:被列出的 sense 发出 signal 时触发
|
||||
throttle: null
|
||||
timeout: null
|
||||
|
||||
# Workflow 配置
|
||||
workflows:
|
||||
my-workflow:
|
||||
concurrency: 1 # 必填,并发数
|
||||
overflow: drop # 必填,超并发时处理:drop | queue
|
||||
max_queue: 100 # overflow=queue 时的队列上限(默认 100)
|
||||
|
||||
# HTTP API
|
||||
api:
|
||||
port: 3000 # null = 不启用 HTTP
|
||||
host: "127.0.0.1" # 监听地址
|
||||
token: null # 非 loopback 时必填
|
||||
|
||||
# LLM Extract(可选)
|
||||
extract:
|
||||
provider: anthropic
|
||||
model: claude-sonnet-4-20250514
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Sense 开发指南
|
||||
|
||||
### compute 函数签名
|
||||
|
||||
```typescript
|
||||
import type { LibSQLDatabase } from "drizzle-orm/libsql";
|
||||
import type { ComputeResult, WorkflowTrigger } from "@uncaged/nerve-core";
|
||||
|
||||
export async function compute(
|
||||
db: LibSQLDatabase, // 此 sense 的 Drizzle ORM 数据库
|
||||
peers: Record<string, LibSQLDatabase>, // 其他 sense 的数据库(只读)
|
||||
options: { signal: AbortSignal }, // 超时 abort signal
|
||||
): Promise<ComputeResult<T>>
|
||||
```
|
||||
|
||||
### 返回值
|
||||
|
||||
```typescript
|
||||
// 返回 null = 静默,不发 signal
|
||||
// 返回非 null = 发出 signal,可选触发 workflow
|
||||
type ComputeResult<T> =
|
||||
| null
|
||||
| { signal: T; workflow: WorkflowTrigger | null };
|
||||
|
||||
type WorkflowTrigger = {
|
||||
name: string; // workflow 名称(对应 nerve.yaml 中的 key)
|
||||
maxRounds: number; // moderator 最大轮次
|
||||
prompt: string; // 初始 prompt
|
||||
dryRun: boolean; // 干跑模式
|
||||
};
|
||||
```
|
||||
|
||||
### Sense 模块导出
|
||||
|
||||
```typescript
|
||||
// senses/<name>/src/index.ts
|
||||
import type { SenseModule, ComputeResult } from "@uncaged/nerve-core";
|
||||
import { table } from "./schema.js";
|
||||
|
||||
export async function compute(
|
||||
db: LibSQLDatabase,
|
||||
_peers: Record<string, LibSQLDatabase>,
|
||||
_options: { signal: AbortSignal },
|
||||
): Promise<ComputeResult<number>> {
|
||||
const value = Math.random(); // 替换为真实观测逻辑
|
||||
await db.insert(table).values({ ts: Date.now(), value });
|
||||
return { signal: value, workflow: null };
|
||||
}
|
||||
|
||||
export { table };
|
||||
```
|
||||
|
||||
### Schema 定义
|
||||
|
||||
```typescript
|
||||
// senses/<name>/src/schema.ts
|
||||
import { sqliteTable, integer, real } from "drizzle-orm/sqlite-core";
|
||||
|
||||
export const table = sqliteTable("samples", {
|
||||
ts: integer("ts").notNull(),
|
||||
value: real("value").notNull(),
|
||||
});
|
||||
```
|
||||
|
||||
### 调度方式
|
||||
|
||||
1. **interval 轮询**:`interval: 10s` — 每 10 秒执行一次
|
||||
2. **响应式触发**:`on: [cpu-usage]` — 当 cpu-usage 发出 signal 时触发
|
||||
3. 两者可以组合
|
||||
|
||||
### 调试
|
||||
|
||||
```bash
|
||||
nerve dev # 前台运行,看实时输出
|
||||
nerve sense trigger <name> # 手动触发一次
|
||||
nerve sense query <name> "SELECT * FROM samples ORDER BY ts DESC LIMIT 5"
|
||||
```
|
||||
|
||||
### 完整示例:CPU 监控
|
||||
|
||||
```typescript
|
||||
// senses/cpu-usage/src/schema.ts
|
||||
import { sqliteTable, integer, real } from "drizzle-orm/sqlite-core";
|
||||
|
||||
export const table = sqliteTable("samples", {
|
||||
ts: integer("ts").notNull(),
|
||||
value: real("value").notNull(),
|
||||
});
|
||||
|
||||
// senses/cpu-usage/src/index.ts
|
||||
import os from "node:os";
|
||||
import type { LibSQLDatabase } from "drizzle-orm/libsql";
|
||||
import type { ComputeResult } from "@uncaged/nerve-core";
|
||||
import { table } from "./schema.js";
|
||||
|
||||
export async function compute(
|
||||
db: LibSQLDatabase,
|
||||
_peers: Record<string, LibSQLDatabase>,
|
||||
_options: { signal: AbortSignal },
|
||||
): Promise<ComputeResult<number>> {
|
||||
const oneMin = os.loadavg()[0];
|
||||
await db.insert(table).values({ ts: Date.now(), value: oneMin });
|
||||
return { signal: oneMin, workflow: null };
|
||||
}
|
||||
|
||||
export { table };
|
||||
```
|
||||
|
||||
nerve.yaml:
|
||||
```yaml
|
||||
senses:
|
||||
cpu-usage:
|
||||
group: system
|
||||
interval: 10s
|
||||
throttle: 5s
|
||||
timeout: 10s
|
||||
retention: 10000
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Workflow 开发指南
|
||||
|
||||
### 核心类型
|
||||
|
||||
```typescript
|
||||
import type {
|
||||
WorkflowDefinition,
|
||||
RoleResult,
|
||||
ThreadContext,
|
||||
StartStep,
|
||||
RoleStep,
|
||||
} from "@uncaged/nerve-core";
|
||||
import { END } from "@uncaged/nerve-core";
|
||||
|
||||
// Role:执行者,接收上下文返回结果
|
||||
type Role<Meta> = (ctx: ThreadContext) => Promise<RoleResult<Meta>>;
|
||||
type RoleResult<Meta> = { content: string; meta: Meta };
|
||||
|
||||
// Moderator:路由器,决定下一个 role 或结束
|
||||
type Moderator<M> = (ctx: ThreadContext<M>) => (keyof M & string) | typeof END;
|
||||
|
||||
// ThreadContext:对话上下文
|
||||
type ThreadContext<M = RoleMeta> = {
|
||||
threadId: string;
|
||||
start: StartStep; // 初始 prompt(role: "__start__")
|
||||
steps: RoleStep<M>[]; // 所有 role 的执行记录
|
||||
};
|
||||
|
||||
// WorkflowDefinition:完整定义
|
||||
type WorkflowDefinition<M> = {
|
||||
name: string;
|
||||
roles: { [K in keyof M & string]: Role<M[K]> };
|
||||
moderator: Moderator<M>;
|
||||
};
|
||||
```
|
||||
|
||||
### 基本 Workflow 示例
|
||||
|
||||
```typescript
|
||||
// workflows/example/index.ts
|
||||
import type { RoleResult, ThreadContext, WorkflowDefinition } from "@uncaged/nerve-core";
|
||||
import { END } from "@uncaged/nerve-core";
|
||||
|
||||
type Meta = Record<"main", { round: number }>;
|
||||
|
||||
async function main(ctx: ThreadContext): Promise<RoleResult<{ round: number }>> {
|
||||
const prompt = ctx.start.content;
|
||||
return {
|
||||
content: `处理完成: ${prompt}`,
|
||||
meta: { round: ctx.steps.length },
|
||||
};
|
||||
}
|
||||
|
||||
const workflow: WorkflowDefinition<Meta> = {
|
||||
name: "example",
|
||||
roles: { main },
|
||||
moderator(ctx: ThreadContext<Meta>) {
|
||||
// 执行一次 main 就结束
|
||||
return ctx.steps.length === 0 ? "main" : END;
|
||||
},
|
||||
};
|
||||
|
||||
export default workflow;
|
||||
```
|
||||
|
||||
### 多 Role Workflow 示例
|
||||
|
||||
```typescript
|
||||
import type { WorkflowDefinition, RoleResult, ThreadContext } from "@uncaged/nerve-core";
|
||||
import { END } from "@uncaged/nerve-core";
|
||||
|
||||
type Roles = Record<"planner" | "executor" | "reviewer", { status: string }>;
|
||||
|
||||
async function planner(ctx: ThreadContext): Promise<RoleResult<{ status: string }>> {
|
||||
return { content: "计划: ...", meta: { status: "planned" } };
|
||||
}
|
||||
|
||||
async function executor(ctx: ThreadContext): Promise<RoleResult<{ status: string }>> {
|
||||
return { content: "执行: ...", meta: { status: "executed" } };
|
||||
}
|
||||
|
||||
async function reviewer(ctx: ThreadContext): Promise<RoleResult<{ status: string }>> {
|
||||
return { content: "审核通过", meta: { status: "approved" } };
|
||||
}
|
||||
|
||||
const workflow: WorkflowDefinition<Roles> = {
|
||||
name: "plan-execute-review",
|
||||
roles: { planner, executor, reviewer },
|
||||
moderator(ctx: ThreadContext<Roles>) {
|
||||
if (ctx.steps.length === 0) return "planner";
|
||||
const last = ctx.steps[ctx.steps.length - 1];
|
||||
if (last.role === "planner") return "executor";
|
||||
if (last.role === "executor") return "reviewer";
|
||||
return END;
|
||||
},
|
||||
};
|
||||
|
||||
export default workflow;
|
||||
```
|
||||
|
||||
### Agent 适配器
|
||||
|
||||
Workflow role 可以集成 AI agent。已知适配器 ID:`echo`、`cursor`、`hermes`、`codex`。
|
||||
|
||||
```typescript
|
||||
type AgentFn = (ctx: ThreadContext, systemPrompt: string) => Promise<string>;
|
||||
```
|
||||
|
||||
### Workflow 运行状态
|
||||
|
||||
`queued` → `started` → `completed` | `failed` | `crashed` | `killed` | `interrupted` | `dropped`
|
||||
|
||||
---
|
||||
|
||||
## 日常操作 Pattern
|
||||
|
||||
### 查看系统整体状态
|
||||
|
||||
```bash
|
||||
nerve daemon status # daemon 是否在运行
|
||||
nerve sense list # 所有 sense 及其调度配置
|
||||
nerve workflow status # 运行中的 workflow
|
||||
nerve thread list # 最近的 workflow 执行记录
|
||||
```
|
||||
|
||||
### 检查某个 sense 的历史数据
|
||||
|
||||
```bash
|
||||
nerve sense query cpu-usage "SELECT * FROM samples ORDER BY ts DESC LIMIT 10" --json
|
||||
nerve sense schema cpu-usage # 查看表结构
|
||||
```
|
||||
|
||||
### 手动触发 workflow
|
||||
|
||||
```bash
|
||||
nerve workflow trigger my-workflow --prompt "手动检查"
|
||||
nerve thread list --workflow my-workflow # 查看执行状态
|
||||
nerve thread show <runId> # 查看对话详情
|
||||
```
|
||||
|
||||
### 排查 sense 报错
|
||||
|
||||
```bash
|
||||
nerve daemon logs --follow # 查看实时日志
|
||||
nerve sense trigger <name> # 手动触发看报错
|
||||
nerve dev # 前台模式,更详细的输出
|
||||
```
|
||||
|
||||
### 开发新 sense
|
||||
|
||||
```bash
|
||||
nerve create sense my-sensor # 脚手架
|
||||
# 编辑 senses/my-sensor/src/index.ts 和 schema.ts
|
||||
nerve validate # 验证配置
|
||||
nerve dev # 前台测试
|
||||
nerve sense trigger my-sensor # 单次触发验证
|
||||
nerve sense query my-sensor "SELECT * FROM ..." # 检查数据
|
||||
```
|
||||
|
||||
### 开发新 workflow
|
||||
|
||||
```bash
|
||||
nerve create workflow my-flow # 脚手架
|
||||
# 编辑 workflows/my-flow/index.ts 和 roles/
|
||||
nerve validate # 验证配置
|
||||
nerve workflow trigger my-flow --prompt "测试" --dryRun # 干跑
|
||||
nerve thread show <runId> # 查看执行轨迹
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Pitfalls
|
||||
|
||||
- **Sense 返回值**:返回 `null` 表示静默(不发 signal);返回 `{ signal, workflow }` 才发 signal。不要返回 undefined。
|
||||
- **no optional properties**:nerve 代码规范禁止 `?:`,用 `T | null` 代替。
|
||||
- **函数式风格**:用 `function` + `type`,不用 `class` + `interface`。
|
||||
- **workflow 用 default export**:这是唯一允许 default export 的场景。
|
||||
- **_signals 表**:每个 sense 自动有 `_signals` 表记录 signal 历史,受 `retention` 配置限制。
|
||||
- **peers 只读**:sense 的 `peers` 参数提供其他 sense 数据库的只读访问,不要写入。
|
||||
- **concurrency + overflow**:workflow 必须配置并发策略,否则验证失败。
|
||||
- **moderator 是同步函数**:不要加 async,moderator 是纯路由逻辑,不能有副作用。
|
||||
@@ -205,6 +205,10 @@ describe("e2e init", () => {
|
||||
expect(existsSync(join(nerveRoot, "scripts", "build.mjs"))).toBe(true);
|
||||
expect(existsSync(join(nerveRoot, "biome.json"))).toBe(true);
|
||||
expect(existsSync(join(nerveRoot, ".gitignore"))).toBe(true);
|
||||
expect(existsSync(join(nerveRoot, "AGENT.md"))).toBe(true);
|
||||
const agentMd = readFileSync(join(nerveRoot, "AGENT.md"), "utf8");
|
||||
expect(agentMd).toContain("verb-first");
|
||||
expect(agentMd).toContain("createRole");
|
||||
expect(existsSync(join(nerveRoot, "senses", "cpu-usage", "src", "index.ts"))).toBe(true);
|
||||
expect(existsSync(join(nerveRoot, "senses", "cpu-usage", "src", "schema.ts"))).toBe(true);
|
||||
expect(existsSync(join(nerveRoot, "senses", "cpu-usage", "migrations", "0001_init.sql"))).toBe(
|
||||
|
||||
@@ -127,6 +127,70 @@ node_modules/
|
||||
knowledge.db
|
||||
`;
|
||||
|
||||
/** Generated at workspace root so agents can \`cat AGENT.md\` instead of npm skill paths. */
|
||||
const AGENT_MD = `# Nerve workspace — agent guide
|
||||
|
||||
This file is created by \`nerve init\`. Read it before implementing senses or workflows.
|
||||
|
||||
## Directory layout
|
||||
|
||||
| Path | Purpose |
|
||||
|------|---------|
|
||||
| \`nerve.yaml\` | Senses, workflows, intervals, groups |
|
||||
| \`package.json\` | Single root package — no per-sense/per-workflow packages |
|
||||
| \`scripts/build.mjs\` | Root esbuild step; output under \`dist/\` |
|
||||
| \`senses/<name>/src/index.ts\` | Sense \`compute()\` entry |
|
||||
| \`senses/<name>/src/schema.ts\` | Drizzle SQLite schema (TypeScript) |
|
||||
| \`senses/<name>/migrations/*.sql\` | SQL migrations (next to \`src/\`, not inside it) |
|
||||
| \`workflows/<name>/index.ts\` | Default export: \`WorkflowDefinition\` |
|
||||
| \`workflows/<name>/roles/<role>.ts\` | One TypeScript file per role |
|
||||
| \`dist/senses/<name>/index.js\` | Bundled sense (after build) |
|
||||
| \`dist/workflows/<name>/index.js\` | Bundled workflow (after build) |
|
||||
|
||||
There is **no** \`package.json\` or \`tsconfig.json\` inside individual senses or workflows.
|
||||
|
||||
## Naming
|
||||
|
||||
- **Workflows:** verb-first kebab-case (e.g. \`review-pull-request\`, \`deploy-staging\`). Avoid bare nouns like \`notifications\`.
|
||||
- **Senses:** kebab-case descriptive nouns (e.g. \`cpu-usage\`).
|
||||
|
||||
## Workflow roles — four-tuple pattern
|
||||
|
||||
Wire each role with \`createRole\` from \`@uncaged/nerve-workflow-utils\`:
|
||||
|
||||
1. **Adapter** — \`AgentFn\` (LLM call)
|
||||
2. **Prompt builder** — \`async (ctx: ThreadContext) => string\`
|
||||
3. **Meta schema** — Zod object (routing / structured output from the model)
|
||||
4. **Extractor config** — how JSON meta is parsed from replies
|
||||
|
||||
Keep meta small (often one boolean per role). The **moderator** in \`WorkflowDefinition\` routes between role names.
|
||||
|
||||
## Build commands
|
||||
|
||||
Always run from the **workspace root**:
|
||||
|
||||
\`\`\`bash
|
||||
pnpm run build
|
||||
# or: npm run build
|
||||
\`\`\`
|
||||
|
||||
Fix errors until this succeeds. New workflows must appear under \`workflows/<name>/\` and be registered in \`nerve.yaml\`; new senses under \`senses/<name>/\` with matching \`nerve.yaml\` entries.
|
||||
|
||||
## Coding style (Nerve conventions)
|
||||
|
||||
- Use \`type\`, not \`interface\`; prefer \`function\` over classes (except errors / library requirements).
|
||||
- **Named exports only** — no \`export default\` (exception: \`workflows/<name>/index.ts\` uses default export for the daemon loader).
|
||||
- Nullable fields: \`T | null\`, not TypeScript optional \`?:\`.
|
||||
- No dynamic \`import()\` in workspace code (bundling and tooling assume static imports).
|
||||
- Use \`async\`/\`await\`; use a \`Result\` type for expected failures instead of control-flow try/catch.
|
||||
|
||||
## Extra references (optional)
|
||||
|
||||
- \`CONVENTIONS.md\` — project-specific overrides at repo root.
|
||||
- \`.knowledge/*.md\` — deeper docs when working inside the Nerve monorepo.
|
||||
- \`.cursor/skills/\` — Cursor Agent Skills (\`SKILL.md\` per skill).
|
||||
`;
|
||||
|
||||
const NERVE_SKILLS_MDC = `---
|
||||
description: >-
|
||||
Where Agent Skills live in this Nerve workspace and how to use them with Cursor
|
||||
@@ -362,6 +426,7 @@ async function runInitWorkspace(force: boolean, skipInstall = false): Promise<vo
|
||||
writeFile(join(nerveRoot, "scripts", "build.mjs"), BUILD_MJS);
|
||||
writeFile(join(nerveRoot, "biome.json"), BIOME_JSON);
|
||||
writeFile(join(nerveRoot, ".gitignore"), GITIGNORE);
|
||||
writeFile(join(nerveRoot, "AGENT.md"), AGENT_MD);
|
||||
writeFile(join(nerveRoot, "senses", "cpu-usage", "src", "index.ts"), CPU_INDEX_TS);
|
||||
writeFile(join(nerveRoot, "senses", "cpu-usage", "src", "schema.ts"), CPU_SCHEMA_TS);
|
||||
writeFile(
|
||||
|
||||
@@ -28,6 +28,11 @@ function makeMockChild(pid = 1): MockChild {
|
||||
child.connected = true;
|
||||
child.exitCode = null;
|
||||
child.pid = pid;
|
||||
setImmediate(() => {
|
||||
if (child.connected) {
|
||||
child.emit("message", { type: "ready" });
|
||||
}
|
||||
});
|
||||
child.send = vi.fn((msg: unknown) => {
|
||||
if (
|
||||
msg !== null &&
|
||||
@@ -132,6 +137,7 @@ describe("WorkflowManager — crash recovery (Phase 3)", () => {
|
||||
|
||||
mgr.startWorkflow("my-wf", { prompt: "test 1", maxRounds: 10, dryRun: false });
|
||||
mgr.startWorkflow("my-wf", { prompt: "test 2", maxRounds: 10, dryRun: false });
|
||||
await vi.runAllTimersAsync();
|
||||
expect(mgr.activeCount("my-wf")).toBe(2);
|
||||
|
||||
// Simulate unexpected exit (not shutdown)
|
||||
@@ -159,6 +165,7 @@ describe("WorkflowManager — crash recovery (Phase 3)", () => {
|
||||
|
||||
mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false });
|
||||
mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false });
|
||||
await vi.runAllTimersAsync();
|
||||
expect(mgr.activeCount("my-wf")).toBe(2);
|
||||
|
||||
const child = mockChildren[0];
|
||||
@@ -183,6 +190,7 @@ describe("WorkflowManager — crash recovery (Phase 3)", () => {
|
||||
const mgr = createWorkflowManager("/nerve-root", config, logStore);
|
||||
|
||||
mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false });
|
||||
await vi.runAllTimersAsync();
|
||||
expect(mockChildren).toHaveLength(1);
|
||||
|
||||
const child = mockChildren[0];
|
||||
@@ -216,6 +224,7 @@ describe("WorkflowManager — crash recovery (Phase 3)", () => {
|
||||
const mgr = createWorkflowManager("/nerve-root", config, logStore);
|
||||
|
||||
mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false });
|
||||
await vi.runAllTimersAsync();
|
||||
const firstChild = mockChildren[0];
|
||||
firstChild.exitCode = 1;
|
||||
firstChild.connected = false;
|
||||
@@ -260,6 +269,7 @@ describe("WorkflowManager — crash recovery (Phase 3)", () => {
|
||||
|
||||
// Start one thread to fill the concurrency slot (so queued run stays queued on respawn)
|
||||
mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false });
|
||||
await vi.runAllTimersAsync();
|
||||
const firstChild = mockChildren[0];
|
||||
firstChild.exitCode = 1;
|
||||
firstChild.connected = false;
|
||||
@@ -285,6 +295,7 @@ describe("WorkflowManager — crash recovery (Phase 3)", () => {
|
||||
const mgr = createWorkflowManager("/nerve-root", config, logStore);
|
||||
|
||||
mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false });
|
||||
await vi.runAllTimersAsync();
|
||||
|
||||
const child = mockChildren[0];
|
||||
const startCall = (child.send as ReturnType<typeof vi.fn>).mock.calls[0];
|
||||
@@ -322,6 +333,7 @@ describe("WorkflowManager — crash recovery (Phase 3)", () => {
|
||||
|
||||
const launch = { prompt: "build-docker for myrepo", maxRounds: 10, dryRun: false };
|
||||
mgr.startWorkflow("my-wf", launch);
|
||||
await vi.runAllTimersAsync();
|
||||
|
||||
const startedCall = logStore.upsertWorkflowRun.mock.calls.find(
|
||||
(args: any[]) => (args[0] as { type: string }).type === "started",
|
||||
@@ -357,6 +369,7 @@ describe("WorkflowManager — crash recovery (Phase 3)", () => {
|
||||
|
||||
// Start one thread to fill the concurrency slot
|
||||
mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false });
|
||||
await vi.runAllTimersAsync();
|
||||
const firstChild = mockChildren[0];
|
||||
|
||||
// Crash once → respawn → crash again → second respawn
|
||||
@@ -398,6 +411,7 @@ describe("WorkflowManager — crash recovery (Phase 3)", () => {
|
||||
const mgr = createWorkflowManager("/nerve-root", config, logStore);
|
||||
|
||||
mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false });
|
||||
await vi.runAllTimersAsync();
|
||||
const firstChild = mockChildren[0];
|
||||
firstChild.exitCode = 1;
|
||||
firstChild.connected = false;
|
||||
@@ -428,6 +442,7 @@ describe("WorkflowManager — crash recovery (Phase 3)", () => {
|
||||
const mgr = createWorkflowManager("/nerve-root", config, logStore);
|
||||
|
||||
mgr.startWorkflow("crash-wf", { prompt: "test", maxRounds: 10, dryRun: false });
|
||||
await vi.runAllTimersAsync();
|
||||
|
||||
// Crash the worker 6 times in rapid succession (within CRASH_WINDOW_MS = 60s)
|
||||
for (let i = 0; i < 6; i++) {
|
||||
|
||||
@@ -33,6 +33,11 @@ function makeMockChild(pid = 1): MockChild {
|
||||
child.connected = true;
|
||||
child.exitCode = null;
|
||||
child.pid = pid;
|
||||
setImmediate(() => {
|
||||
if (child.connected) {
|
||||
child.emit("message", { type: "ready" });
|
||||
}
|
||||
});
|
||||
child.send = vi.fn((msg: unknown) => {
|
||||
if (
|
||||
msg !== null &&
|
||||
@@ -114,6 +119,7 @@ describe("WorkflowManager — drainAndRespawn (Phase 3 hot reload)", () => {
|
||||
const mgr = createWorkflowManager("/nerve-root", config, logStore);
|
||||
|
||||
mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false });
|
||||
await vi.runAllTimersAsync();
|
||||
expect(mockChildren).toHaveLength(1);
|
||||
|
||||
// Remove workflow from config before drain completes
|
||||
@@ -134,6 +140,7 @@ describe("WorkflowManager — drainAndRespawn (Phase 3 hot reload)", () => {
|
||||
|
||||
mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false });
|
||||
mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false });
|
||||
await vi.runAllTimersAsync();
|
||||
expect(mgr.activeCount("my-wf")).toBe(2);
|
||||
|
||||
const drainPromise = mgr.drainAndRespawn("my-wf", 5000);
|
||||
@@ -165,6 +172,7 @@ describe("WorkflowManager — drainAndRespawn (Phase 3 hot reload)", () => {
|
||||
const mgr = createWorkflowManager("/nerve-root", config, logStore);
|
||||
|
||||
mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false });
|
||||
await vi.runAllTimersAsync();
|
||||
expect(mockChildren).toHaveLength(1);
|
||||
|
||||
const drainPromise = mgr.drainAndRespawn("my-wf", 5000);
|
||||
@@ -181,6 +189,7 @@ describe("WorkflowManager — drainAndRespawn (Phase 3 hot reload)", () => {
|
||||
const mgr = createWorkflowManager("/nerve-root", config, logStore);
|
||||
|
||||
mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false });
|
||||
await vi.runAllTimersAsync();
|
||||
expect(mockChildren).toHaveLength(1);
|
||||
|
||||
const drainPromise = mgr.drainAndRespawn("my-wf", 5000);
|
||||
@@ -198,6 +207,7 @@ describe("WorkflowManager — drainAndRespawn (Phase 3 hot reload)", () => {
|
||||
const mgr = createWorkflowManager("/nerve-root", config, logStore);
|
||||
|
||||
mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false });
|
||||
await vi.runAllTimersAsync();
|
||||
|
||||
const drainPromise = mgr.drainAndRespawn("my-wf", 5000);
|
||||
await vi.runAllTimersAsync();
|
||||
@@ -223,6 +233,7 @@ describe("WorkflowManager — drainAndRespawn (Phase 3 hot reload)", () => {
|
||||
const mgr = createWorkflowManager("/nerve-root", config, logStore);
|
||||
|
||||
mgr.startWorkflow("my-wf", { prompt: "first", maxRounds: 10, dryRun: false });
|
||||
await vi.runAllTimersAsync();
|
||||
|
||||
const drainPromise = mgr.drainAndRespawn("my-wf", 5000);
|
||||
await vi.runAllTimersAsync();
|
||||
@@ -230,6 +241,7 @@ describe("WorkflowManager — drainAndRespawn (Phase 3 hot reload)", () => {
|
||||
|
||||
// Start a new thread on the fresh worker
|
||||
mgr.startWorkflow("my-wf", { prompt: "second", maxRounds: 10, dryRun: false });
|
||||
await vi.runAllTimersAsync();
|
||||
|
||||
const newChild = mockChildren[1];
|
||||
const startCalls = (newChild.send as ReturnType<typeof vi.fn>).mock.calls.filter(
|
||||
@@ -257,12 +269,13 @@ describe("WorkflowManager — drainWhenIdle (hot reload without interrupting in-
|
||||
vi.clearAllMocks();
|
||||
});
|
||||
|
||||
it("does not send shutdown while a thread is still active", () => {
|
||||
it("does not send shutdown while a thread is still active", async () => {
|
||||
const logStore = makeLogStore();
|
||||
const config = makeWfConfig({ "my-wf": { concurrency: 1, overflow: "drop" } });
|
||||
const mgr = createWorkflowManager("/nerve-root", config, logStore);
|
||||
|
||||
mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false });
|
||||
await vi.runAllTimersAsync();
|
||||
const child = mockChildren[0];
|
||||
|
||||
mgr.drainWhenIdle("my-wf");
|
||||
@@ -282,6 +295,7 @@ describe("WorkflowManager — drainWhenIdle (hot reload without interrupting in-
|
||||
const mgr = createWorkflowManager("/nerve-root", config, logStore);
|
||||
|
||||
mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false });
|
||||
await vi.runAllTimersAsync();
|
||||
const child = mockChildren[0];
|
||||
const runId = (child.send as ReturnType<typeof vi.fn>).mock.calls[0][0] as { runId: string };
|
||||
|
||||
@@ -311,6 +325,7 @@ describe("WorkflowManager — drainWhenIdle (hot reload without interrupting in-
|
||||
|
||||
mgr.startWorkflow("my-wf", { prompt: "a", maxRounds: 10, dryRun: false });
|
||||
mgr.startWorkflow("my-wf", { prompt: "b", maxRounds: 10, dryRun: false });
|
||||
await vi.runAllTimersAsync();
|
||||
const child = mockChildren[0];
|
||||
const sendMock = child.send as ReturnType<typeof vi.fn>;
|
||||
const runIdA = (sendMock.mock.calls[0][0] as { runId: string }).runId;
|
||||
@@ -355,6 +370,7 @@ describe("WorkflowManager — drainWhenIdle (hot reload without interrupting in-
|
||||
const mgr = createWorkflowManager("/nerve-root", config, logStore);
|
||||
|
||||
mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false });
|
||||
await vi.runAllTimersAsync();
|
||||
const child = mockChildren[0];
|
||||
const runId = (child.send as ReturnType<typeof vi.fn>).mock.calls[0][0] as { runId: string };
|
||||
|
||||
@@ -388,6 +404,7 @@ describe("WorkflowManager — drainWhenIdle (hot reload without interrupting in-
|
||||
const mgr = createWorkflowManager("/nerve-root", config, logStore);
|
||||
|
||||
mgr.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false });
|
||||
await vi.runAllTimersAsync();
|
||||
const child = mockChildren[0];
|
||||
const runId = (child.send as ReturnType<typeof vi.fn>).mock.calls[0][0] as { runId: string };
|
||||
|
||||
@@ -414,6 +431,7 @@ describe("WorkflowManager — drainWhenIdle (hot reload without interrupting in-
|
||||
const mgr = createWorkflowManager("/nerve-root", config, logStore);
|
||||
|
||||
mgr.startWorkflow("my-wf", { prompt: "once", maxRounds: 10, dryRun: false });
|
||||
await vi.runAllTimersAsync();
|
||||
const firstChild = mockChildren[0];
|
||||
const runId = (firstChild.send as ReturnType<typeof vi.fn>).mock.calls[0][0] as {
|
||||
runId: string;
|
||||
@@ -471,6 +489,7 @@ describe("Kernel — workflow hot reload via file-watcher (Phase 3)", () => {
|
||||
|
||||
// Trigger a workflow thread so a worker is spawned
|
||||
kernel.workflowManager.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false });
|
||||
await vi.runAllTimersAsync();
|
||||
|
||||
// Manually call drainAndRespawn (simulating what kernel does on workflow file change)
|
||||
const drainPromise = kernel.workflowManager.drainAndRespawn("my-wf", 1000);
|
||||
@@ -511,6 +530,7 @@ describe("Kernel — workflow hot reload via file-watcher (Phase 3)", () => {
|
||||
maxRounds: 10,
|
||||
dryRun: false,
|
||||
});
|
||||
await vi.runAllTimersAsync();
|
||||
expect(mockChildren).toHaveLength(1);
|
||||
|
||||
// Reload config without old-wf
|
||||
@@ -551,6 +571,7 @@ describe("Kernel — workflow hot reload via file-watcher (Phase 3)", () => {
|
||||
});
|
||||
|
||||
kernel.workflowManager.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false });
|
||||
await vi.runAllTimersAsync();
|
||||
const workersBefore = mockChildren.length;
|
||||
|
||||
// Reload with updated concurrency — should NOT spawn a new workflow worker
|
||||
@@ -573,6 +594,7 @@ describe("Kernel — workflow hot reload via file-watcher (Phase 3)", () => {
|
||||
// Can now start up to 5 concurrent threads (previously only 1)
|
||||
kernel.workflowManager.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false });
|
||||
kernel.workflowManager.startWorkflow("my-wf", { prompt: "test", maxRounds: 10, dryRun: false });
|
||||
await vi.runAllTimersAsync();
|
||||
expect(kernel.workflowManager.activeCount("my-wf")).toBe(3);
|
||||
|
||||
const stopPromise = kernel.stop();
|
||||
|
||||
@@ -194,6 +194,8 @@ describe("kernel + workflowManager integration", () => {
|
||||
});
|
||||
}
|
||||
|
||||
await vi.runAllTimersAsync();
|
||||
|
||||
// A workflow worker should be spawned and a start-thread message sent
|
||||
const workflowWorker = mockChildren.find((c) =>
|
||||
(c.send as ReturnType<typeof vi.fn>).mock.calls.some(
|
||||
@@ -252,6 +254,8 @@ describe("kernel + workflowManager integration", () => {
|
||||
});
|
||||
}
|
||||
|
||||
await vi.runAllTimersAsync();
|
||||
|
||||
// Find the start-thread call and verify triggerPayload
|
||||
const startThreadCall = mockChildren
|
||||
.flatMap((c) => (c.send as ReturnType<typeof vi.fn>).mock.calls as [unknown][])
|
||||
@@ -306,6 +310,8 @@ describe("kernel + workflowManager integration", () => {
|
||||
});
|
||||
}
|
||||
|
||||
await vi.runAllTimersAsync();
|
||||
|
||||
const senseEntries = logStore.append.mock.calls
|
||||
.map((c) => c[0] as { source: string; type: string; refId: string | null })
|
||||
.filter((e) => e.source === "sense" && e.refId === "cpu-usage");
|
||||
@@ -423,6 +429,8 @@ describe("kernel + workflowManager integration", () => {
|
||||
});
|
||||
}
|
||||
|
||||
await vi.runAllTimersAsync();
|
||||
|
||||
expect(logStore.upsertWorkflowRun).toHaveBeenCalledWith(
|
||||
expect.objectContaining({ source: "workflow", type: "started" }),
|
||||
expect.objectContaining({ workflow: "log-test-workflow", status: "started" }),
|
||||
@@ -498,6 +506,8 @@ describe("kernel + workflowManager integration", () => {
|
||||
});
|
||||
}
|
||||
|
||||
await vi.runAllTimersAsync();
|
||||
|
||||
const startThreadCall = mockChildren
|
||||
.flatMap((c) => (c.send as ReturnType<typeof vi.fn>).mock.calls as [unknown][])
|
||||
.find(
|
||||
@@ -582,6 +592,8 @@ describe("kernel + workflowManager integration", () => {
|
||||
});
|
||||
}
|
||||
|
||||
await vi.runAllTimersAsync();
|
||||
|
||||
const startThreadCall = mockChildren
|
||||
.flatMap((c) => (c.send as ReturnType<typeof vi.fn>).mock.calls as [unknown][])
|
||||
.find(
|
||||
@@ -642,6 +654,8 @@ describe("kernel + workflowManager integration", () => {
|
||||
});
|
||||
}
|
||||
|
||||
await vi.runAllTimersAsync();
|
||||
|
||||
const stopPromise = kernel.stop();
|
||||
await vi.runAllTimersAsync();
|
||||
await expect(stopPromise).resolves.toBeUndefined();
|
||||
|
||||
@@ -16,6 +16,7 @@ function baseConfig(script: string) {
|
||||
onMessage: vi.fn(),
|
||||
onReady: vi.fn(),
|
||||
onExit: vi.fn(),
|
||||
onCrashLimitReached: null,
|
||||
respawn: {
|
||||
enabled: true,
|
||||
maxCrashes: 6,
|
||||
@@ -84,7 +85,7 @@ describe("createWorkerRuntime", () => {
|
||||
const rt = track(createWorkerRuntime(baseConfig(echoWorkerPath)));
|
||||
await rt.start("k");
|
||||
expect(rt.has("k")).toBe(true);
|
||||
await rt.evict("k");
|
||||
await rt.evict("k", null);
|
||||
expect(rt.has("k")).toBe(false);
|
||||
await rt.shutdown();
|
||||
});
|
||||
@@ -94,7 +95,7 @@ describe("createWorkerRuntime", () => {
|
||||
await rt.start("k");
|
||||
const before = rt.pid("k");
|
||||
expect(before).not.toBeNull();
|
||||
await rt.drain("k");
|
||||
await rt.drain("k", null);
|
||||
const after = rt.pid("k");
|
||||
expect(after).not.toBeNull();
|
||||
expect(after).not.toBe(before);
|
||||
|
||||
@@ -26,6 +26,11 @@ function makeMockChild(pid = 1): MockChild {
|
||||
child.connected = true;
|
||||
child.exitCode = null;
|
||||
child.pid = pid;
|
||||
setImmediate(() => {
|
||||
if (child.connected) {
|
||||
child.emit("message", { type: "ready" });
|
||||
}
|
||||
});
|
||||
child.send = vi.fn((msg: unknown) => {
|
||||
if (
|
||||
msg !== null &&
|
||||
@@ -110,7 +115,7 @@ describe("WorkflowManager", () => {
|
||||
});
|
||||
|
||||
describe("startWorkflow under concurrency limit dispatches thread", () => {
|
||||
it("forks a worker and sends start-thread when active < concurrency", () => {
|
||||
it("forks a worker and sends start-thread when active < concurrency", async () => {
|
||||
const logStore = makeLogStore();
|
||||
const config = makeConfig({
|
||||
"my-workflow": { concurrency: 2, overflow: "drop" },
|
||||
@@ -118,6 +123,7 @@ describe("WorkflowManager", () => {
|
||||
const mgr = createWorkflowManager("/nerve-root", config, logStore);
|
||||
|
||||
mgr.startWorkflow("my-workflow", { prompt: "test", maxRounds: 10, dryRun: false });
|
||||
await vi.runAllTimersAsync();
|
||||
|
||||
expect(mockChildren).toHaveLength(1);
|
||||
expect(mockChildren[0].send).toHaveBeenCalledWith(
|
||||
@@ -126,7 +132,7 @@ describe("WorkflowManager", () => {
|
||||
expect(mgr.activeCount("my-workflow")).toBe(1);
|
||||
});
|
||||
|
||||
it("reuses the same worker for a second thread under the limit", () => {
|
||||
it("reuses the same worker for a second thread under the limit", async () => {
|
||||
const logStore = makeLogStore();
|
||||
const config = makeConfig({
|
||||
"my-workflow": { concurrency: 3, overflow: "drop" },
|
||||
@@ -135,6 +141,7 @@ describe("WorkflowManager", () => {
|
||||
|
||||
mgr.startWorkflow("my-workflow", { prompt: "test 1", maxRounds: 10, dryRun: false });
|
||||
mgr.startWorkflow("my-workflow", { prompt: "test 2", maxRounds: 10, dryRun: false });
|
||||
await vi.runAllTimersAsync();
|
||||
|
||||
// Only one forked child — worker is reused
|
||||
expect(mockChildren).toHaveLength(1);
|
||||
@@ -142,7 +149,7 @@ describe("WorkflowManager", () => {
|
||||
expect(mgr.activeCount("my-workflow")).toBe(2);
|
||||
});
|
||||
|
||||
it("logs a 'started' event for each dispatched thread", () => {
|
||||
it("logs a 'started' event for each dispatched thread", async () => {
|
||||
const logStore = makeLogStore();
|
||||
const config = makeConfig({
|
||||
"my-workflow": { concurrency: 2, overflow: "drop" },
|
||||
@@ -150,6 +157,7 @@ describe("WorkflowManager", () => {
|
||||
const mgr = createWorkflowManager("/nerve-root", config, logStore);
|
||||
|
||||
mgr.startWorkflow("my-workflow", { prompt: "test", maxRounds: 10, dryRun: false });
|
||||
await vi.runAllTimersAsync();
|
||||
|
||||
expect(logStore.upsertWorkflowRun).toHaveBeenCalledWith(
|
||||
expect.objectContaining({ source: "workflow", type: "started" }),
|
||||
@@ -159,7 +167,7 @@ describe("WorkflowManager", () => {
|
||||
});
|
||||
|
||||
describe("startWorkflow at limit with drop overflow drops the request", () => {
|
||||
it("does NOT send start-thread when at concurrency limit with overflow=drop", () => {
|
||||
it("does NOT send start-thread when at concurrency limit with overflow=drop", async () => {
|
||||
const logStore = makeLogStore();
|
||||
const config = makeConfig({
|
||||
"drop-wf": { concurrency: 1, overflow: "drop" },
|
||||
@@ -169,6 +177,7 @@ describe("WorkflowManager", () => {
|
||||
mgr.startWorkflow("drop-wf", { prompt: "first", maxRounds: 10, dryRun: false });
|
||||
// now at limit — second call should be dropped
|
||||
mgr.startWorkflow("drop-wf", { prompt: "second", maxRounds: 10, dryRun: false });
|
||||
await vi.runAllTimersAsync();
|
||||
|
||||
expect(mgr.activeCount("drop-wf")).toBe(1);
|
||||
expect(mgr.queueLength("drop-wf")).toBe(0);
|
||||
@@ -254,7 +263,7 @@ describe("WorkflowManager", () => {
|
||||
});
|
||||
|
||||
describe("completing a thread dequeues the next one", () => {
|
||||
it("dispatches the next queued thread when the active thread sends completed", () => {
|
||||
it("dispatches the next queued thread when the active thread sends completed", async () => {
|
||||
const logStore = makeLogStore();
|
||||
const config = makeConfig({
|
||||
"queue-wf": { concurrency: 1, overflow: "queue", maxQueue: 5 },
|
||||
@@ -263,6 +272,7 @@ describe("WorkflowManager", () => {
|
||||
|
||||
mgr.startWorkflow("queue-wf", { prompt: "first", maxRounds: 10, dryRun: false });
|
||||
mgr.startWorkflow("queue-wf", { prompt: "second", maxRounds: 10, dryRun: false });
|
||||
await vi.runAllTimersAsync();
|
||||
|
||||
expect(mgr.activeCount("queue-wf")).toBe(1);
|
||||
expect(mgr.queueLength("queue-wf")).toBe(1);
|
||||
@@ -289,7 +299,7 @@ describe("WorkflowManager", () => {
|
||||
);
|
||||
});
|
||||
|
||||
it("dispatches next queued thread when active thread sends failed", () => {
|
||||
it("dispatches next queued thread when active thread sends failed", async () => {
|
||||
const logStore = makeLogStore();
|
||||
const config = makeConfig({
|
||||
"queue-wf": { concurrency: 1, overflow: "queue", maxQueue: 5 },
|
||||
@@ -298,6 +308,7 @@ describe("WorkflowManager", () => {
|
||||
|
||||
mgr.startWorkflow("queue-wf", { prompt: "first", maxRounds: 10, dryRun: false });
|
||||
mgr.startWorkflow("queue-wf", { prompt: "second", maxRounds: 10, dryRun: false });
|
||||
await vi.runAllTimersAsync();
|
||||
|
||||
const child = mockChildren[0];
|
||||
const firstRunId = (child.send as ReturnType<typeof vi.fn>).mock.calls[0][0].runId as string;
|
||||
@@ -325,6 +336,7 @@ describe("WorkflowManager", () => {
|
||||
|
||||
mgr.startWorkflow("wf-a", { prompt: "test", maxRounds: 10, dryRun: false });
|
||||
mgr.startWorkflow("wf-b", { prompt: "test", maxRounds: 10, dryRun: false });
|
||||
await vi.runAllTimersAsync();
|
||||
|
||||
// Two distinct workers should have been forked
|
||||
expect(mockChildren).toHaveLength(2);
|
||||
|
||||
@@ -66,6 +66,7 @@ export function createSenseWorkerPool(options: SenseWorkerPoolOptions): SenseWor
|
||||
onReady: (_key, msg) => {
|
||||
options.onWorkerMessage(msg);
|
||||
},
|
||||
onCrashLimitReached: null,
|
||||
onExit: (group, code, signal) => {
|
||||
const sig =
|
||||
signal === null || signal === undefined || signal === ""
|
||||
@@ -102,13 +103,13 @@ export function createSenseWorkerPool(options: SenseWorkerPoolOptions): SenseWor
|
||||
return;
|
||||
}
|
||||
options.onBeforeGroupRestart(group);
|
||||
await runtime.drain(group);
|
||||
await runtime.drain(group, null);
|
||||
}
|
||||
|
||||
function evictGroup(group: string): void {
|
||||
trackedGroups.delete(group);
|
||||
evicting.add(group);
|
||||
void runtime.evict(group).finally(() => {
|
||||
void runtime.evict(group, null).finally(() => {
|
||||
evicting.delete(group);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -8,6 +8,10 @@ import { isPlainRecord } from "@uncaged/nerve-core";
|
||||
|
||||
const STDERR_TAIL_MAX_CHARS = 2048;
|
||||
|
||||
export type WorkerDrainOpts = {
|
||||
shutdownTimeoutMs: number | null;
|
||||
};
|
||||
|
||||
export type WorkerRuntimeConfig<K extends string> = {
|
||||
script: string;
|
||||
argsForKey: (key: K) => string[];
|
||||
@@ -16,6 +20,8 @@ export type WorkerRuntimeConfig<K extends string> = {
|
||||
onMessage: (key: K, msg: unknown) => void;
|
||||
onReady: (key: K, msg: unknown) => void;
|
||||
onExit: (key: K, code: number | null, signal: string | null) => void;
|
||||
/** Invoked when automatic respawn is skipped because `maxCrashes` was exceeded in `windowMs`. */
|
||||
onCrashLimitReached: ((key: K) => void) | null;
|
||||
respawn: {
|
||||
enabled: boolean;
|
||||
maxCrashes: number;
|
||||
@@ -32,8 +38,8 @@ export type WorkerRuntime<K extends string> = {
|
||||
/** When the worker is already ready and IPC-connected, sends synchronously (returns true). Otherwise false — caller may fall back to `send`. */
|
||||
trySendSync: (key: K, msg: unknown) => boolean;
|
||||
start: (key: K) => Promise<void>;
|
||||
evict: (key: K) => Promise<void>;
|
||||
drain: (key: K) => Promise<void>;
|
||||
evict: (key: K, opts: WorkerDrainOpts | null) => Promise<void>;
|
||||
drain: (key: K, opts: WorkerDrainOpts | null) => Promise<void>;
|
||||
shutdown: () => Promise<void>;
|
||||
has: (key: K) => boolean;
|
||||
/** True when a child exists but IPC is disconnected (legacy pool skipped sends in this case). */
|
||||
@@ -264,7 +270,14 @@ export function createWorkerRuntime<K extends string>(
|
||||
await waitForReady(slot, config.shutdownTimeoutMs);
|
||||
}
|
||||
|
||||
async function gracefulStop(slot: WorkerSlot<K>): Promise<void> {
|
||||
function resolveShutdownTimeoutMs(opts: WorkerDrainOpts | null): number {
|
||||
if (opts !== null && opts.shutdownTimeoutMs !== null) {
|
||||
return opts.shutdownTimeoutMs;
|
||||
}
|
||||
return config.shutdownTimeoutMs;
|
||||
}
|
||||
|
||||
async function gracefulStop(slot: WorkerSlot<K>, shutdownTimeoutMs: number): Promise<void> {
|
||||
if (slot.child === null) {
|
||||
return;
|
||||
}
|
||||
@@ -276,7 +289,7 @@ export function createWorkerRuntime<K extends string>(
|
||||
} catch {
|
||||
// IPC channel may have closed between null-check and send
|
||||
}
|
||||
await waitForChildExit(child, config.shutdownTimeoutMs);
|
||||
await waitForChildExit(child, shutdownTimeoutMs);
|
||||
}
|
||||
|
||||
async function handleUnexpectedCrashRecovery(slot: WorkerSlot<K>): Promise<void> {
|
||||
@@ -295,6 +308,9 @@ export function createWorkerRuntime<K extends string>(
|
||||
console.error(
|
||||
`[WorkerRuntime] worker "${String(slot.key)}" exceeded crash limit (${String(config.respawn.maxCrashes)} in ${String(config.respawn.windowMs)}ms); not respawning`,
|
||||
);
|
||||
if (config.onCrashLimitReached !== null) {
|
||||
config.onCrashLimitReached(slot.key);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -303,7 +319,7 @@ export function createWorkerRuntime<K extends string>(
|
||||
}
|
||||
|
||||
async function shutdownWorker(slot: WorkerSlot<K>): Promise<void> {
|
||||
await gracefulStop(slot);
|
||||
await gracefulStop(slot, config.shutdownTimeoutMs);
|
||||
workers.delete(slot.key);
|
||||
}
|
||||
|
||||
@@ -348,22 +364,24 @@ export function createWorkerRuntime<K extends string>(
|
||||
});
|
||||
},
|
||||
|
||||
evict: async (key: K) => {
|
||||
evict: async (key: K, opts: WorkerDrainOpts | null) => {
|
||||
const slot = getOrCreateSlot(key);
|
||||
const shutdownMs = resolveShutdownTimeoutMs(opts);
|
||||
await enqueueOp(slot, async () => {
|
||||
await gracefulStop(slot);
|
||||
await gracefulStop(slot, shutdownMs);
|
||||
workers.delete(key);
|
||||
});
|
||||
},
|
||||
|
||||
drain: async (key: K) => {
|
||||
drain: async (key: K, opts: WorkerDrainOpts | null) => {
|
||||
const slot = getOrCreateSlot(key);
|
||||
const shutdownMs = resolveShutdownTimeoutMs(opts);
|
||||
await enqueueOp(slot, async () => {
|
||||
if (slot.child === null) {
|
||||
await forkAndWaitReady(slot);
|
||||
return;
|
||||
}
|
||||
await gracefulStop(slot);
|
||||
await gracefulStop(slot, shutdownMs);
|
||||
await forkAndWaitReady(slot);
|
||||
});
|
||||
},
|
||||
|
||||
@@ -0,0 +1,256 @@
|
||||
/**
|
||||
* Pure helpers and IPC branching for workflow-manager (keeps workflow-manager.ts lean).
|
||||
*/
|
||||
|
||||
import { dirname, join } from "node:path";
|
||||
import { fileURLToPath } from "node:url";
|
||||
|
||||
import type { WorkflowMessage } from "@uncaged/nerve-core";
|
||||
import { START, isPlainRecord } from "@uncaged/nerve-core";
|
||||
|
||||
import type { LogStore, WorkflowRunStatus } from "@uncaged/nerve-store";
|
||||
import type { ResumeThreadMessage, ThreadEventMessage } from "./ipc.js";
|
||||
import type { WorkerToParentMessage } from "./ipc.js";
|
||||
|
||||
export type PendingThread = {
|
||||
runId: string;
|
||||
prompt: string;
|
||||
maxRounds: number;
|
||||
dryRun: boolean;
|
||||
};
|
||||
|
||||
export type WorkflowState = {
|
||||
active: Set<string>;
|
||||
queue: PendingThread[];
|
||||
};
|
||||
|
||||
/** Matches legacy manager: 6 crashes within 60s stops respawn (was `length > 5`). */
|
||||
export const WORKFLOW_WORKER_RESPAWN = {
|
||||
enabled: true,
|
||||
maxCrashes: 6,
|
||||
windowMs: 60_000,
|
||||
delayMs: 0,
|
||||
} as const;
|
||||
|
||||
/**
|
||||
* Worker shutdown timeout — must stay in sync with SHUTDOWN_TIMEOUT_MS in workflow-worker.ts.
|
||||
* The drain timeout passed to drainAndRespawn must be >= this value so the worker has
|
||||
* enough time to finish in-flight threads before the parent force-kills it.
|
||||
*/
|
||||
export const WORKER_SHUTDOWN_TIMEOUT_MS = 10_000;
|
||||
|
||||
export const DEFAULT_MAX_QUEUE = 100;
|
||||
|
||||
export function readLaunchFromTriggerPayload(
|
||||
raw: unknown,
|
||||
engineDefaultMaxRounds: number,
|
||||
): { prompt: string; maxRounds: number; dryRun: boolean } {
|
||||
if (isPlainRecord(raw)) {
|
||||
const o = raw;
|
||||
if (typeof o.prompt === "string" && typeof o.maxRounds === "number") {
|
||||
const dryRun = typeof o.dryRun === "boolean" ? o.dryRun : false;
|
||||
return { prompt: o.prompt, maxRounds: o.maxRounds, dryRun };
|
||||
}
|
||||
}
|
||||
return { prompt: "", maxRounds: engineDefaultMaxRounds, dryRun: false };
|
||||
}
|
||||
|
||||
export function ensureThreadMessagesWithStart(
|
||||
messages: Array<{ role: string; content: string; meta: unknown; timestamp: number }>,
|
||||
threadId: string,
|
||||
fallbackPrompt: string,
|
||||
fallbackMaxRounds: number,
|
||||
): WorkflowMessage[] {
|
||||
const mapped: WorkflowMessage[] = messages.map((m) => ({
|
||||
role: m.role,
|
||||
content: m.content,
|
||||
meta: m.meta,
|
||||
timestamp: m.timestamp,
|
||||
}));
|
||||
if (mapped.length > 0 && mapped[0].role === START) {
|
||||
return mapped;
|
||||
}
|
||||
const start: WorkflowMessage = {
|
||||
role: START,
|
||||
content: fallbackPrompt,
|
||||
meta: { maxRounds: fallbackMaxRounds, threadId },
|
||||
timestamp: Date.now(),
|
||||
};
|
||||
return [start, ...mapped];
|
||||
}
|
||||
|
||||
export function resolveWorkflowWorkerScript(): string {
|
||||
const __filename = fileURLToPath(import.meta.url);
|
||||
const __dir = dirname(__filename);
|
||||
return join(__dir, "workflow-worker.js");
|
||||
}
|
||||
|
||||
export function mapWorkflowRunStatus(eventType: string): WorkflowRunStatus | null {
|
||||
const map: Record<string, WorkflowRunStatus> = {
|
||||
started: "started",
|
||||
queued: "queued",
|
||||
completed: "completed",
|
||||
failed: "failed",
|
||||
crashed: "crashed",
|
||||
dropped: "dropped",
|
||||
interrupted: "interrupted",
|
||||
killed: "killed",
|
||||
};
|
||||
return map[eventType] ?? null;
|
||||
}
|
||||
|
||||
export function extractExitCodeFromPayload(payload: unknown): number | null {
|
||||
if (isPlainRecord(payload) && typeof payload.exitCode === "number") {
|
||||
return payload.exitCode;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
export function appendWorkflowRunLog(
|
||||
logStore: LogStore,
|
||||
workflowName: string,
|
||||
runId: string,
|
||||
eventType: string,
|
||||
payload: unknown | undefined,
|
||||
exitCode: number | null,
|
||||
): void {
|
||||
const timestamp = Date.now();
|
||||
const serialised = payload !== undefined ? JSON.stringify(payload) : null;
|
||||
const status = mapWorkflowRunStatus(eventType);
|
||||
|
||||
if (status !== null) {
|
||||
logStore.upsertWorkflowRun(
|
||||
{
|
||||
source: "workflow",
|
||||
type: eventType,
|
||||
refId: runId,
|
||||
payload: serialised,
|
||||
timestamp,
|
||||
},
|
||||
{ runId, workflow: workflowName, status, timestamp, exitCode },
|
||||
);
|
||||
} else {
|
||||
logStore.append({
|
||||
source: "workflow",
|
||||
type: eventType,
|
||||
refId: runId,
|
||||
payload: serialised,
|
||||
timestamp,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
export function recoverQueuedRun(
|
||||
workflowName: string,
|
||||
runId: string,
|
||||
state: WorkflowState,
|
||||
logStore: LogStore,
|
||||
engineMaxRounds: number,
|
||||
): void {
|
||||
if (state.queue.some((q) => q.runId === runId)) return;
|
||||
const launch = readLaunchFromTriggerPayload(logStore.getTriggerPayload(runId), engineMaxRounds);
|
||||
state.queue.push({
|
||||
runId,
|
||||
prompt: launch.prompt,
|
||||
maxRounds: launch.maxRounds,
|
||||
dryRun: launch.dryRun,
|
||||
});
|
||||
process.stderr.write(
|
||||
`[workflow-manager] crash-recovery: re-queued thread "${runId}" for "${workflowName}"\n`,
|
||||
);
|
||||
}
|
||||
|
||||
export function recoverStartedRun(
|
||||
workflowName: string,
|
||||
runId: string,
|
||||
state: WorkflowState,
|
||||
logStore: LogStore,
|
||||
engineMaxRounds: number,
|
||||
sendResume: (wf: string, msg: ResumeThreadMessage) => void,
|
||||
): void {
|
||||
if (state.active.has(runId)) return;
|
||||
const rawMessages = logStore.getThreadMessages(runId);
|
||||
const launch = readLaunchFromTriggerPayload(logStore.getTriggerPayload(runId), engineMaxRounds);
|
||||
const messages = ensureThreadMessagesWithStart(
|
||||
rawMessages,
|
||||
runId,
|
||||
launch.prompt,
|
||||
launch.maxRounds,
|
||||
);
|
||||
state.active.add(runId);
|
||||
const msg: ResumeThreadMessage = {
|
||||
type: "resume-thread",
|
||||
runId,
|
||||
messages,
|
||||
maxRounds: launch.maxRounds,
|
||||
dryRun: launch.dryRun,
|
||||
};
|
||||
sendResume(workflowName, msg);
|
||||
process.stderr.write(
|
||||
`[workflow-manager] crash-recovery: resuming thread "${runId}" for "${workflowName}" (${String(messages.length)} messages)\n`,
|
||||
);
|
||||
}
|
||||
|
||||
export function recoverThreadsFromStore(
|
||||
workflowName: string,
|
||||
logStore: LogStore,
|
||||
engineMaxRounds: number,
|
||||
getOrCreateState: (name: string) => WorkflowState,
|
||||
sendResume: (wf: string, msg: ResumeThreadMessage) => void,
|
||||
): void {
|
||||
const activeRuns = logStore.getActiveWorkflowRuns(workflowName);
|
||||
const state = getOrCreateState(workflowName);
|
||||
|
||||
for (const run of activeRuns) {
|
||||
if (run.status === "queued") {
|
||||
recoverQueuedRun(workflowName, run.runId, state, logStore, engineMaxRounds);
|
||||
} else if (run.status === "started") {
|
||||
recoverStartedRun(workflowName, run.runId, state, logStore, engineMaxRounds, sendResume);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export type WorkflowManagerMessageDeps = {
|
||||
logStore: LogStore;
|
||||
handleThreadEvent: (workflowName: string, msg: ThreadEventMessage) => void;
|
||||
onWorkflowRoleError: (
|
||||
workflowName: string,
|
||||
runId: string,
|
||||
error: string,
|
||||
exitCode: number,
|
||||
) => void;
|
||||
};
|
||||
|
||||
export function dispatchWorkflowWorkerMessage(
|
||||
workflowName: string,
|
||||
msg: WorkerToParentMessage,
|
||||
deps: WorkflowManagerMessageDeps,
|
||||
): void {
|
||||
if (msg.type === "thread-event") {
|
||||
deps.handleThreadEvent(workflowName, msg);
|
||||
return;
|
||||
}
|
||||
|
||||
if (msg.type === "thread-workflow-message") {
|
||||
deps.logStore.append({
|
||||
source: "workflow",
|
||||
type: "thread_workflow_message",
|
||||
refId: msg.runId,
|
||||
payload: JSON.stringify(msg.message),
|
||||
timestamp: Date.now(),
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
if (msg.type === "workflow-error") {
|
||||
process.stderr.write(
|
||||
`[workflow-manager] workflow-error for runId "${msg.runId}" in "${workflowName}": ${msg.error}\n`,
|
||||
);
|
||||
deps.onWorkflowRoleError(workflowName, msg.runId, msg.error, msg.exitCode);
|
||||
return;
|
||||
}
|
||||
|
||||
if (msg.type === "error") {
|
||||
process.stderr.write(`[workflow-manager] error from "${workflowName}" worker: ${msg.error}\n`);
|
||||
}
|
||||
}
|
||||
@@ -6,33 +6,24 @@
|
||||
* Concurrency and overflow (drop/queue) are enforced here in the parent process.
|
||||
*/
|
||||
|
||||
import { fork } from "node:child_process";
|
||||
import type { ChildProcess } from "node:child_process";
|
||||
import { dirname, join } from "node:path";
|
||||
import { fileURLToPath } from "node:url";
|
||||
import type { NerveConfig, WorkflowConfig, WorkflowStatus } from "@uncaged/nerve-core";
|
||||
|
||||
import type {
|
||||
NerveConfig,
|
||||
WorkflowConfig,
|
||||
WorkflowMessage,
|
||||
WorkflowStatus,
|
||||
} from "@uncaged/nerve-core";
|
||||
import { START, isPlainRecord } from "@uncaged/nerve-core";
|
||||
|
||||
import type { LogStore, WorkflowRunStatus } from "@uncaged/nerve-store";
|
||||
import type {
|
||||
KillThreadMessage,
|
||||
ResumeThreadMessage,
|
||||
ShutdownMessage,
|
||||
StartThreadMessage,
|
||||
ThreadEventMessage,
|
||||
} from "./ipc.js";
|
||||
import type { LogStore } from "@uncaged/nerve-store";
|
||||
import type { KillThreadMessage, StartThreadMessage, ThreadEventMessage } from "./ipc.js";
|
||||
import { parseWorkerMessage } from "./ipc.js";
|
||||
import { formatCapturedStderrTail, formatChildExitSummary } from "./worker-fork-support.js";
|
||||
import { createWorkerRuntime } from "./worker-runtime.js";
|
||||
import {
|
||||
formatCapturedStderrTail,
|
||||
formatChildExitSummary,
|
||||
teeCapturedStderr,
|
||||
} from "./worker-fork-support.js";
|
||||
DEFAULT_MAX_QUEUE,
|
||||
WORKER_SHUTDOWN_TIMEOUT_MS,
|
||||
WORKFLOW_WORKER_RESPAWN,
|
||||
type WorkflowState,
|
||||
appendWorkflowRunLog,
|
||||
dispatchWorkflowWorkerMessage,
|
||||
extractExitCodeFromPayload,
|
||||
recoverThreadsFromStore,
|
||||
resolveWorkflowWorkerScript,
|
||||
} from "./workflow-manager-support.js";
|
||||
|
||||
export type WorkflowLaunchParams = {
|
||||
prompt: string;
|
||||
@@ -74,169 +65,109 @@ export type WorkflowManager = {
|
||||
stop: () => Promise<void>;
|
||||
};
|
||||
|
||||
type PendingThread = {
|
||||
runId: string;
|
||||
prompt: string;
|
||||
maxRounds: number;
|
||||
dryRun: boolean;
|
||||
};
|
||||
|
||||
type WorkflowState = {
|
||||
active: Set<string>;
|
||||
queue: PendingThread[];
|
||||
};
|
||||
|
||||
type WorkerEntry = {
|
||||
workflowName: string;
|
||||
process: ChildProcess;
|
||||
stopping: boolean;
|
||||
/** When set, the worker is draining before a hot-reload respawn. */
|
||||
draining: boolean;
|
||||
stderrTail: { value: string };
|
||||
};
|
||||
|
||||
// Crash respawn backoff: track crash timestamps per workflow.
|
||||
const MAX_CRASHES_IN_WINDOW = 5;
|
||||
const CRASH_WINDOW_MS = 60_000;
|
||||
|
||||
/**
|
||||
* Worker shutdown timeout — must stay in sync with SHUTDOWN_TIMEOUT_MS in workflow-worker.ts.
|
||||
* The drain timeout passed to drainAndRespawn must be >= this value so the worker has
|
||||
* enough time to finish in-flight threads before the parent force-kills it.
|
||||
*/
|
||||
const WORKER_SHUTDOWN_TIMEOUT_MS = 10_000;
|
||||
|
||||
const DEFAULT_MAX_QUEUE = 100;
|
||||
|
||||
function readLaunchFromTriggerPayload(
|
||||
raw: unknown,
|
||||
engineDefaultMaxRounds: number,
|
||||
): { prompt: string; maxRounds: number; dryRun: boolean } {
|
||||
if (isPlainRecord(raw)) {
|
||||
const o = raw;
|
||||
if (typeof o.prompt === "string" && typeof o.maxRounds === "number") {
|
||||
const dryRun = typeof o.dryRun === "boolean" ? o.dryRun : false;
|
||||
return { prompt: o.prompt, maxRounds: o.maxRounds, dryRun };
|
||||
}
|
||||
}
|
||||
return { prompt: "", maxRounds: engineDefaultMaxRounds, dryRun: false };
|
||||
}
|
||||
|
||||
function ensureThreadMessagesWithStart(
|
||||
messages: Array<{ role: string; content: string; meta: unknown; timestamp: number }>,
|
||||
threadId: string,
|
||||
fallbackPrompt: string,
|
||||
fallbackMaxRounds: number,
|
||||
): WorkflowMessage[] {
|
||||
const mapped: WorkflowMessage[] = messages.map((m) => ({
|
||||
role: m.role,
|
||||
content: m.content,
|
||||
meta: m.meta,
|
||||
timestamp: m.timestamp,
|
||||
}));
|
||||
if (mapped.length > 0 && mapped[0].role === START) {
|
||||
return mapped;
|
||||
}
|
||||
const start: WorkflowMessage = {
|
||||
role: START,
|
||||
content: fallbackPrompt,
|
||||
meta: { maxRounds: fallbackMaxRounds, threadId },
|
||||
timestamp: Date.now(),
|
||||
};
|
||||
return [start, ...mapped];
|
||||
}
|
||||
|
||||
function resolveWorkerScript(): string {
|
||||
const __filename = fileURLToPath(import.meta.url);
|
||||
const __dir = dirname(__filename);
|
||||
return join(__dir, "workflow-worker.js");
|
||||
}
|
||||
|
||||
function spawnWorkflowWorker(
|
||||
nerveRoot: string,
|
||||
workflowName: string,
|
||||
workerScript: string,
|
||||
stderrTail: { value: string },
|
||||
): ChildProcess {
|
||||
const child = fork(workerScript, ["--workflow", workflowName, "--root", nerveRoot], {
|
||||
stdio: ["ignore", "inherit", "pipe", "ipc"],
|
||||
});
|
||||
teeCapturedStderr(child, stderrTail);
|
||||
// Prevent unhandled EPIPE when writing to a child whose IPC channel closed
|
||||
child.on("error", (err) => {
|
||||
if ((err as NodeJS.ErrnoException).code !== "EPIPE") {
|
||||
console.error("[worker] error:", err.message);
|
||||
}
|
||||
});
|
||||
return child;
|
||||
}
|
||||
|
||||
function sendStartThread(worker: ChildProcess, msg: StartThreadMessage): void {
|
||||
if (worker.connected === false) return;
|
||||
try {
|
||||
worker.send(msg);
|
||||
} catch {
|
||||
// IPC channel closed between connected check and send
|
||||
}
|
||||
}
|
||||
|
||||
function sendShutdown(worker: ChildProcess, entry: WorkerEntry): void {
|
||||
entry.stopping = true;
|
||||
if (worker.connected === false) return;
|
||||
const msg: ShutdownMessage = { type: "shutdown" };
|
||||
try {
|
||||
worker.send(msg);
|
||||
} catch {
|
||||
// IPC channel closed between connected check and send
|
||||
}
|
||||
}
|
||||
|
||||
function sendResumeThread(worker: ChildProcess, msg: ResumeThreadMessage): void {
|
||||
if (worker.connected === false) return;
|
||||
try {
|
||||
worker.send(msg);
|
||||
} catch {
|
||||
// IPC channel closed between connected check and send
|
||||
}
|
||||
}
|
||||
|
||||
function sendKillThread(worker: ChildProcess, runId: string): void {
|
||||
if (worker.connected === false) return;
|
||||
const msg: KillThreadMessage = { type: "kill-thread", runId };
|
||||
try {
|
||||
worker.send(msg);
|
||||
} catch {
|
||||
// IPC channel closed between connected check and send
|
||||
}
|
||||
}
|
||||
|
||||
function waitForExit(child: ChildProcess, timeoutMs: number): Promise<void> {
|
||||
return new Promise((resolve) => {
|
||||
const timer = setTimeout(() => {
|
||||
child.kill("SIGKILL");
|
||||
resolve();
|
||||
}, timeoutMs);
|
||||
child.once("exit", () => {
|
||||
clearTimeout(timer);
|
||||
resolve();
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
export function createWorkflowManager(
|
||||
nerveRoot: string,
|
||||
initialConfig: NerveConfig,
|
||||
logStore: LogStore,
|
||||
): WorkflowManager {
|
||||
const workerScript = resolveWorkerScript();
|
||||
const workerScript = resolveWorkflowWorkerScript();
|
||||
|
||||
/**
|
||||
* Default drain timeout must be at least WORKER_SHUTDOWN_TIMEOUT_MS so the worker
|
||||
* has enough time to finish in-flight threads before the parent force-kills it.
|
||||
*/
|
||||
const DEFAULT_DRAIN_TIMEOUT_MS = Math.max(30_000, WORKER_SHUTDOWN_TIMEOUT_MS + 5_000);
|
||||
|
||||
const states = new Map<string, WorkflowState>();
|
||||
const workers = new Map<string, WorkerEntry>();
|
||||
const crashTimestamps = new Map<string, number[]>();
|
||||
const trackedWorkflows = new Set<string>();
|
||||
const hotReloadEvicting = new Set<string>();
|
||||
const crashRecoveryPending = new Set<string>();
|
||||
const crashLimitBlocked = new Set<string>();
|
||||
let stopped = false;
|
||||
let config = initialConfig;
|
||||
const pendingDrains = new Set<string>();
|
||||
|
||||
function logWorkflowEvent(
|
||||
workflowName: string,
|
||||
runId: string,
|
||||
eventType: string,
|
||||
payload: unknown | null = null,
|
||||
exitCode: number | null = null,
|
||||
): void {
|
||||
appendWorkflowRunLog(logStore, workflowName, runId, eventType, payload, exitCode);
|
||||
}
|
||||
|
||||
const runtime = createWorkerRuntime<string>({
|
||||
script: workerScript,
|
||||
argsForKey: (workflowName) => ["--workflow", workflowName, "--root", nerveRoot],
|
||||
forwardStderr: true,
|
||||
onMessage: (workflowName, raw) => {
|
||||
handleWorkerMessage(workflowName, raw);
|
||||
},
|
||||
onReady: (workflowName, _msg) => {
|
||||
if (crashRecoveryPending.has(workflowName)) {
|
||||
crashRecoveryPending.delete(workflowName);
|
||||
recoverThreadsFromStore(
|
||||
workflowName,
|
||||
logStore,
|
||||
config.maxRounds,
|
||||
getOrCreateState,
|
||||
(wf, msg) => {
|
||||
sendToWorker(wf, msg);
|
||||
},
|
||||
);
|
||||
}
|
||||
},
|
||||
onExit: (workflowName, code, signalStr) => {
|
||||
const sig =
|
||||
signalStr === null || signalStr === undefined || signalStr === ""
|
||||
? null
|
||||
: (signalStr as NodeJS.Signals);
|
||||
|
||||
if (hotReloadEvicting.has(workflowName)) {
|
||||
hotReloadEvicting.delete(workflowName);
|
||||
markActiveRunsInterrupted(workflowName);
|
||||
if (!stopped && workflowConfig(workflowName) !== null) {
|
||||
process.stderr.write(
|
||||
`[workflow-manager] worker for "${workflowName}" drained, respawning\n`,
|
||||
);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (stopped) {
|
||||
const state = states.get(workflowName);
|
||||
if (state !== undefined) {
|
||||
state.active.clear();
|
||||
}
|
||||
crashRecoveryPending.delete(workflowName);
|
||||
return;
|
||||
}
|
||||
|
||||
const summary = formatChildExitSummary(code, sig);
|
||||
const stderrExtra = formatCapturedStderrTail(runtime.stderrTail(workflowName));
|
||||
process.stderr.write(
|
||||
`[workflow-manager] worker for "${workflowName}" exited (${summary})${stderrExtra}\n`,
|
||||
);
|
||||
|
||||
cleanupAfterUnexpectedWorkerExit(workflowName);
|
||||
crashRecoveryPending.add(workflowName);
|
||||
},
|
||||
onCrashLimitReached: (workflowName) => {
|
||||
crashRecoveryPending.delete(workflowName);
|
||||
trackedWorkflows.delete(workflowName);
|
||||
crashLimitBlocked.add(workflowName);
|
||||
process.stderr.write(
|
||||
`[workflow-manager] worker for "${workflowName}" exceeded crash limit (${String(WORKFLOW_WORKER_RESPAWN.maxCrashes)} in ${String(WORKFLOW_WORKER_RESPAWN.windowMs)}ms) — stopping respawn\n`,
|
||||
);
|
||||
},
|
||||
respawn: {
|
||||
...WORKFLOW_WORKER_RESPAWN,
|
||||
allowRespawn: (_wf) => !stopped,
|
||||
},
|
||||
shutdownTimeoutMs: DEFAULT_DRAIN_TIMEOUT_MS,
|
||||
});
|
||||
|
||||
function getOrCreateState(workflowName: string): WorkflowState {
|
||||
let state = states.get(workflowName);
|
||||
if (state === undefined) {
|
||||
@@ -250,60 +181,38 @@ export function createWorkflowManager(
|
||||
return config.workflows[workflowName] ?? null;
|
||||
}
|
||||
|
||||
function toWorkflowRunStatus(eventType: string): WorkflowRunStatus | null {
|
||||
const map: Record<string, WorkflowRunStatus> = {
|
||||
started: "started",
|
||||
queued: "queued",
|
||||
completed: "completed",
|
||||
failed: "failed",
|
||||
crashed: "crashed",
|
||||
dropped: "dropped",
|
||||
interrupted: "interrupted",
|
||||
killed: "killed",
|
||||
};
|
||||
return map[eventType] ?? null;
|
||||
}
|
||||
|
||||
function extractExitCode(payload: unknown): number | null {
|
||||
if (isPlainRecord(payload) && typeof payload.exitCode === "number") {
|
||||
return payload.exitCode;
|
||||
/** IPC send — matches legacy pool: no-op when IPC is disconnected; cold-start via WorkerRuntime.send. */
|
||||
function sendToWorker(workflowName: string, msg: unknown): void {
|
||||
if (crashLimitBlocked.has(workflowName)) {
|
||||
return;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
function logWorkflowEvent(
|
||||
workflowName: string,
|
||||
runId: string,
|
||||
eventType: string,
|
||||
payload?: unknown,
|
||||
exitCode: number | null = null,
|
||||
): void {
|
||||
const timestamp = Date.now();
|
||||
const serialised = payload !== undefined ? JSON.stringify(payload) : null;
|
||||
const status = toWorkflowRunStatus(eventType);
|
||||
|
||||
if (status !== null) {
|
||||
logStore.upsertWorkflowRun(
|
||||
{
|
||||
source: "workflow",
|
||||
type: eventType,
|
||||
refId: runId,
|
||||
payload: serialised,
|
||||
timestamp,
|
||||
},
|
||||
{ runId, workflow: workflowName, status, timestamp, exitCode },
|
||||
);
|
||||
} else {
|
||||
logStore.append({
|
||||
source: "workflow",
|
||||
type: eventType,
|
||||
refId: runId,
|
||||
payload: serialised,
|
||||
timestamp,
|
||||
trackedWorkflows.add(workflowName);
|
||||
if (runtime.hasDisconnectedChild(workflowName)) {
|
||||
return;
|
||||
}
|
||||
if (!runtime.trySendSync(workflowName, msg)) {
|
||||
void runtime.send(workflowName, msg).catch(() => {
|
||||
// IPC channel closed — mark any thread from this message as failed
|
||||
if (isStartThreadMsg(msg)) {
|
||||
const state = states.get(workflowName);
|
||||
if (state?.active.has(msg.runId)) {
|
||||
state.active.delete(msg.runId);
|
||||
logWorkflowEvent(workflowName, msg.runId, "failed", { error: "IPC channel closed" }, 1);
|
||||
dequeueNext(workflowName);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
function isStartThreadMsg(msg: unknown): msg is StartThreadMessage {
|
||||
return (
|
||||
msg !== null &&
|
||||
typeof msg === "object" &&
|
||||
(msg as Record<string, unknown>).type === "start-thread"
|
||||
);
|
||||
}
|
||||
|
||||
function dispatchThread(
|
||||
workflowName: string,
|
||||
runId: string,
|
||||
@@ -314,7 +223,6 @@ export function createWorkflowManager(
|
||||
const state = getOrCreateState(workflowName);
|
||||
state.active.add(runId);
|
||||
|
||||
const worker = getOrSpawnWorker(workflowName);
|
||||
const msg: StartThreadMessage = {
|
||||
type: "start-thread",
|
||||
runId,
|
||||
@@ -323,7 +231,7 @@ export function createWorkflowManager(
|
||||
maxRounds,
|
||||
dryRun,
|
||||
};
|
||||
sendStartThread(worker.process, msg);
|
||||
sendToWorker(workflowName, msg);
|
||||
logWorkflowEvent(workflowName, runId, "started", { prompt, maxRounds, dryRun });
|
||||
}
|
||||
|
||||
@@ -367,92 +275,20 @@ export function createWorkflowManager(
|
||||
if (msg.eventType === "completed" || msg.eventType === "failed" || msg.eventType === "killed") {
|
||||
state.active.delete(msg.runId);
|
||||
dequeueNext(workflowName);
|
||||
const exitCode = extractExitCode(msg.payload);
|
||||
const exitCode = extractExitCodeFromPayload(msg.payload);
|
||||
logWorkflowEvent(workflowName, msg.runId, msg.eventType, msg.payload, exitCode);
|
||||
maybeDeferredHotReloadDrain(workflowName);
|
||||
}
|
||||
}
|
||||
|
||||
function recoverQueuedRun(workflowName: string, runId: string, state: WorkflowState): void {
|
||||
if (state.queue.some((q) => q.runId === runId)) return;
|
||||
const launch = readLaunchFromTriggerPayload(
|
||||
logStore.getTriggerPayload(runId),
|
||||
config.maxRounds,
|
||||
);
|
||||
state.queue.push({
|
||||
runId,
|
||||
prompt: launch.prompt,
|
||||
maxRounds: launch.maxRounds,
|
||||
dryRun: launch.dryRun,
|
||||
});
|
||||
process.stderr.write(
|
||||
`[workflow-manager] crash-recovery: re-queued thread "${runId}" for "${workflowName}"\n`,
|
||||
);
|
||||
}
|
||||
|
||||
function recoverStartedRun(
|
||||
workflowName: string,
|
||||
runId: string,
|
||||
state: WorkflowState,
|
||||
worker: WorkerEntry,
|
||||
): void {
|
||||
if (state.active.has(runId)) return;
|
||||
const rawMessages = logStore.getThreadMessages(runId);
|
||||
const launch = readLaunchFromTriggerPayload(
|
||||
logStore.getTriggerPayload(runId),
|
||||
config.maxRounds,
|
||||
);
|
||||
const messages = ensureThreadMessagesWithStart(
|
||||
rawMessages,
|
||||
runId,
|
||||
launch.prompt,
|
||||
launch.maxRounds,
|
||||
);
|
||||
state.active.add(runId);
|
||||
const msg: ResumeThreadMessage = {
|
||||
type: "resume-thread",
|
||||
runId,
|
||||
messages,
|
||||
maxRounds: launch.maxRounds,
|
||||
dryRun: launch.dryRun,
|
||||
};
|
||||
sendResumeThread(worker.process, msg);
|
||||
process.stderr.write(
|
||||
`[workflow-manager] crash-recovery: resuming thread "${runId}" for "${workflowName}" (${messages.length} messages)\n`,
|
||||
);
|
||||
}
|
||||
|
||||
function recoverThreadsForWorker(workflowName: string, worker: WorkerEntry): void {
|
||||
const activeRuns = logStore.getActiveWorkflowRuns(workflowName);
|
||||
const state = getOrCreateState(workflowName);
|
||||
|
||||
for (const run of activeRuns) {
|
||||
if (run.status === "queued") {
|
||||
recoverQueuedRun(workflowName, run.runId, state);
|
||||
} else if (run.status === "started") {
|
||||
recoverStartedRun(workflowName, run.runId, state, worker);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function recordCrashAndCheckLimit(workflowName: string): boolean {
|
||||
const now = Date.now();
|
||||
const timestamps = (crashTimestamps.get(workflowName) ?? []).filter(
|
||||
(t) => now - t < CRASH_WINDOW_MS,
|
||||
);
|
||||
timestamps.push(now);
|
||||
crashTimestamps.set(workflowName, timestamps);
|
||||
return timestamps.length > MAX_CRASHES_IN_WINDOW;
|
||||
}
|
||||
|
||||
function handleWorkerCrash(workflowName: string): void {
|
||||
function cleanupAfterUnexpectedWorkerExit(workflowName: string): void {
|
||||
const state = states.get(workflowName);
|
||||
if (state === undefined) return;
|
||||
|
||||
const crashedCount = state.active.size;
|
||||
if (crashedCount > 0) {
|
||||
process.stderr.write(
|
||||
`[workflow-manager] worker for "${workflowName}" crashed with ${crashedCount} active thread(s)\n`,
|
||||
`[workflow-manager] worker for "${workflowName}" crashed with ${String(crashedCount)} active thread(s)\n`,
|
||||
);
|
||||
for (const runId of state.active) {
|
||||
logWorkflowEvent(workflowName, runId, "crashed", undefined, 255);
|
||||
@@ -460,26 +296,13 @@ export function createWorkflowManager(
|
||||
}
|
||||
|
||||
state.active.clear();
|
||||
workers.delete(workflowName);
|
||||
pendingDrains.delete(workflowName);
|
||||
|
||||
if (stopped || workflowConfig(workflowName) === null) return;
|
||||
|
||||
if (recordCrashAndCheckLimit(workflowName)) {
|
||||
const count = crashTimestamps.get(workflowName)?.length ?? 0;
|
||||
if (!stopped && !crashLimitBlocked.has(workflowName) && workflowConfig(workflowName) !== null) {
|
||||
process.stderr.write(
|
||||
`[workflow-manager] worker for "${workflowName}" crashed ${count} times in ${CRASH_WINDOW_MS}ms — stopping respawn\n`,
|
||||
`[workflow-manager] respawning worker for "${workflowName}" after crash\n`,
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
process.stderr.write(
|
||||
`[workflow-manager] respawning worker for "${workflowName}" after crash\n`,
|
||||
);
|
||||
const newWorker = getOrSpawnWorker(workflowName);
|
||||
setImmediate(() => {
|
||||
recoverThreadsForWorker(workflowName, newWorker);
|
||||
});
|
||||
}
|
||||
|
||||
function handleWorkerMessage(workflowName: string, raw: unknown): void {
|
||||
@@ -490,43 +313,19 @@ export function createWorkflowManager(
|
||||
);
|
||||
return;
|
||||
}
|
||||
const msg = result.value;
|
||||
|
||||
if (msg.type === "thread-event") {
|
||||
handleThreadEvent(workflowName, msg);
|
||||
return;
|
||||
}
|
||||
|
||||
if (msg.type === "thread-workflow-message") {
|
||||
logStore.append({
|
||||
source: "workflow",
|
||||
type: "thread_workflow_message",
|
||||
refId: msg.runId,
|
||||
payload: JSON.stringify(msg.message),
|
||||
timestamp: Date.now(),
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
if (msg.type === "workflow-error") {
|
||||
process.stderr.write(
|
||||
`[workflow-manager] workflow-error for runId "${msg.runId}" in "${workflowName}": ${msg.error}\n`,
|
||||
);
|
||||
const state = states.get(workflowName);
|
||||
if (state !== undefined) {
|
||||
state.active.delete(msg.runId);
|
||||
dequeueNext(workflowName);
|
||||
}
|
||||
logWorkflowEvent(workflowName, msg.runId, "failed", { error: msg.error }, msg.exitCode);
|
||||
maybeDeferredHotReloadDrain(workflowName);
|
||||
return;
|
||||
}
|
||||
|
||||
if (msg.type === "error") {
|
||||
process.stderr.write(
|
||||
`[workflow-manager] error from "${workflowName}" worker: ${msg.error}\n`,
|
||||
);
|
||||
}
|
||||
dispatchWorkflowWorkerMessage(workflowName, result.value, {
|
||||
logStore,
|
||||
handleThreadEvent,
|
||||
onWorkflowRoleError: (wf, runId, error, exitCode) => {
|
||||
const state = states.get(wf);
|
||||
if (state !== undefined) {
|
||||
state.active.delete(runId);
|
||||
dequeueNext(wf);
|
||||
}
|
||||
logWorkflowEvent(wf, runId, "failed", { error }, exitCode);
|
||||
maybeDeferredHotReloadDrain(wf);
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
function markActiveRunsInterrupted(workflowName: string): void {
|
||||
@@ -538,67 +337,6 @@ export function createWorkflowManager(
|
||||
state.active.clear();
|
||||
}
|
||||
|
||||
function handleWorkerExit(
|
||||
workflowName: string,
|
||||
code: number | null,
|
||||
signal: NodeJS.Signals | null,
|
||||
): void {
|
||||
const entry = workers.get(workflowName);
|
||||
if (entry?.draining) {
|
||||
workers.delete(workflowName);
|
||||
markActiveRunsInterrupted(workflowName);
|
||||
if (!stopped && workflowConfig(workflowName) !== null) {
|
||||
process.stderr.write(
|
||||
`[workflow-manager] worker for "${workflowName}" drained, respawning\n`,
|
||||
);
|
||||
getOrSpawnWorker(workflowName);
|
||||
}
|
||||
return;
|
||||
}
|
||||
if (entry?.stopping) {
|
||||
workers.delete(workflowName);
|
||||
const state = states.get(workflowName);
|
||||
if (state !== undefined) {
|
||||
state.active.clear();
|
||||
}
|
||||
return;
|
||||
}
|
||||
const summary = formatChildExitSummary(code, signal);
|
||||
const stderrExtra = entry !== undefined ? formatCapturedStderrTail(entry.stderrTail.value) : "";
|
||||
process.stderr.write(
|
||||
`[workflow-manager] worker for "${workflowName}" exited (${summary})${stderrExtra}\n`,
|
||||
);
|
||||
handleWorkerCrash(workflowName);
|
||||
}
|
||||
|
||||
function getOrSpawnWorker(workflowName: string): WorkerEntry {
|
||||
const existing = workers.get(workflowName);
|
||||
if (existing !== undefined && existing.process.exitCode === null) {
|
||||
return existing;
|
||||
}
|
||||
|
||||
const stderrTail = { value: "" };
|
||||
const child = spawnWorkflowWorker(nerveRoot, workflowName, workerScript, stderrTail);
|
||||
|
||||
child.on("message", (raw: unknown) => {
|
||||
handleWorkerMessage(workflowName, raw);
|
||||
});
|
||||
|
||||
child.on("exit", (code, signal) => {
|
||||
handleWorkerExit(workflowName, code, signal ?? null);
|
||||
});
|
||||
|
||||
const entry: WorkerEntry = {
|
||||
workflowName,
|
||||
process: child,
|
||||
stopping: false,
|
||||
draining: false,
|
||||
stderrTail,
|
||||
};
|
||||
workers.set(workflowName, entry);
|
||||
return entry;
|
||||
}
|
||||
|
||||
function killThread(runId: string): boolean {
|
||||
for (const [workflowName, state] of states) {
|
||||
const queueIdx = state.queue.findIndex((q) => q.runId === runId);
|
||||
@@ -609,10 +347,8 @@ export function createWorkflowManager(
|
||||
}
|
||||
|
||||
if (state.active.has(runId)) {
|
||||
const workerEntry = workers.get(workflowName);
|
||||
if (workerEntry !== undefined) {
|
||||
sendKillThread(workerEntry.process, runId);
|
||||
}
|
||||
const msg: KillThreadMessage = { type: "kill-thread", runId };
|
||||
sendToWorker(workflowName, msg);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
@@ -663,7 +399,7 @@ export function createWorkflowManager(
|
||||
state.queue.push({ runId, prompt, maxRounds, dryRun });
|
||||
logWorkflowEvent(workflowName, runId, "queued");
|
||||
process.stderr.write(
|
||||
`[workflow-manager] queued thread for "${workflowName}" runId "${runId}" (queue length: ${state.queue.length})\n`,
|
||||
`[workflow-manager] queued thread for "${workflowName}" runId "${runId}" (queue length: ${String(state.queue.length)})\n`,
|
||||
);
|
||||
}
|
||||
|
||||
@@ -707,35 +443,29 @@ export function createWorkflowManager(
|
||||
config = newConfig;
|
||||
}
|
||||
|
||||
/**
|
||||
* Default drain timeout must be at least WORKER_SHUTDOWN_TIMEOUT_MS so the worker
|
||||
* has enough time to finish in-flight threads before the parent force-kills it.
|
||||
*/
|
||||
const DEFAULT_DRAIN_TIMEOUT_MS = Math.max(30_000, WORKER_SHUTDOWN_TIMEOUT_MS + 5_000);
|
||||
|
||||
async function drainAndRespawn(
|
||||
workflowName: string,
|
||||
drainTimeoutMs: number = DEFAULT_DRAIN_TIMEOUT_MS,
|
||||
): Promise<void> {
|
||||
const entry = workers.get(workflowName);
|
||||
if (entry === undefined) {
|
||||
// No active worker — nothing to drain
|
||||
if (!trackedWorkflows.has(workflowName)) {
|
||||
return;
|
||||
}
|
||||
|
||||
entry.draining = true;
|
||||
// Send shutdown without setting stopping=true (so the exit handler uses the draining branch)
|
||||
if (entry.process.connected) {
|
||||
const msg: ShutdownMessage = { type: "shutdown" };
|
||||
try {
|
||||
entry.process.send(msg);
|
||||
} catch {
|
||||
// IPC closed
|
||||
const shutdownMs = Math.max(drainTimeoutMs, WORKER_SHUTDOWN_TIMEOUT_MS);
|
||||
hotReloadEvicting.add(workflowName);
|
||||
try {
|
||||
await runtime.evict(workflowName, { shutdownTimeoutMs: shutdownMs });
|
||||
trackedWorkflows.delete(workflowName);
|
||||
|
||||
if (!stopped && workflowConfig(workflowName) !== null) {
|
||||
trackedWorkflows.add(workflowName);
|
||||
await runtime.start(workflowName);
|
||||
}
|
||||
} finally {
|
||||
hotReloadEvicting.delete(workflowName);
|
||||
}
|
||||
await waitForExit(entry.process, drainTimeoutMs);
|
||||
// The exit handler (draining branch) will respawn the worker automatically
|
||||
}
|
||||
|
||||
function drainWhenIdle(workflowName: string): void {
|
||||
const state = states.get(workflowName);
|
||||
const hasActiveRuns = state !== undefined && state.active.size > 0;
|
||||
@@ -761,20 +491,17 @@ export function createWorkflowManager(
|
||||
|
||||
pendingDrains.add(workflowName);
|
||||
process.stderr.write(
|
||||
`[workflow-manager] deferring hot-reload for "${workflowName}" until ${state.active.size} active run(s) complete\n`,
|
||||
`[workflow-manager] deferring hot-reload for "${workflowName}" until ${String(state.active.size)} active run(s) complete\n`,
|
||||
);
|
||||
}
|
||||
|
||||
async function stop(): Promise<void> {
|
||||
stopped = true;
|
||||
pendingDrains.clear();
|
||||
const exitPromises: Promise<void>[] = [];
|
||||
for (const entry of workers.values()) {
|
||||
sendShutdown(entry.process, entry);
|
||||
exitPromises.push(waitForExit(entry.process, 5000));
|
||||
}
|
||||
await Promise.all(exitPromises);
|
||||
workers.clear();
|
||||
hotReloadEvicting.clear();
|
||||
crashRecoveryPending.clear();
|
||||
await runtime.shutdown();
|
||||
trackedWorkflows.clear();
|
||||
}
|
||||
|
||||
return {
|
||||
|
||||
@@ -10,7 +10,7 @@ export type CoderMeta = z.infer<typeof coderMetaSchema>;
|
||||
|
||||
export function coderPrompt({ threadId }: { threadId: string }): string {
|
||||
return `Read the workflow thread for the planner's sense design and any tester feedback: \`nerve thread ${threadId}\`
|
||||
Read the nerve-dev skill for sense file structure and conventions: \`cat node_modules/@uncaged/nerve-skills/nerve-dev/SKILL.md\`
|
||||
Read \`cat AGENT.md\` from the repository root, then \`CONVENTIONS.md\` and \`.knowledge/sense.md\` if present.
|
||||
|
||||
## Your task
|
||||
|
||||
@@ -20,21 +20,21 @@ Implement (or fix) the sense the planner designed. If there is tester feedback i
|
||||
|
||||
You do NOT need to finish everything in one pass. You may return \`done: false\` to continue in the next iteration.
|
||||
|
||||
## File structure for each sense
|
||||
## File structure for each sense (flat workspace)
|
||||
|
||||
- \`senses/<name>/src/index.ts\` — TypeScript compute source; import schema as \`./schema.ts\`
|
||||
The workspace has **one root** \`package.json\` and root \`scripts/build.mjs\` (or equivalent) that bundles all senses. There is **no** per-sense \`package.json\`. Bundled output is \`dist/senses/<name>/index.js\` after a root build.
|
||||
|
||||
- \`senses/<name>/src/index.ts\` — compute entry; import schema as \`./schema.ts\`
|
||||
- \`senses/<name>/src/schema.ts\` — Drizzle schema (TypeScript)
|
||||
- \`senses/<name>/migrations/\` — Drizzle migration files (at sense root, not inside src/)
|
||||
- \`senses/<name>/package.json\` — with esbuild build script
|
||||
- \`senses/<name>/index.js\` — bundled output generated by \`pnpm build\` (do NOT edit by hand)
|
||||
- \`senses/<name>/migrations/\` — SQL migration files (at sense root, not inside \`src/\`)
|
||||
|
||||
Look at existing senses for the package.json template and patterns.
|
||||
Look at existing senses for patterns.
|
||||
|
||||
## When to return done: true
|
||||
|
||||
Return \`done: true\` ONLY when ALL of the following are true:
|
||||
- All required files are created
|
||||
- \`pnpm install --no-cache && pnpm build\` succeeds (run it!)
|
||||
- From the **workspace root**, \`pnpm run build\` or \`npm run build\` succeeds (run it!) and \`dist/senses/<name>/index.js\` exists
|
||||
- \`nerve.yaml\` is updated with the sense config
|
||||
|
||||
Return \`done: false\` if you made progress but there is still work to do.`;
|
||||
|
||||
@@ -12,7 +12,7 @@ export function plannerPrompt({ threadId }: { threadId: string }): string {
|
||||
return `You are planning a new Nerve sense.
|
||||
|
||||
Read the workflow thread for the user's request: \`nerve thread ${threadId}\`
|
||||
Read the nerve-dev skill for sense conventions: \`cat node_modules/@uncaged/nerve-skills/nerve-dev/SKILL.md\`
|
||||
Read the workspace guide: \`cat AGENT.md\` from the repository root (created by \`nerve init\`). Also read \`CONVENTIONS.md\` and \`.knowledge/sense.md\` if present. Optional skills live under \`.cursor/skills/\`.
|
||||
Also look at existing senses in the \`senses/\` directory for patterns.
|
||||
|
||||
Pick a good kebab-case name for this sense. Produce a PLAN (not code) in markdown:
|
||||
|
||||
@@ -17,21 +17,20 @@ export function testerPrompt({
|
||||
**IMPORTANT: The Nerve workspace is at \`${nerveRoot}\`. All paths below are relative to this directory. Always \`cd ${nerveRoot}\` first.**
|
||||
|
||||
Read the workflow thread for context: \`nerve thread ${threadId}\`
|
||||
Read the nerve-dev skill for expected file structure: \`cat ${nerveRoot}/node_modules/@uncaged/nerve-skills/nerve-dev/SKILL.md\`
|
||||
Read \`cat ${nerveRoot}/AGENT.md\`, then \`${nerveRoot}/CONVENTIONS.md\` and \`${nerveRoot}/.knowledge/sense.md\` if they exist.
|
||||
|
||||
Verify the full lifecycle in this order:
|
||||
|
||||
1. **File check** — all required sense files exist:
|
||||
1. **File check** — all required sense files exist (no per-sense \`package.json\`):
|
||||
- \`senses/<name>/src/index.ts\`
|
||||
- \`senses/<name>/src/schema.ts\`
|
||||
- \`senses/<name>/migrations/\`
|
||||
- \`senses/<name>/package.json\`
|
||||
|
||||
2. **Build** — run inside the sense directory:
|
||||
2. **Build** — from the workspace root:
|
||||
\`\`\`
|
||||
cd ${nerveRoot}/senses/<name> && pnpm install --no-cache && pnpm build
|
||||
cd ${nerveRoot} && pnpm run build
|
||||
\`\`\`
|
||||
Must produce \`index.js\` at sense root without errors.
|
||||
(or \`npm run build\` per root \`package.json\`.) Must produce \`${nerveRoot}/dist/senses/<name>/index.js\` without errors.
|
||||
|
||||
3. **Config check** — \`nerve validate\` passes, confirming nerve.yaml is valid.
|
||||
|
||||
|
||||
@@ -10,7 +10,7 @@ export type CoderMeta = z.infer<typeof coderMetaSchema>;
|
||||
|
||||
export function coderPrompt({ threadId }: { threadId: string }): string {
|
||||
return `Read the workflow thread to get the planner's design and any reviewer/tester/committer feedback: \`nerve thread ${threadId}\`
|
||||
Read the nerve-dev skill for workflow file structure and conventions: \`cat node_modules/@uncaged/nerve-skills/nerve-dev/SKILL.md\`
|
||||
Read \`cat AGENT.md\` from the repository root, then \`CONVENTIONS.md\` and \`.knowledge/workflow.md\` if present. Optional skills live under \`.cursor/skills/\`.
|
||||
Also look at existing workflows in the \`workflows/\` directory for patterns.
|
||||
|
||||
## Your task
|
||||
@@ -29,15 +29,13 @@ You do NOT need to finish everything in one pass. You may return \`done: false\`
|
||||
2. Second pass: implement role logic
|
||||
3. Third pass: fix build/lint errors
|
||||
|
||||
## Workflow file structure
|
||||
## Workflow file structure (flat workspace)
|
||||
|
||||
The workspace has **one root** \`package.json\` and **one** root build (\`pnpm run build\` or \`npm run build\`), implemented by \`scripts/build.mjs\`, which emits bundles under \`dist/workflows/<name>/index.js\`. There is **no** per-workflow \`package.json\` or \`tsconfig.json\`.
|
||||
|
||||
Each workflow must have:
|
||||
- \`workflows/<name>/index.ts\` — WorkflowDefinition default export
|
||||
- \`workflows/<name>/build.ts\` — factory function
|
||||
- \`workflows/<name>/moderator.ts\` — moderator + meta types
|
||||
- \`workflows/<name>/roles/<role>.ts\` — meta schema and prompt function per role
|
||||
- \`workflows/<name>/package.json\` — with esbuild build script
|
||||
- \`workflows/<name>/tsconfig.json\` — TypeScript config
|
||||
- \`workflows/<name>/index.ts\` — default export \`WorkflowDefinition\` (moderator and meta types typically live here or are imported from co-located modules)
|
||||
- \`workflows/<name>/roles/<role>.ts\` — one TypeScript file per role (schemas, prompts, \`createRole\` wiring, or plain async role functions)
|
||||
|
||||
For **new workflows**, also update \`nerve.yaml\` with \`workflows.<name>\`.
|
||||
|
||||
@@ -53,7 +51,7 @@ For **new workflows**, also update \`nerve.yaml\` with \`workflows.<name>\`.
|
||||
|
||||
Return \`done: true\` ONLY when ALL of the following are true:
|
||||
- All changes from the plan are implemented
|
||||
- \`cd workflows/<name> && pnpm install --no-cache && pnpm build\` succeeds (run it!)
|
||||
- From the **workspace root**, \`pnpm run build\` or \`npm run build\` succeeds (run it!) so \`dist/workflows/<name>/index.js\` is produced
|
||||
- No lint or type errors remain
|
||||
|
||||
Return \`done: false\` if you made progress but there is still work to do, or if build/lint has errors you plan to fix in the next iteration.`;
|
||||
|
||||
@@ -12,18 +12,18 @@ export function plannerPrompt({ threadId }: { threadId: string }): string {
|
||||
return `You are a Nerve workflow planner. You can **create new workflows** or **modify existing ones**.
|
||||
|
||||
Read the workflow thread for the user's request: \`nerve thread ${threadId}\`
|
||||
Read the nerve-dev skill for workflow conventions: \`cat node_modules/@uncaged/nerve-skills/nerve-dev/SKILL.md\`
|
||||
Read the workspace guide: \`cat AGENT.md\` from the repository root (created by \`nerve init\`). Also read \`CONVENTIONS.md\` if it exists; if \`.knowledge/workflow.md\` exists (e.g. Nerve monorepo), read it for layout and engine behavior. Optional Cursor skills live under \`.cursor/skills/\`.
|
||||
List existing workflows: \`ls workflows/\`
|
||||
|
||||
## Determine the task type
|
||||
|
||||
1. If the user wants to **modify an existing workflow** — read its current code (\`cat workflows/<name>/moderator.ts\`, \`cat workflows/<name>/build.ts\`, \`ls workflows/<name>/roles/\`, etc.) and understand its current structure before planning changes.
|
||||
1. If the user wants to **modify an existing workflow** — read its current code (\`cat workflows/<name>/index.ts\`, \`ls workflows/<name>/roles/\`, \`cat workflows/<name>/roles/<role>.ts\`, etc.) and understand its current structure before planning changes.
|
||||
2. If the user wants to **create a new workflow** — look at existing workflows in \`workflows/\` for patterns to follow.
|
||||
|
||||
## Produce a PLAN (not code) in markdown
|
||||
|
||||
For **new workflows**:
|
||||
- Workflow name (kebab-case)
|
||||
- Workflow name — **verb-first** kebab-case phrase (e.g. \`review-pull-request\`, \`deploy-staging\`), not a bare noun
|
||||
- Roles list (name, purpose, tool)
|
||||
- Flow transitions / moderator routing logic
|
||||
- Validation loops design
|
||||
|
||||
@@ -17,24 +17,22 @@ export function testerPrompt({
|
||||
**IMPORTANT: The Nerve workspace is at \`${nerveRoot}\`. All paths below are relative to this directory. Always \`cd ${nerveRoot}\` first.**
|
||||
|
||||
Read the workflow thread for context: \`nerve thread ${threadId}\`
|
||||
Read the nerve-dev skill for expected file structure: \`cat ${nerveRoot}/node_modules/@uncaged/nerve-skills/nerve-dev/SKILL.md\`
|
||||
Read \`cat ${nerveRoot}/AGENT.md\`, then \`${nerveRoot}/CONVENTIONS.md\` and \`${nerveRoot}/.knowledge/workflow.md\` if they exist.
|
||||
|
||||
Get the workflow name from the thread (the planner's output).
|
||||
|
||||
Verify the full lifecycle in this order:
|
||||
|
||||
1. **File check** — all required workflow files exist (under \`${nerveRoot}/\`):
|
||||
1. **File check** — all required workflow sources exist (under \`${nerveRoot}/\`):
|
||||
- \`workflows/<name>/index.ts\`
|
||||
- \`workflows/<name>/build.ts\`
|
||||
- \`workflows/<name>/moderator.ts\`
|
||||
- \`workflows/<name>/roles/\` with one \`.ts\` file per role
|
||||
- \`workflows/<name>/package.json\`
|
||||
- \`workflows/<name>/roles/\` with one \`.ts\` file per role (flat files, not per-role packages)
|
||||
- **No** \`workflows/<name>/package.json\` or \`tsconfig.json\` expected
|
||||
|
||||
2. **Build** — run inside the workflow directory:
|
||||
2. **Build** — from the workspace root:
|
||||
\`\`\`
|
||||
cd ${nerveRoot}/workflows/<name> && pnpm install --no-cache && pnpm build
|
||||
cd ${nerveRoot} && pnpm run build
|
||||
\`\`\`
|
||||
Must produce \`dist/index.js\` without errors.
|
||||
(or \`npm run build\` if that is what the root \`package.json\` defines.) Must produce \`${nerveRoot}/dist/workflows/<name>/index.js\` without errors.
|
||||
|
||||
3. **Config check** — \`cd ${nerveRoot} && nerve validate\` passes, confirming nerve.yaml is valid.
|
||||
|
||||
|
||||
Reference in New Issue
Block a user