Compare commits

..

3 Commits

Author SHA1 Message Date
xiaomo e67ddc58d8 fix: address review feedback (星月)
1. trySendSync: wrap child.send in try/catch — IPC race between connected check and send
2. gracefulStop: same try/catch for shutdown send
3. Remove crashTimestamps reset on ready — crash window detection was being bypassed
2026-04-30 13:41:31 +00:00
xiaomo 4dffcb636b fix: resolve 2 failing tests after WorkerRuntime migration
- Add trySendSync() for synchronous send when worker is ready+connected
- sendCompute uses sync path first, async fallback for cold start
- Add forwardStderr, allowRespawn, hasDisconnectedChild, onReady(key,msg)
- Tests: add connected:true to mocks, flush async fork microtasks
- All 167 daemon tests pass
2026-04-30 13:34:10 +00:00
xiaomo c34ec46416 feat(daemon): WorkerRuntime — generic message-routed process manager (closes #280)
RFC-006 Phase 1: ManagedWorker state machine + WorkerRuntime<K> with
cold start, crash respawn, drain/evict, graceful shutdown.
8 test cases covering all lifecycle scenarios.
2026-04-30 13:09:19 +00:00
15 changed files with 768 additions and 858 deletions
+1 -1
View File
@@ -10,7 +10,7 @@
},
"main": "dist/index.js",
"types": "dist/index.d.ts",
"files": ["dist", "skills"],
"files": ["dist"],
"publishConfig": {
"access": "public"
},
-505
View File
@@ -1,505 +0,0 @@
---
name: nerve
version: 0.5.0
description: >
Nerve — AI agent 观测引擎。掌握 nerve 的核心概念、CLI 操作、sense/workflow 开发。
加载此 skill 后你可以:查看系统状态、监控 sense、触发 workflow、开发新 sense 和 workflow。
metadata:
hermes:
tags: [nerve, sense, workflow, monitoring, agent-kernel]
homepage: https://git.shazhou.work/uncaged/nerve
---
# Nerve — AI Agent 观测引擎
Nerve 是一个轻量级观测引擎守护进程。它持续观测外部状态,通过声明式规则响应变化,编排多步骤工作流。
## 核心架构
```
External World → Sense → Signal → Workflow → Log
```
| 概念 | 说明 |
|------|------|
| **Sense** | 观测函数,`compute()` 采样或推导数据。返回非 null 则发出 Signal,可选触发 Workflow。每个 Sense 有独立 SQLite 数据库。 |
| **Signal** | Sense 返回非 null 时发出的通知。纯事实,无意图。通过内存 Signal Bus 分发,不持久化。 |
| **Workflow** | 有状态的多步骤执行。包含 Role(有副作用的执行者)和 Moderator(纯路由器)。每个实例是一个 Thread,有唯一 runId。 |
| **Log** | 不可变审计日志。记录执行、状态转换、错误。不能触发 Sense(防止反馈循环)。 |
| **Engine** | 内核,持有 Signal Bus、Process Manager、Workflow Manager。不直接加载用户代码。 |
| **Daemon** | 引擎运行时,作为后台进程运行。 |
**关键规则:**
- 因果链单向:External → Sense → Signal → Workflow + Log
- 进程隔离:每个 Sense group 一个 worker(长期),每个 Workflow 类型一个 worker(按需)
- 两个扩展点:Sense(观测什么 + 何时)、Workflow(做什么)
## 工作区结构
```
~/.uncaged-nerve/ # 默认工作区(nerve init 创建)
├── nerve.yaml # 核心配置
├── senses/
│ └── <name>/
│ ├── src/index.ts # exports compute() + table
│ ├── src/schema.ts # drizzle 表定义
│ └── migrations/ # SQL 迁移
├── workflows/
│ └── <name>/
│ ├── index.ts # exports WorkflowDefinition
│ └── roles/<role>/
│ ├── index.ts # role 实现
│ └── prompt.md # 可选 system prompt
└── data/ # 运行时数据(SQLite、blobs)
```
---
## CLI 完整参考
全局选项:`--host <host:port>`(连接远程 daemon)、`--api-token <secret>`(Bearer 认证)
### 初始化与脚手架
```bash
nerve init # 初始化工作区
nerve init --from <git-url> # 从 git 仓库克隆工作区
nerve init workspace # 只初始化工作区结构
nerve create sense <name> # 创建 sense 脚手架
nerve create sense <name> --force # 覆盖已有
nerve create workflow <name> # 创建 workflow 脚手架
nerve create workflow <name> --force
nerve validate # 验证 nerve.yaml 配置
```
### Daemon 管理
```bash
nerve daemon start # 启动后台 daemon
nerve daemon start --port 3000 # 指定 HTTP API 端口
nerve daemon stop # 停止 daemon
nerve daemon restart # 重启
nerve daemon status # 查看状态
nerve daemon logs # 查看日志
nerve daemon logs --follow # 实时日志
nerve daemon logs --n 50 # 最近 50 行
nerve dev # 前台开发模式(不 fork daemon)
nerve dev --port 3000 # 指定端口
```
### Sense 操作
```bash
nerve sense list # 列出所有注册的 sense
nerve sense trigger <name> # 手动触发 sense 计算
nerve sense schema <name> # 查看 sense 数据库表结构
nerve sense schema <name> --json # JSON 格式
nerve sense query <name> <sql> # 对 sense 数据库执行只读 SQL
nerve sense query <name> "SELECT * FROM samples ORDER BY ts DESC LIMIT 10" --json
```
### Workflow 操作
```bash
nerve workflow list # 列出 nerve.yaml 中定义的 workflow
nerve workflow status # 查看运行中的 workflow 状态
nerve workflow trigger <name> # 触发 workflow
nerve workflow trigger <name> --prompt "检查生产环境"
nerve workflow trigger <name> --maxRounds 50
nerve workflow trigger <name> --dryRun # 干跑模式
```
### Thread(Workflow 执行记录)
```bash
nerve thread list # 列出最近的 workflow 执行
nerve thread list --all # 包含已完成/失败的
nerve thread list --workflow <name> # 按 workflow 过滤
nerve thread list --limit 50 # 最多 50 条
nerve thread show <runId> # 查看 role 对话轮次
nerve thread show <runId> --budget 16000 # 增大输出预算(默认 8000 字符)
nerve thread inspect <runId> # 查看详情和事件
nerve thread kill <runId> # 终止运行中/排队中的 thread
```
### Store(日志归档)
```bash
nerve store archive # 导出旧日志到 JSONL 归档
nerve store archive --vacuum # 归档后 VACUUM 数据库
```
### Knowledge(知识库)
```bash
nerve knowledge sync # 从 knowledge.yaml 重建索引
nerve knowledge query "搜索内容" # 搜索知识库
nerve knowledge query "内容" --limit 5
nerve knowledge query "内容" -g # 搜索所有注册仓库
```
### Remote(远程 daemon)
```bash
nerve remote add <name> <host:port> --token <secret>
nerve remote list
nerve remote show <name>
nerve remote set-url <name> <host>
nerve remote set-token <name> <token>
nerve remote remove <name>
nerve remote default <name> # 设为默认远程
```
---
## nerve.yaml 配置参考
```yaml
# 引擎全局配置
max_rounds: 100 # moderator 最大轮次(默认 100)
# Sense 配置
senses:
cpu-usage:
group: system # 必填,同 group 的 sense 共享 worker
interval: 10s # 轮询间隔(duration: 5s, 10m, 1h)
throttle: 5s # 最小计算间隔
timeout: 10s # compute 超时
grace_period: null # 优雅关闭等待
retention: 10000 # _signals 表最大行数(默认 10000)
system-health:
group: derived
on: [cpu-usage, disk-usage] # 响应式:被列出的 sense 发出 signal 时触发
throttle: null
timeout: null
# Workflow 配置
workflows:
my-workflow:
concurrency: 1 # 必填,并发数
overflow: drop # 必填,超并发时处理:drop | queue
max_queue: 100 # overflow=queue 时的队列上限(默认 100)
# HTTP API
api:
port: 3000 # null = 不启用 HTTP
host: "127.0.0.1" # 监听地址
token: null # 非 loopback 时必填
# LLM Extract(可选)
extract:
provider: anthropic
model: claude-sonnet-4-20250514
```
---
## Sense 开发指南
### compute 函数签名
```typescript
import type { LibSQLDatabase } from "drizzle-orm/libsql";
import type { ComputeResult, WorkflowTrigger } from "@uncaged/nerve-core";
export async function compute(
db: LibSQLDatabase, // 此 sense 的 Drizzle ORM 数据库
peers: Record<string, LibSQLDatabase>, // 其他 sense 的数据库(只读)
options: { signal: AbortSignal }, // 超时 abort signal
): Promise<ComputeResult<T>>
```
### 返回值
```typescript
// 返回 null = 静默,不发 signal
// 返回非 null = 发出 signal,可选触发 workflow
type ComputeResult<T> =
| null
| { signal: T; workflow: WorkflowTrigger | null };
type WorkflowTrigger = {
name: string; // workflow 名称(对应 nerve.yaml 中的 key)
maxRounds: number; // moderator 最大轮次
prompt: string; // 初始 prompt
dryRun: boolean; // 干跑模式
};
```
### Sense 模块导出
```typescript
// senses/<name>/src/index.ts
import type { SenseModule, ComputeResult } from "@uncaged/nerve-core";
import { table } from "./schema.js";
export async function compute(
db: LibSQLDatabase,
_peers: Record<string, LibSQLDatabase>,
_options: { signal: AbortSignal },
): Promise<ComputeResult<number>> {
const value = Math.random(); // 替换为真实观测逻辑
await db.insert(table).values({ ts: Date.now(), value });
return { signal: value, workflow: null };
}
export { table };
```
### Schema 定义
```typescript
// senses/<name>/src/schema.ts
import { sqliteTable, integer, real } from "drizzle-orm/sqlite-core";
export const table = sqliteTable("samples", {
ts: integer("ts").notNull(),
value: real("value").notNull(),
});
```
### 调度方式
1. **interval 轮询**`interval: 10s` — 每 10 秒执行一次
2. **响应式触发**`on: [cpu-usage]` — 当 cpu-usage 发出 signal 时触发
3. 两者可以组合
### 调试
```bash
nerve dev # 前台运行,看实时输出
nerve sense trigger <name> # 手动触发一次
nerve sense query <name> "SELECT * FROM samples ORDER BY ts DESC LIMIT 5"
```
### 完整示例:CPU 监控
```typescript
// senses/cpu-usage/src/schema.ts
import { sqliteTable, integer, real } from "drizzle-orm/sqlite-core";
export const table = sqliteTable("samples", {
ts: integer("ts").notNull(),
value: real("value").notNull(),
});
// senses/cpu-usage/src/index.ts
import os from "node:os";
import type { LibSQLDatabase } from "drizzle-orm/libsql";
import type { ComputeResult } from "@uncaged/nerve-core";
import { table } from "./schema.js";
export async function compute(
db: LibSQLDatabase,
_peers: Record<string, LibSQLDatabase>,
_options: { signal: AbortSignal },
): Promise<ComputeResult<number>> {
const oneMin = os.loadavg()[0];
await db.insert(table).values({ ts: Date.now(), value: oneMin });
return { signal: oneMin, workflow: null };
}
export { table };
```
nerve.yaml:
```yaml
senses:
cpu-usage:
group: system
interval: 10s
throttle: 5s
timeout: 10s
retention: 10000
```
---
## Workflow 开发指南
### 核心类型
```typescript
import type {
WorkflowDefinition,
RoleResult,
ThreadContext,
StartStep,
RoleStep,
} from "@uncaged/nerve-core";
import { END } from "@uncaged/nerve-core";
// Role:执行者,接收上下文返回结果
type Role<Meta> = (ctx: ThreadContext) => Promise<RoleResult<Meta>>;
type RoleResult<Meta> = { content: string; meta: Meta };
// Moderator:路由器,决定下一个 role 或结束
type Moderator<M> = (ctx: ThreadContext<M>) => (keyof M & string) | typeof END;
// ThreadContext:对话上下文
type ThreadContext<M = RoleMeta> = {
threadId: string;
start: StartStep; // 初始 prompt(role: "__start__")
steps: RoleStep<M>[]; // 所有 role 的执行记录
};
// WorkflowDefinition:完整定义
type WorkflowDefinition<M> = {
name: string;
roles: { [K in keyof M & string]: Role<M[K]> };
moderator: Moderator<M>;
};
```
### 基本 Workflow 示例
```typescript
// workflows/example/index.ts
import type { RoleResult, ThreadContext, WorkflowDefinition } from "@uncaged/nerve-core";
import { END } from "@uncaged/nerve-core";
type Meta = Record<"main", { round: number }>;
async function main(ctx: ThreadContext): Promise<RoleResult<{ round: number }>> {
const prompt = ctx.start.content;
return {
content: `处理完成: ${prompt}`,
meta: { round: ctx.steps.length },
};
}
const workflow: WorkflowDefinition<Meta> = {
name: "example",
roles: { main },
moderator(ctx: ThreadContext<Meta>) {
// 执行一次 main 就结束
return ctx.steps.length === 0 ? "main" : END;
},
};
export default workflow;
```
### 多 Role Workflow 示例
```typescript
import type { WorkflowDefinition, RoleResult, ThreadContext } from "@uncaged/nerve-core";
import { END } from "@uncaged/nerve-core";
type Roles = Record<"planner" | "executor" | "reviewer", { status: string }>;
async function planner(ctx: ThreadContext): Promise<RoleResult<{ status: string }>> {
return { content: "计划: ...", meta: { status: "planned" } };
}
async function executor(ctx: ThreadContext): Promise<RoleResult<{ status: string }>> {
return { content: "执行: ...", meta: { status: "executed" } };
}
async function reviewer(ctx: ThreadContext): Promise<RoleResult<{ status: string }>> {
return { content: "审核通过", meta: { status: "approved" } };
}
const workflow: WorkflowDefinition<Roles> = {
name: "plan-execute-review",
roles: { planner, executor, reviewer },
moderator(ctx: ThreadContext<Roles>) {
if (ctx.steps.length === 0) return "planner";
const last = ctx.steps[ctx.steps.length - 1];
if (last.role === "planner") return "executor";
if (last.role === "executor") return "reviewer";
return END;
},
};
export default workflow;
```
### Agent 适配器
Workflow role 可以集成 AI agent。已知适配器 ID:`echo``cursor``hermes``codex`
```typescript
type AgentFn = (ctx: ThreadContext, systemPrompt: string) => Promise<string>;
```
### Workflow 运行状态
`queued``started``completed` | `failed` | `crashed` | `killed` | `interrupted` | `dropped`
---
## 日常操作 Pattern
### 查看系统整体状态
```bash
nerve daemon status # daemon 是否在运行
nerve sense list # 所有 sense 及其调度配置
nerve workflow status # 运行中的 workflow
nerve thread list # 最近的 workflow 执行记录
```
### 检查某个 sense 的历史数据
```bash
nerve sense query cpu-usage "SELECT * FROM samples ORDER BY ts DESC LIMIT 10" --json
nerve sense schema cpu-usage # 查看表结构
```
### 手动触发 workflow
```bash
nerve workflow trigger my-workflow --prompt "手动检查"
nerve thread list --workflow my-workflow # 查看执行状态
nerve thread show <runId> # 查看对话详情
```
### 排查 sense 报错
```bash
nerve daemon logs --follow # 查看实时日志
nerve sense trigger <name> # 手动触发看报错
nerve dev # 前台模式,更详细的输出
```
### 开发新 sense
```bash
nerve create sense my-sensor # 脚手架
# 编辑 senses/my-sensor/src/index.ts 和 schema.ts
nerve validate # 验证配置
nerve dev # 前台测试
nerve sense trigger my-sensor # 单次触发验证
nerve sense query my-sensor "SELECT * FROM ..." # 检查数据
```
### 开发新 workflow
```bash
nerve create workflow my-flow # 脚手架
# 编辑 workflows/my-flow/index.ts 和 roles/
nerve validate # 验证配置
nerve workflow trigger my-flow --prompt "测试" --dryRun # 干跑
nerve thread show <runId> # 查看执行轨迹
```
---
## Pitfalls
- **Sense 返回值**:返回 `null` 表示静默(不发 signal);返回 `{ signal, workflow }` 才发 signal。不要返回 undefined。
- **no optional properties**:nerve 代码规范禁止 `?:`,用 `T | null` 代替。
- **函数式风格**:用 `function` + `type`,不用 `class` + `interface`
- **workflow 用 default export**:这是唯一允许 default export 的场景。
- **_signals 表**:每个 sense 自动有 `_signals` 表记录 signal 历史,受 `retention` 配置限制。
- **peers 只读**:sense 的 `peers` 参数提供其他 sense 数据库的只读访问,不要写入。
- **concurrency + overflow**:workflow 必须配置并发策略,否则验证失败。
- **moderator 是同步函数**:不要加 async,moderator 是纯路由逻辑,不能有副作用。
-2
View File
@@ -3,7 +3,6 @@ import "@uncaged/nerve-daemon/experimental-warning-suppression.js";
import { defineCommand, runMain } from "citty";
import { consumeGlobalDaemonCliFlags } from "./cli-global.js";
import { agentCommand } from "./commands/agent.js";
import { createCommand } from "./commands/create.js";
import { daemonCommand } from "./commands/daemon.js";
import { devCommand } from "./commands/dev.js";
@@ -43,7 +42,6 @@ const main = defineCommand({
"Nerve — an AI agent kernel. Global options: --host <host:port> (remote HTTP), --api-token <secret> (Bearer auth).",
},
subCommands: {
agent: agentCommand,
init: initCommand,
create: createCommand,
daemon: daemonCommand,
-225
View File
@@ -1,225 +0,0 @@
import {
cpSync,
existsSync,
mkdirSync,
readFileSync,
readdirSync,
rmSync,
writeFileSync,
} from "node:fs";
import { homedir } from "node:os";
import { dirname, join } from "node:path";
import { fileURLToPath } from "node:url";
import { defineCommand } from "citty";
const CLI_VERSION = "0.5.0";
function getSkillSourceDir(): string {
const thisFile = fileURLToPath(import.meta.url);
let dir = dirname(thisFile);
for (let i = 0; i < 5; i++) {
if (existsSync(join(dir, "skills"))) return join(dir, "skills");
dir = dirname(dir);
}
throw new Error("Cannot locate skills directory. Is the CLI package intact?");
}
function getHermesSkillDir(profile: string | null): string {
const hermesHome = join(homedir(), ".hermes");
if (profile !== null) {
return join(hermesHome, "profiles", profile, "skills", "nerve");
}
return join(hermesHome, "skills", "nerve");
}
function readVersionFile(skillDir: string): string | null {
const versionPath = join(skillDir, ".nerve-version");
if (!existsSync(versionPath)) return null;
return readFileSync(versionPath, "utf8").trim();
}
function writeVersionFile(skillDir: string, version: string): void {
writeFileSync(join(skillDir, ".nerve-version"), `${version}\n`, "utf8");
}
function injectHermes(profile: string | null): void {
const sourceDir = join(getSkillSourceDir(), "hermes");
const targetDir = getHermesSkillDir(profile);
const existing = readVersionFile(targetDir);
if (existing === CLI_VERSION) {
const loc = profile !== null ? ` (profile: ${profile})` : "";
process.stdout.write(`✅ Hermes nerve skill is already up to date (v${CLI_VERSION})${loc}\n`);
return;
}
mkdirSync(targetDir, { recursive: true });
cpSync(sourceDir, targetDir, { recursive: true });
writeVersionFile(targetDir, CLI_VERSION);
const action = existing !== null ? "Updated" : "Installed";
const loc = profile !== null ? ` (profile: ${profile})` : "";
process.stdout.write(`${action} Hermes nerve skill v${CLI_VERSION}${loc}\n`);
process.stdout.write(`${targetDir}/SKILL.md\n`);
}
function removeHermes(profile: string | null): void {
const targetDir = getHermesSkillDir(profile);
if (!existsSync(targetDir)) {
process.stdout.write("ℹ️ Hermes nerve skill is not installed.\n");
return;
}
rmSync(targetDir, { recursive: true, force: true });
const loc = profile !== null ? ` (profile: ${profile})` : "";
process.stdout.write(`✅ Removed Hermes nerve skill${loc}\n`);
}
function printStatus(): void {
process.stdout.write(`nerve agent skills (CLI v${CLI_VERSION})\n\n`);
// Default profile
const defaultDir = getHermesSkillDir(null);
const defaultVer = readVersionFile(defaultDir);
printAgentLine("Hermes (default)", defaultVer);
// Named profiles
const profilesDir = join(homedir(), ".hermes", "profiles");
if (existsSync(profilesDir)) {
const profiles = readdirSync(profilesDir, { withFileTypes: true })
.filter((d) => d.isDirectory())
.map((d) => d.name);
for (const profile of profiles) {
const dir = getHermesSkillDir(profile);
const ver = readVersionFile(dir);
if (ver !== null) {
printAgentLine(`Hermes (${profile})`, ver);
}
}
}
process.stdout.write("\n");
}
function printAgentLine(label: string, version: string | null): void {
if (version === null) {
process.stdout.write(` ${label}: ❌ not installed\n`);
} else if (version === CLI_VERSION) {
process.stdout.write(` ${label}: ✅ v${version}\n`);
} else {
process.stdout.write(
` ${label}: ⚠️ v${version} → v${CLI_VERSION} available (run \`nerve agent update\`)\n`,
);
}
}
const injectCommand = defineCommand({
meta: {
name: "inject",
description: "Inject nerve skill into an AI agent",
},
args: {
target: {
type: "positional",
description: "Agent target: hermes",
},
profile: {
type: "string",
description: "Hermes profile name (default: main profile)",
},
},
run({ args }) {
if (args.target !== "hermes") {
process.stderr.write(`❌ Unknown agent target: ${args.target}\n`);
process.stderr.write(" Supported targets: hermes\n");
process.exit(1);
}
injectHermes(args.profile ?? null);
},
});
const updateCommand = defineCommand({
meta: {
name: "update",
description: "Update all injected nerve skills to current CLI version",
},
run() {
let updated = 0;
// Default profile
const defaultDir = getHermesSkillDir(null);
if (existsSync(defaultDir)) {
injectHermes(null);
updated++;
}
// Named profiles
const profilesDir = join(homedir(), ".hermes", "profiles");
if (existsSync(profilesDir)) {
const profiles = readdirSync(profilesDir, { withFileTypes: true })
.filter((d) => d.isDirectory())
.map((d) => d.name);
for (const profile of profiles) {
const dir = getHermesSkillDir(profile);
if (existsSync(dir)) {
injectHermes(profile);
updated++;
}
}
}
if (updated === 0) {
process.stdout.write("ℹ️ No injected skills found. Run `nerve agent inject hermes` first.\n");
}
},
});
const removeCommand = defineCommand({
meta: {
name: "remove",
description: "Remove injected nerve skill from an AI agent",
},
args: {
target: {
type: "positional",
description: "Agent target: hermes",
},
profile: {
type: "string",
description: "Hermes profile name (default: main profile)",
},
},
run({ args }) {
if (args.target !== "hermes") {
process.stderr.write(`❌ Unknown agent target: ${args.target}\n`);
process.stderr.write(" Supported targets: hermes\n");
process.exit(1);
}
removeHermes(args.profile ?? null);
},
});
const statusCommand = defineCommand({
meta: {
name: "status",
description: "Show injection status of nerve skills across agents",
},
run() {
printStatus();
},
});
export const agentCommand = defineCommand({
meta: {
name: "agent",
description: "Manage nerve skill injection for AI agents",
},
subCommands: {
inject: injectCommand,
update: updateCommand,
remove: removeCommand,
status: statusCommand,
},
});
@@ -0,0 +1,9 @@
// Ready then crashes on a timer; still echoes IPC so parent tests can send after respawn
process.on("message", (msg) => {
if (msg && msg.type === "shutdown") {
process.exit(0);
}
process.send({ type: "echo", payload: msg });
});
process.send({ type: "ready" });
setTimeout(() => process.exit(1), 50);
@@ -0,0 +1,9 @@
// Simple test worker: sends ready, echoes messages, handles shutdown
process.on("message", (msg) => {
if (msg && msg.type === "shutdown") {
process.exit(0);
}
// Echo back with 'echo' type
process.send({ type: "echo", payload: msg });
});
process.send({ type: "ready" });
@@ -0,0 +1,9 @@
// Like echo-worker but writes stderr for tail diagnostics
console.error("stderr-marker");
process.on("message", (msg) => {
if (msg && msg.type === "shutdown") {
process.exit(0);
}
process.send({ type: "echo", payload: msg });
});
process.send({ type: "ready" });
@@ -70,6 +70,14 @@ const { createKernel } = await import("../kernel.js");
// Helpers
// ---------------------------------------------------------------------------
/** Sense worker `fork` runs on the next microtask per scheduled `start`. */
async function flushSenseWorkerForkMicrotasks(kernel: { groups: Set<string> }): Promise<void> {
const n = kernel.groups.size;
for (let i = 0; i < n; i++) {
await Promise.resolve();
}
}
function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
return {
senses: {
@@ -142,6 +150,8 @@ describe("kernel — getHealth", () => {
},
});
const kernel = createKernel(config, nerveRoot);
await flushSenseWorkerForkMicrotasks(kernel);
await vi.runAllTimersAsync();
const health = kernel.getHealth();
expect(health.activeSenses).toBe(3);
@@ -171,6 +181,8 @@ describe("kernel — restartGroup", () => {
it("sends shutdown to old worker and spawns new one", async () => {
const config = makeConfig();
const kernel = createKernel(config, nerveRoot);
await flushSenseWorkerForkMicrotasks(kernel);
await vi.runAllTimersAsync();
expect(mockChildren.length).toBe(1);
const oldChild = mockChildren[0];
@@ -178,6 +190,7 @@ describe("kernel — restartGroup", () => {
const restartPromise = kernel.restartGroup("system");
// The shutdown message triggers exit in the mock
await restartPromise;
await vi.runAllTimersAsync();
// A new child should have been spawned
expect(mockChildren.length).toBe(2);
@@ -191,6 +204,8 @@ describe("kernel — restartGroup", () => {
it("restartGroup on unknown group does nothing", async () => {
const config = makeConfig();
const kernel = createKernel(config, nerveRoot);
await flushSenseWorkerForkMicrotasks(kernel);
await vi.runAllTimersAsync();
expect(mockChildren.length).toBe(1);
await kernel.restartGroup("nonexistent");
@@ -218,6 +233,8 @@ describe("kernel — reloadConfig", () => {
it("adds new group worker when new sense group appears", async () => {
const config = makeConfig();
const kernel = createKernel(config, nerveRoot);
await flushSenseWorkerForkMicrotasks(kernel);
await vi.runAllTimersAsync();
expect(mockChildren.length).toBe(1); // only system group
expect(kernel.groups.has("network")).toBe(false);
@@ -249,6 +266,9 @@ describe("kernel — reloadConfig", () => {
api: { port: null, token: null, host: "127.0.0.1" },
});
await Promise.resolve();
await vi.runAllTimersAsync();
expect(kernel.groups.has("network")).toBe(true);
expect(mockChildren.length).toBe(2); // system + network
@@ -283,6 +303,8 @@ describe("kernel — reloadConfig", () => {
api: { port: null, token: null, host: "127.0.0.1" },
};
const kernel = createKernel(config, nerveRoot);
await flushSenseWorkerForkMicrotasks(kernel);
await vi.runAllTimersAsync();
expect(mockChildren.length).toBe(2);
expect(kernel.groups.has("network")).toBe(true);
@@ -308,6 +330,7 @@ describe("kernel — reloadConfig", () => {
});
expect(kernel.groups.has("network")).toBe(false);
await vi.runAllTimersAsync();
// Network child should have received shutdown
expect(networkChild.send).toHaveBeenCalledWith(expect.objectContaining({ type: "shutdown" }));
@@ -317,6 +340,8 @@ describe("kernel — reloadConfig", () => {
it("health reflects updated sense count after reloadConfig", async () => {
const config = makeConfig();
const kernel = createKernel(config, nerveRoot);
await flushSenseWorkerForkMicrotasks(kernel);
await vi.runAllTimersAsync();
expect(kernel.getHealth().activeSenses).toBe(1);
@@ -29,6 +29,9 @@ type MockChild = EventEmitter & {
function makeMockChild(pid = 1): MockChild {
const child = new EventEmitter() as MockChild;
child.connected = true;
setImmediate(() => {
child.emit("message", { type: "ready" });
});
child.send = vi.fn((msg: unknown) => {
if (
msg !== null &&
@@ -136,6 +139,7 @@ describe("kernel.triggerSense()", () => {
logStore: makeMockLogStore() as never,
});
await vi.runAllTimersAsync();
expect(() => kernel.triggerSense("no-such-sense")).toThrow(/Unknown sense/);
await kernel.stop();
@@ -169,6 +173,7 @@ describe("kernel.triggerSense()", () => {
logStore: makeMockLogStore() as never,
});
await vi.runAllTimersAsync();
// Two groups → two workers
expect(mockChildren.length).toBe(2);
@@ -214,6 +219,7 @@ describe("kernel.triggerSense()", () => {
logStore: makeMockLogStore() as never,
});
await vi.runAllTimersAsync();
// Both senses share the "system" group → one worker only
expect(mockChildren.length).toBe(1);
const worker = mockChildren[0];
@@ -237,6 +243,7 @@ describe("kernel.triggerSense()", () => {
logStore: makeMockLogStore() as never,
});
await new Promise<void>((resolve) => setImmediate(resolve));
const worker = mockChildren[0];
worker.connected = false;
@@ -102,6 +102,13 @@ function makeLogStore() {
};
}
async function flushSenseWorkerForkMicrotasks(kernel: { groups: Set<string> }): Promise<void> {
const n = kernel.groups.size;
for (let i = 0; i < n; i++) {
await Promise.resolve();
}
}
function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
return {
senses: {
@@ -164,6 +171,8 @@ describe("kernel + workflowManager integration", () => {
workerScript: "fake-worker.js",
logStore,
});
await flushSenseWorkerForkMicrotasks(kernel);
await vi.runAllTimersAsync();
// Simulate a sense worker sending a signal with workflow launch payload
// The kernel's handleWorkerMessage processes "signal" type messages
@@ -222,6 +231,8 @@ describe("kernel + workflowManager integration", () => {
workerScript: "fake-worker.js",
logStore,
});
await flushSenseWorkerForkMicrotasks(kernel);
await vi.runAllTimersAsync();
// Simulate sense worker returning a signal plus workflow launch
const workerPool = mockChildren[0];
@@ -275,6 +286,8 @@ describe("kernel + workflowManager integration", () => {
workerScript: "fake-worker.js",
logStore,
});
await flushSenseWorkerForkMicrotasks(kernel);
await vi.runAllTimersAsync();
const workerPool = mockChildren[0];
if (workerPool) {
@@ -337,6 +350,8 @@ describe("kernel + workflowManager integration", () => {
workerScript: "fake-worker.js",
logStore,
});
await flushSenseWorkerForkMicrotasks(kernel);
await vi.runAllTimersAsync();
// Emit a regular signal (shorthand payload) — should NOT trigger any workflow
const workerPool = mockChildren[0];
@@ -387,6 +402,8 @@ describe("kernel + workflowManager integration", () => {
workerScript: "fake-worker.js",
logStore,
});
await flushSenseWorkerForkMicrotasks(kernel);
await vi.runAllTimersAsync();
// Simulate sense compute returning a signal plus workflow launch
const workerPool = mockChildren[0];
@@ -440,6 +457,8 @@ describe("kernel + workflowManager integration", () => {
workerScript: "fake-worker.js",
logStore,
});
await flushSenseWorkerForkMicrotasks(kernel);
await vi.runAllTimersAsync();
// Reload with a workflow added
const newConfig: NerveConfig = {
@@ -517,6 +536,8 @@ describe("kernel + workflowManager integration", () => {
workerScript: "fake-worker.js",
logStore,
});
await flushSenseWorkerForkMicrotasks(kernel);
await vi.runAllTimersAsync();
// Reload with the workflow removed
const newConfig: NerveConfig = {
@@ -600,6 +621,8 @@ describe("kernel + workflowManager integration", () => {
workerScript: "fake-worker.js",
logStore,
});
await flushSenseWorkerForkMicrotasks(kernel);
await vi.runAllTimersAsync();
// Trigger a workflow via sense compute return value
const workerPool = mockChildren[0];
@@ -664,6 +687,8 @@ describe("kernel + workflowManager integration", () => {
workerScript: "fake-worker.js",
logStore,
});
await flushSenseWorkerForkMicrotasks(kernel);
await vi.runAllTimersAsync();
const health = kernel.getHealth();
expect(health).toHaveProperty("activeWorkflows");
+22 -2
View File
@@ -16,10 +16,12 @@ type MockChild = EventEmitter & {
send: ReturnType<typeof vi.fn>;
kill: ReturnType<typeof vi.fn>;
pid: number;
connected: boolean;
};
function makeMockChild(pid = 1): MockChild {
const child = new EventEmitter() as MockChild;
child.connected = true;
setImmediate(() => {
child.emit("message", { type: "ready" });
});
@@ -27,7 +29,10 @@ function makeMockChild(pid = 1): MockChild {
if (msg === null || typeof msg !== "object") return;
const m = msg as Record<string, unknown>;
if (m.type === "shutdown") {
setImmediate(() => child.emit("exit", 0, null));
setImmediate(() => {
child.connected = false;
child.emit("exit", 0, null);
});
return;
}
if (m.type === "compute" && typeof m.sense === "string") {
@@ -37,6 +42,7 @@ function makeMockChild(pid = 1): MockChild {
}
});
child.kill = vi.fn((_signal?: string) => {
child.connected = false;
child.emit("exit", null, _signal ?? "SIGKILL");
});
child.pid = pid;
@@ -59,6 +65,14 @@ const { createLogStore } = await import("@uncaged/nerve-store");
// Helpers
// ---------------------------------------------------------------------------
/** `WorkerRuntime.start` schedules `fork` on the next microtask — flush one tick per initial group. */
async function flushSenseWorkerForkMicrotasks(kernel: { groups: Set<string> }): Promise<void> {
const n = kernel.groups.size;
for (let i = 0; i < n; i++) {
await Promise.resolve();
}
}
function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
return {
senses: {
@@ -173,6 +187,7 @@ describe("kernel — message routing", () => {
},
});
const kernel = createKernel(config, nerveRoot);
await flushSenseWorkerForkMicrotasks(kernel);
const child = mockChildren[0];
child.emit("message", { type: "error", sense: "cpu-usage", error: "compute failed" });
@@ -201,6 +216,7 @@ describe("kernel — message routing", () => {
},
});
const kernel = createKernel(config, nerveRoot);
await flushSenseWorkerForkMicrotasks(kernel);
const child = mockChildren[0];
const callsBefore = stderrSpy.mock.calls.length;
@@ -228,6 +244,7 @@ describe("kernel — message routing", () => {
},
});
const kernel = createKernel(config, nerveRoot);
await flushSenseWorkerForkMicrotasks(kernel);
const child = mockChildren[0];
expect(() => child.emit("message", { type: "unknown-type" })).not.toThrow();
@@ -290,6 +307,7 @@ describe("kernel — groupForSense mapping", () => {
api: { port: null, token: null, host: "127.0.0.1" },
};
const kernel = createKernel(config, nerveRoot);
await flushSenseWorkerForkMicrotasks(kernel);
// system and network = 2 unique groups
expect(mockChildren.length).toBe(2);
@@ -311,8 +329,10 @@ describe("kernel — groupForSense mapping", () => {
},
});
const kernel = createKernel(config, nerveRoot);
await flushSenseWorkerForkMicrotasks(kernel);
const child = mockChildren[0];
child.emit("message", { type: "ready" });
vi.advanceTimersByTime(500);
expect(child.send).toHaveBeenCalledWith(
@@ -50,6 +50,7 @@ async function startWorkerWithReady(
group: string,
): Promise<void> {
const pr = pool.startWorker(group);
await Promise.resolve();
const child = mockChildren[mockChildren.length - 1];
child.emit("message", { type: "ready" });
await pr;
@@ -137,6 +138,7 @@ describe("createSenseWorkerPool", () => {
expect(pool.activeGroupCount()).toBe(1);
pool.evictGroup("x");
expect(pool.hasWorkerForGroup("x")).toBe(false);
await Promise.resolve();
expect(mockChildren[0].send).toHaveBeenCalledWith(
expect.objectContaining({ type: "shutdown" }),
);
@@ -159,6 +161,7 @@ describe("createSenseWorkerPool", () => {
const p = pool.restartGroup("g");
expect(onBeforeGroupRestart).toHaveBeenCalledWith("g");
await Promise.resolve();
expect(mockChildren[0].send).toHaveBeenCalledWith(
expect.objectContaining({ type: "shutdown" }),
);
@@ -171,7 +174,7 @@ describe("createSenseWorkerPool", () => {
});
it("onWorkerCrashed runs and schedules respawn after non-zero exit", async () => {
vi.useFakeTimers({ shouldAdvanceTime: true });
vi.useFakeTimers();
const onWorkerCrashed = vi.fn();
const pool = createSenseWorkerPool({
nerveRoot: "/tmp/n",
@@ -0,0 +1,180 @@
import { dirname, join } from "node:path";
import { fileURLToPath } from "node:url";
import { afterEach, describe, expect, it, vi } from "vitest";
import { createWorkerRuntime } from "../worker-runtime.js";
const fixturesDir = join(dirname(fileURLToPath(import.meta.url)), "fixtures");
const echoWorkerPath = join(fixturesDir, "echo-worker.js");
const crashWorkerPath = join(fixturesDir, "crash-worker.js");
const stderrWorkerPath = join(fixturesDir, "stderr-worker.js");
function baseConfig(script: string) {
return {
script,
argsForKey: () => [],
forwardStderr: true,
onMessage: vi.fn(),
onReady: vi.fn(),
onExit: vi.fn(),
respawn: {
enabled: true,
maxCrashes: 6,
windowMs: 60_000,
delayMs: 80,
allowRespawn: null,
},
shutdownTimeoutMs: 5000,
};
}
describe("createWorkerRuntime", () => {
const runtimes: Array<{ shutdown: () => Promise<void> }> = [];
afterEach(async () => {
await Promise.all(runtimes.splice(0).map((r) => r.shutdown()));
});
function track<R extends { shutdown: () => Promise<void> }>(r: R): R {
runtimes.push(r);
return r;
}
it("start + send message + receive echo", async () => {
const incoming: unknown[] = [];
const rt = track(
createWorkerRuntime({
...baseConfig(echoWorkerPath),
onMessage: (_key, msg) => {
incoming.push(msg);
},
}),
);
await rt.start("a");
expect(rt.has("a")).toBe(true);
await rt.send("a", { type: "ping", n: 1 });
await vi.waitFor(() => {
expect(incoming.some((m) => isEchoOf(m, { type: "ping", n: 1 }))).toBe(true);
});
await rt.shutdown();
});
it("cold start on send (no explicit start)", async () => {
const incoming: unknown[] = [];
const rt = track(
createWorkerRuntime({
...baseConfig(echoWorkerPath),
onMessage: (_key, msg) => {
incoming.push(msg);
},
}),
);
expect(rt.has("x")).toBe(false);
await rt.send("x", { type: "hi" });
await vi.waitFor(() => {
expect(rt.has("x")).toBe(true);
expect(incoming.some((m) => isEchoOf(m, { type: "hi" }))).toBe(true);
});
await rt.shutdown();
});
it("evict stops worker; has() is false", async () => {
const rt = track(createWorkerRuntime(baseConfig(echoWorkerPath)));
await rt.start("k");
expect(rt.has("k")).toBe(true);
await rt.evict("k");
expect(rt.has("k")).toBe(false);
await rt.shutdown();
});
it("drain stops and respawns (new pid)", async () => {
const rt = track(createWorkerRuntime(baseConfig(echoWorkerPath)));
await rt.start("k");
const before = rt.pid("k");
expect(before).not.toBeNull();
await rt.drain("k");
const after = rt.pid("k");
expect(after).not.toBeNull();
expect(after).not.toBe(before);
await rt.shutdown();
});
it("crash triggers auto-respawn", async () => {
const incoming: unknown[] = [];
const onExit = vi.fn();
const rt = track(
createWorkerRuntime({
...baseConfig(crashWorkerPath),
onExit,
onMessage: (_key, msg) => {
incoming.push(msg);
},
}),
);
await rt.start("c");
await vi.waitFor(() => expect(onExit.mock.calls.length).toBeGreaterThanOrEqual(1), {
timeout: 3000,
});
await vi.waitFor(() => expect(rt.has("c")).toBe(true), { timeout: 3000 });
await rt.send("c", { type: "after-crash" });
await vi.waitFor(() => {
expect(incoming.some((m) => isEchoOf(m, { type: "after-crash" }))).toBe(true);
});
await rt.shutdown();
});
it("crash limit reached → no more automatic respawns", async () => {
const rt = track(
createWorkerRuntime({
...baseConfig(crashWorkerPath),
respawn: {
enabled: true,
maxCrashes: 2,
windowMs: 60_000,
delayMs: 50,
allowRespawn: null,
},
}),
);
await rt.start("z");
await vi.waitFor(() => expect(rt.has("z")).toBe(false), { timeout: 8000 });
await rt.shutdown();
});
it("shutdown stops all workers", async () => {
const rt = track(createWorkerRuntime(baseConfig(echoWorkerPath)));
await rt.start("a");
await rt.start("b");
expect(rt.keys().sort()).toEqual(["a", "b"].sort());
await rt.shutdown();
expect(rt.keys()).toEqual([]);
expect(rt.has("a")).toBe(false);
expect(rt.has("b")).toBe(false);
});
it("stderrTail captures stderr output", async () => {
const rt = track(createWorkerRuntime(baseConfig(stderrWorkerPath)));
await rt.start("s");
await vi.waitFor(() => {
expect(rt.stderrTail("s")).toContain("stderr-marker");
});
await rt.shutdown();
});
});
function isEchoOf(msg: unknown, payload: unknown): boolean {
return (
typeof msg === "object" &&
msg !== null &&
(msg as Record<string, unknown>).type === "echo" &&
JSON.stringify((msg as Record<string, unknown>).payload) === JSON.stringify(payload)
);
}
+73 -122
View File
@@ -1,19 +1,13 @@
/**
* Sense worker pool — forked child processes per sense group (IPC lifecycle).
* Sense worker pool — thin wrapper around WorkerRuntime (RFC-006): one fork per sense group.
*/
import { fork } from "node:child_process";
import type { ChildProcess } from "node:child_process";
import { dirname, join } from "node:path";
import { fileURLToPath } from "node:url";
import type { ComputeMessage, ShutdownMessage } from "./ipc.js";
import { parseWorkerMessage } from "./ipc.js";
import {
formatCapturedStderrTail,
formatChildExitSummary,
teeCapturedStderr,
} from "./worker-fork-support.js";
import type { ComputeMessage } from "./ipc.js";
import { formatCapturedStderrTail, formatChildExitSummary } from "./worker-fork-support.js";
import { createWorkerRuntime } from "./worker-runtime.js";
export function resolveWorkerScript(): string {
const __filename = fileURLToPath(import.meta.url);
@@ -21,17 +15,12 @@ export function resolveWorkerScript(): string {
return join(__dir, "sense-worker.js");
}
type WorkerEntry = {
group: string;
process: ChildProcess;
};
export type SenseWorkerPoolOptions = {
nerveRoot: string;
workerScript: string;
/** Invoked for every IPC message from a worker (including ready / signal / error). */
onWorkerMessage: (raw: unknown) => void;
/** Sense names in a group — used when clearing scheduler state on crash or restart. */
/** Sense names in a group — reserved for scheduler-aligned cleanup (kernel passes current config). */
sensesForGroup: (group: string) => string[];
/**
* Called when a worker exits with non-zero code before scheduling a respawn
@@ -58,144 +47,106 @@ export type SenseWorkerPool = {
activeGroupCount: () => number;
};
function spawnWorker(
nerveRoot: string,
group: string,
workerScript: string,
stderrTail: { value: string },
): ChildProcess {
const child = fork(workerScript, ["--group", group, "--root", nerveRoot], {
stdio: ["ignore", "inherit", "pipe", "ipc"],
});
teeCapturedStderr(child, stderrTail);
child.on("error", (err) => {
if ((err as NodeJS.ErrnoException).code !== "EPIPE") {
console.error("[worker] error:", err.message);
}
});
return child;
}
function sendComputeToProcess(worker: ChildProcess, senseName: string): void {
if (worker.connected === false) return;
const msg: ComputeMessage = { type: "compute", sense: senseName };
try {
worker.send(msg);
} catch {
// IPC channel closed between connected check and send
}
}
function sendShutdownToProcess(worker: ChildProcess): void {
if (worker.connected === false) return;
const msg: ShutdownMessage = { type: "shutdown" };
try {
worker.send(msg);
} catch {
// IPC channel closed between connected check and send
}
}
function waitForExit(child: ChildProcess, timeoutMs: number): Promise<void> {
return new Promise((resolve) => {
const timer = setTimeout(() => {
child.kill("SIGKILL");
resolve();
}, timeoutMs);
child.once("exit", () => {
clearTimeout(timer);
resolve();
});
});
}
/** Matches legacy pool: long crash window, 1s respawn delay, practical unlimited respawns. */
const SENSE_WORKER_RESPAWN = {
enabled: true,
maxCrashes: 100_000,
windowMs: 86_400_000,
delayMs: 1000,
} as const;
export function createSenseWorkerPool(options: SenseWorkerPoolOptions): SenseWorkerPool {
const workers = new Map<string, WorkerEntry>();
function startWorker(group: string): Promise<void> {
const stderrTail = { value: "" };
const child = spawnWorker(options.nerveRoot, group, options.workerScript, stderrTail);
let workerReadyResolve: (() => void) | undefined;
const workerReady = new Promise<void>((resolve) => {
workerReadyResolve = resolve;
});
child.on("message", (raw: unknown) => {
const result = parseWorkerMessage(raw);
if (result.ok && result.value.type === "ready") {
workerReadyResolve?.();
}
const runtime = createWorkerRuntime<string>({
script: options.workerScript,
argsForKey: (group) => ["--group", group, "--root", options.nerveRoot],
forwardStderr: true,
onMessage: (_key, raw) => {
options.onWorkerMessage(raw);
});
child.on("exit", (code, signal) => {
const summary = formatChildExitSummary(code, signal ?? null);
},
onReady: (_key, msg) => {
options.onWorkerMessage(msg);
},
onExit: (group, code, signal) => {
const sig =
signal === null || signal === undefined || signal === ""
? null
: (signal as NodeJS.Signals);
const summary = formatChildExitSummary(code, sig);
process.stderr.write(
`[kernel] worker for group "${group}" exited (${summary})${formatCapturedStderrTail(stderrTail.value)}\n`,
`[kernel] worker for group "${group}" exited (${summary})${formatCapturedStderrTail(runtime.stderrTail(group))}\n`,
);
workerReadyResolve?.();
if (!options.isStopped() && code !== 0) {
process.stderr.write(`[kernel] respawning worker for group "${group}" in 1s\n`);
options.onWorkerCrashed(group);
setTimeout(() => {
if (!options.isStopped()) {
startWorker(group);
}
}, 1000);
}
});
},
respawn: {
...SENSE_WORKER_RESPAWN,
allowRespawn: (_group) => !options.isStopped(),
},
shutdownTimeoutMs: 5000,
});
workers.set(group, { group, process: child });
return workerReady;
/** Groups we have ever started — mirrors legacy Map presence for `restartGroup` no-op when unknown. */
const trackedGroups = new Set<string>();
/** Marks groups mid-evict so `hasWorkerForGroup` drops immediately (legacy synchronous eviction). */
const evicting = new Set<string>();
async function startWorker(group: string): Promise<void> {
trackedGroups.add(group);
await runtime.start(group);
}
async function restartGroup(group: string): Promise<void> {
const entry = workers.get(group);
if (entry === undefined) return;
options.onBeforeGroupRestart(group);
sendShutdownToProcess(entry.process);
await waitForExit(entry.process, 5000);
if (!options.isStopped()) {
await startWorker(group);
if (!trackedGroups.has(group)) {
return;
}
options.onBeforeGroupRestart(group);
await runtime.drain(group);
}
function evictGroup(group: string): void {
const entry = workers.get(group);
if (entry === undefined) return;
sendShutdownToProcess(entry.process);
workers.delete(group);
trackedGroups.delete(group);
evicting.add(group);
void runtime.evict(group).finally(() => {
evicting.delete(group);
});
}
async function shutdownAll(): Promise<void> {
const exitPromises: Promise<void>[] = [];
for (const entry of workers.values()) {
sendShutdownToProcess(entry.process);
exitPromises.push(waitForExit(entry.process, 5000));
}
await Promise.all(exitPromises);
await runtime.shutdown();
trackedGroups.clear();
evicting.clear();
}
function sendCompute(group: string, senseName: string): void {
const entry = workers.get(group);
if (entry === undefined) return;
sendComputeToProcess(entry.process, senseName);
if (!trackedGroups.has(group) || evicting.has(group)) {
return;
}
// Legacy pool: `child.send` no-op when IPC is closed (still allow cold start: child === null).
if (runtime.hasDisconnectedChild(group)) {
return;
}
const msg: ComputeMessage = { type: "compute", sense: senseName };
if (!runtime.trySendSync(group, msg)) {
void runtime.send(group, msg).catch(() => {
// IPC channel may close between scheduling and send — same as legacy try/catch on child.send
});
}
}
function getWorkerPid(group: string): number | null {
return workers.get(group)?.process.pid ?? null;
return runtime.pid(group);
}
/** True once `startWorker` has been called for the group and it is not mid-evict (matches legacy Map key). */
function hasWorkerForGroup(group: string): boolean {
return workers.has(group);
return trackedGroups.has(group) && !evicting.has(group);
}
/** Count of sense groups with a worker slot (includes not-yet-ready), excluding evicted keys. */
function activeGroupCount(): number {
return workers.size;
return trackedGroups.size;
}
return {
+404
View File
@@ -0,0 +1,404 @@
/**
* Generic message-routed worker process manager (RFC-006).
* One forked Node child per key; cold start, crash respawn, drain/evict, shutdown.
*/
import { type ChildProcess, type Serializable, fork } from "node:child_process";
import { isPlainRecord } from "@uncaged/nerve-core";
const STDERR_TAIL_MAX_CHARS = 2048;
export type WorkerRuntimeConfig<K extends string> = {
script: string;
argsForKey: (key: K) => string[];
/** When false, stderr is not captured into `stderrTail` (e.g. tests without a pipe). */
forwardStderr: boolean;
onMessage: (key: K, msg: unknown) => void;
onReady: (key: K, msg: unknown) => void;
onExit: (key: K, code: number | null, signal: string | null) => void;
respawn: {
enabled: boolean;
maxCrashes: number;
windowMs: number;
delayMs: number;
/** When non-null, return false to skip automatic respawn after an unexpected exit. */
allowRespawn: ((key: K) => boolean) | null;
};
shutdownTimeoutMs: number;
};
export type WorkerRuntime<K extends string> = {
send: (key: K, msg: unknown) => Promise<void>;
/** When the worker is already ready and IPC-connected, sends synchronously (returns true). Otherwise false — caller may fall back to `send`. */
trySendSync: (key: K, msg: unknown) => boolean;
start: (key: K) => Promise<void>;
evict: (key: K) => Promise<void>;
drain: (key: K) => Promise<void>;
shutdown: () => Promise<void>;
has: (key: K) => boolean;
/** True when a child exists but IPC is disconnected (legacy pool skipped sends in this case). */
hasDisconnectedChild: (key: K) => boolean;
pid: (key: K) => number | null;
keys: () => K[];
stderrTail: (key: K) => string;
};
type WorkerMachineState = "stopped" | "starting" | "ready" | "draining";
type ReadyWaiter = {
resolve: () => void;
reject: (err: Error) => void;
};
/** Internal: one forked process slot (ManagedWorker). */
type WorkerSlot<K extends string> = {
key: K;
state: WorkerMachineState;
child: ChildProcess | null;
pid: number | null;
stderrTail: string;
crashTimestamps: number[];
expectExit: boolean;
readyWaiters: ReadyWaiter[];
opChain: Promise<void>;
};
function isReadyIpcMessage(raw: unknown): boolean {
return isPlainRecord(raw) && raw.type === "ready";
}
function signalToString(signal: NodeJS.Signals | null): string | null {
if (signal === null) {
return null;
}
return String(signal);
}
function attachStderrTail<K extends string>(child: ChildProcess, slot: WorkerSlot<K>): void {
const stream = child.stderr;
if (stream == null) {
return;
}
stream.setEncoding("utf8");
stream.on("data", (chunk: string | Buffer) => {
const text = typeof chunk === "string" ? chunk : chunk.toString("utf8");
slot.stderrTail = (slot.stderrTail + text).slice(-STDERR_TAIL_MAX_CHARS);
});
}
function enqueueOp<K extends string>(slot: WorkerSlot<K>, fn: () => Promise<void>): Promise<void> {
const run = slot.opChain.then(fn, fn);
slot.opChain = run.then(
() => {},
() => {},
);
return run;
}
function resolveReadyWaiters<K extends string>(slot: WorkerSlot<K>): void {
const waiters = slot.readyWaiters;
slot.readyWaiters = [];
for (const w of waiters) {
w.resolve();
}
}
function rejectReadyWaiters<K extends string>(slot: WorkerSlot<K>, err: Error): void {
const waiters = slot.readyWaiters;
slot.readyWaiters = [];
for (const w of waiters) {
w.reject(err);
}
}
function waitForReady<K extends string>(
slot: WorkerSlot<K>,
shutdownTimeoutMs: number,
): Promise<void> {
if (slot.state === "ready" && slot.child !== null && slot.child.connected) {
return Promise.resolve();
}
return new Promise((resolve, reject) => {
let settled = false;
const timer = setTimeout(() => {
if (!settled) {
settled = true;
reject(new Error(`Worker "${String(slot.key)}" ready timeout`));
}
}, shutdownTimeoutMs);
slot.readyWaiters.push({
resolve: () => {
if (settled) {
return;
}
settled = true;
clearTimeout(timer);
resolve();
},
reject: (err: Error) => {
if (settled) {
return;
}
settled = true;
clearTimeout(timer);
reject(err);
},
});
});
}
async function waitForChildExit(child: ChildProcess, timeoutMs: number): Promise<void> {
await new Promise<void>((resolve) => {
const timer = setTimeout(() => {
child.kill("SIGKILL");
}, timeoutMs);
child.once("exit", () => {
clearTimeout(timer);
resolve();
});
});
}
export function createWorkerRuntime<K extends string>(
config: WorkerRuntimeConfig<K>,
): WorkerRuntime<K> {
const workers = new Map<K, WorkerSlot<K>>();
function getOrCreateSlot(key: K): WorkerSlot<K> {
let slot = workers.get(key);
if (slot === undefined) {
slot = {
key,
state: "stopped",
child: null,
pid: null,
stderrTail: "",
crashTimestamps: [],
expectExit: false,
readyWaiters: [],
opChain: Promise.resolve(),
};
workers.set(key, slot);
}
return slot;
}
function handleWorkerMessage(slot: WorkerSlot<K>, msg: unknown): void {
if (isReadyIpcMessage(msg)) {
if (slot.state === "starting") {
slot.state = "ready";
config.onReady(slot.key, msg);
resolveReadyWaiters(slot);
}
return;
}
config.onMessage(slot.key, msg);
}
function onChildExit(
slot: WorkerSlot<K>,
code: number | null,
signal: NodeJS.Signals | null,
): void {
config.onExit(slot.key, code, signalToString(signal));
if (slot.child !== null) {
slot.child.removeAllListeners("message");
slot.child.removeAllListeners("exit");
}
const wasExpect = slot.expectExit;
slot.expectExit = false;
slot.child = null;
slot.pid = null;
if (wasExpect) {
slot.state = "stopped";
return;
}
rejectReadyWaiters(slot, new Error(`Worker "${String(slot.key)}" exited unexpectedly`));
slot.state = "stopped";
void enqueueOp(slot, async () => {
await handleUnexpectedCrashRecovery(slot);
});
}
function registerChild(slot: WorkerSlot<K>, child: ChildProcess): void {
slot.child = child;
slot.pid = child.pid ?? null;
if (config.forwardStderr) {
attachStderrTail(child, slot);
}
child.on("message", (msg: unknown) => {
handleWorkerMessage(slot, msg);
});
child.on("exit", (code, sig) => {
onChildExit(slot, code, sig ?? null);
});
}
async function forkAndWaitReady(slot: WorkerSlot<K>): Promise<void> {
if (slot.state === "ready" && slot.child !== null && slot.child.connected) {
return;
}
slot.state = "starting";
let child: ChildProcess;
try {
child = fork(config.script, config.argsForKey(slot.key), {
stdio: ["ignore", "inherit", "pipe", "ipc"],
env: process.env,
});
} catch (e) {
slot.state = "stopped";
const err = e instanceof Error ? e : new Error(String(e));
rejectReadyWaiters(slot, err);
throw err;
}
registerChild(slot, child);
await waitForReady(slot, config.shutdownTimeoutMs);
}
async function gracefulStop(slot: WorkerSlot<K>): Promise<void> {
if (slot.child === null) {
return;
}
slot.expectExit = true;
slot.state = "draining";
const child = slot.child;
try {
child.send({ type: "shutdown" });
} catch {
// IPC channel may have closed between null-check and send
}
await waitForChildExit(child, config.shutdownTimeoutMs);
}
async function handleUnexpectedCrashRecovery(slot: WorkerSlot<K>): Promise<void> {
if (!config.respawn.enabled) {
return;
}
if (config.respawn.allowRespawn !== null && !config.respawn.allowRespawn(slot.key)) {
return;
}
const now = Date.now();
slot.crashTimestamps.push(now);
slot.crashTimestamps = slot.crashTimestamps.filter((t) => now - t <= config.respawn.windowMs);
if (slot.crashTimestamps.length >= config.respawn.maxCrashes) {
console.error(
`[WorkerRuntime] worker "${String(slot.key)}" exceeded crash limit (${String(config.respawn.maxCrashes)} in ${String(config.respawn.windowMs)}ms); not respawning`,
);
return;
}
await new Promise<void>((resolve) => setTimeout(resolve, config.respawn.delayMs));
await forkAndWaitReady(slot);
}
async function shutdownWorker(slot: WorkerSlot<K>): Promise<void> {
await gracefulStop(slot);
workers.delete(slot.key);
}
function isActive(slot: WorkerSlot<K>): boolean {
return slot.state === "ready" && slot.child !== null && slot.child.connected;
}
return {
send: async (key: K, msg: unknown) => {
const slot = getOrCreateSlot(key);
await enqueueOp(slot, async () => {
await forkAndWaitReady(slot);
const child = slot.child;
if (child === null || !child.connected) {
throw new Error(`Worker "${String(key)}" is not connected`);
}
child.send(msg as Serializable);
});
},
trySendSync: (key: K, msg: unknown): boolean => {
const slot = workers.get(key);
if (slot === undefined || !isActive(slot)) {
return false;
}
const child = slot.child;
if (child === null || !child.connected) {
return false;
}
try {
child.send(msg as Serializable);
return true;
} catch {
return false;
}
},
start: async (key: K) => {
const slot = getOrCreateSlot(key);
await enqueueOp(slot, async () => {
await forkAndWaitReady(slot);
});
},
evict: async (key: K) => {
const slot = getOrCreateSlot(key);
await enqueueOp(slot, async () => {
await gracefulStop(slot);
workers.delete(key);
});
},
drain: async (key: K) => {
const slot = getOrCreateSlot(key);
await enqueueOp(slot, async () => {
if (slot.child === null) {
await forkAndWaitReady(slot);
return;
}
await gracefulStop(slot);
await forkAndWaitReady(slot);
});
},
shutdown: async () => {
const snapshot = [...workers.values()];
await Promise.all(snapshot.map((slot) => enqueueOp(slot, () => shutdownWorker(slot))));
},
has: (key: K) => {
const slot = workers.get(key);
return slot !== undefined && isActive(slot);
},
hasDisconnectedChild: (key: K): boolean => {
const slot = workers.get(key);
if (slot === undefined || slot.child === null) {
return false;
}
return !slot.child.connected;
},
pid: (key: K) => {
const slot = workers.get(key);
if (slot === undefined || !isActive(slot) || slot.pid === null) {
return null;
}
return slot.pid;
},
keys: () => [...workers.values()].filter((slot) => isActive(slot)).map((slot) => slot.key),
stderrTail: (key: K) => {
const slot = workers.get(key);
return slot === undefined ? "" : slot.stderrTail;
},
};
}