Compare commits
10 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 9802f68380 | |||
| f245224320 | |||
| 1b995379f9 | |||
| ea6bc5610b | |||
| 49d078b144 | |||
| 9ca8c8ecb8 | |||
| 00c1932144 | |||
| 097a4790be | |||
| ad2b40dd4f | |||
| 31d1eae44a |
@@ -0,0 +1,410 @@
|
||||
# RFC-002: Workflow Engine
|
||||
|
||||
> Status: Draft
|
||||
> Author: 小橘 🍊(NEKO Team)
|
||||
> Date: 2026-04-22
|
||||
|
||||
## 1. 动机
|
||||
|
||||
RFC-001 定义了 Sense → Signal → Reflex 的无状态观测循环。当 Reflex 的 action 为 `StartWorkflow` 时,需要一个有状态的执行引擎来驱动多步骤工作流。本 RFC 定义 Workflow Engine 的完整设计。
|
||||
|
||||
## 2. 概念模型
|
||||
|
||||
```
|
||||
Signal 循环 ──→ ThreadStart ──→ Thread 循环
|
||||
(无状态,幂等,可合并) (有状态,顺序,Command Event 驱动)
|
||||
│
|
||||
Thread 产出 Log
|
||||
(执行日志,供 retrospection)
|
||||
```
|
||||
|
||||
### 2.1 核心概念
|
||||
|
||||
- **Workflow**:一个有状态工作流的定义,包含并发策略和溢出策略
|
||||
- **Thread**:Workflow 的一次执行实例,有唯一 `runId`
|
||||
- **Role**:执行具体动作的角色,有副作用(调 API、改文件等)
|
||||
- **Moderator**:在 Role 之间递话筒的调度逻辑(纯函数)
|
||||
- **CommandEvent**:Thread 内部的事件流,驱动 Role 之间的流转
|
||||
|
||||
Role 和 Moderator 是 Workflow 的内部细节,不跨 Workflow 共享,不作为顶层扩展点。
|
||||
|
||||
### 2.2 与 Sense 循环的边界
|
||||
|
||||
| | Signal 循环 | Thread 循环 |
|
||||
|---|---|---|
|
||||
| 状态 | 无状态 | 有状态(事件溯源) |
|
||||
| 幂等性 | 幂等、可合并、可丢弃 | 严格顺序、每个 event 必须响应 |
|
||||
| 触发 | Reflex | Moderator 递话筒 |
|
||||
| 产出 | Signal → Bus | Log → logs.db |
|
||||
|
||||
**关键约束**:Thread 产出的 Log 不能触发 Reflex(见 RFC-001 §2.4),只能被 Sense 的 compute 查询用于 retrospection。
|
||||
|
||||
## 3. 配置
|
||||
|
||||
```yaml
|
||||
# nerve.yaml
|
||||
workflows:
|
||||
cleanup:
|
||||
concurrency: 1 # 同时最多 1 个 thread
|
||||
overflow: drop # 已有活跃 thread 时丢弃新请求
|
||||
execute-task:
|
||||
concurrency: 10 # 可并行 10 个 thread
|
||||
overflow: queue # 超出时排队
|
||||
code-review:
|
||||
concurrency: 3
|
||||
overflow: queue
|
||||
maxQueue: 20 # 队列上限,超出丢弃最旧请求
|
||||
```
|
||||
|
||||
- **concurrency**:同时允许的最大活跃 Thread 数
|
||||
- **overflow**:
|
||||
- `drop`:丢弃,适用于幂等操作(如 cleanup)
|
||||
- `queue`:排队等待,适用于每次需要执行的操作
|
||||
- **maxQueue**(仅 `overflow: queue`):队列上限,默认 100,超出丢弃最旧请求
|
||||
|
||||
不需要 throttle——触发频率由上游 Sense 的 throttle 控制。
|
||||
|
||||
## 4. 用户代码结构
|
||||
|
||||
```
|
||||
~/.uncaged-nerve/
|
||||
workflows/
|
||||
cleanup/
|
||||
index.ts # workflow 定义
|
||||
code-review/
|
||||
index.ts
|
||||
```
|
||||
|
||||
### 4.1 Workflow 定义(用户代码)
|
||||
|
||||
```typescript
|
||||
// workflows/cleanup/index.ts
|
||||
import type { WorkflowDefinition } from "@uncaged/nerve-daemon";
|
||||
|
||||
const workflow: WorkflowDefinition = {
|
||||
roles: {
|
||||
scanner: {
|
||||
execute: async (prompt, ctx) => {
|
||||
// 扫描需要清理的资源
|
||||
const stale = await findStaleResources();
|
||||
return { type: "scan_complete", resources: stale };
|
||||
},
|
||||
},
|
||||
cleaner: {
|
||||
execute: async (prompt, ctx) => {
|
||||
// 执行清理
|
||||
await deleteResources(prompt.resources);
|
||||
return { type: "cleanup_done", count: prompt.resources.length };
|
||||
},
|
||||
},
|
||||
},
|
||||
|
||||
moderate(thread, event) {
|
||||
// 纯函数:决定下一步交给谁
|
||||
if (event.type === "thread_start") {
|
||||
return { role: "scanner", prompt: {} };
|
||||
}
|
||||
if (event.type === "scan_complete") {
|
||||
if (event.resources.length === 0) return null; // 结束
|
||||
return { role: "cleaner", prompt: { resources: event.resources } };
|
||||
}
|
||||
return null; // 结束
|
||||
},
|
||||
};
|
||||
|
||||
export default workflow;
|
||||
```
|
||||
|
||||
### 4.2 类型定义
|
||||
|
||||
```typescript
|
||||
// 用户需要实现的接口
|
||||
export type RoleExecuteFn = (
|
||||
prompt: unknown,
|
||||
ctx: WorkflowContext,
|
||||
) => Promise<CommandEvent>;
|
||||
|
||||
export type Role = {
|
||||
execute: RoleExecuteFn;
|
||||
};
|
||||
|
||||
export type ModerateFn = (
|
||||
thread: ThreadState,
|
||||
event: CommandEvent,
|
||||
) => ModerateResult | null; // null = thread 完成
|
||||
|
||||
export type ModerateResult = {
|
||||
role: string;
|
||||
prompt: unknown;
|
||||
};
|
||||
|
||||
export type WorkflowDefinition = {
|
||||
roles: Record<string, Role>;
|
||||
moderate: ModerateFn;
|
||||
};
|
||||
|
||||
export type WorkflowContext = {
|
||||
runId: string;
|
||||
workflowName: string;
|
||||
log: (message: string) => void;
|
||||
};
|
||||
|
||||
export type CommandEvent = {
|
||||
type: string;
|
||||
[key: string]: unknown;
|
||||
};
|
||||
|
||||
export type ThreadState = {
|
||||
runId: string;
|
||||
events: CommandEvent[]; // 历史事件,供 moderate 做决策
|
||||
};
|
||||
```
|
||||
|
||||
## 5. 运行时架构
|
||||
|
||||
### 5.1 进程模型
|
||||
|
||||
同一个 workflow 的所有 thread 共享一个 worker 进程。`concurrency` 控制进程内的并发 async task 数。
|
||||
|
||||
```
|
||||
nerve-engine (kernel)
|
||||
├─ nerve-worker sense --group system (永驻)
|
||||
├─ nerve-worker sense --group tasks (永驻)
|
||||
├─ nerve-worker workflow --name cleanup (按需启动)
|
||||
└─ nerve-worker workflow --name code-review (按需启动)
|
||||
```
|
||||
|
||||
- 进程数 = workflow 种类数,不会膨胀
|
||||
- 有活跃 thread 时启动,所有 thread 完成后退出(或保持待命 `idleTimeout`,默认 30s)
|
||||
- 崩溃后 engine respawn worker,从持久化状态恢复 thread
|
||||
|
||||
### 5.2 IPC 扩展
|
||||
|
||||
在现有 IPC 协议基础上扩展 workflow 相关消息:
|
||||
|
||||
```typescript
|
||||
// Parent → Workflow Worker
|
||||
type StartThreadMessage = {
|
||||
type: "start-thread";
|
||||
runId: string;
|
||||
workflow: string;
|
||||
triggerPayload: unknown; // Reflex 传入的 signal payload
|
||||
};
|
||||
|
||||
type ResumeThreadMessage = {
|
||||
type: "resume-thread";
|
||||
runId: string;
|
||||
};
|
||||
|
||||
type ShutdownMessage = {
|
||||
type: "shutdown";
|
||||
};
|
||||
|
||||
// Workflow Worker → Parent
|
||||
type ThreadEventMessage = {
|
||||
type: "thread-event";
|
||||
runId: string;
|
||||
eventType: string; // queued, started, step_complete, completed, failed
|
||||
payload: unknown;
|
||||
};
|
||||
|
||||
type WorkflowReadyMessage = {
|
||||
type: "ready";
|
||||
};
|
||||
|
||||
type WorkflowErrorMessage = {
|
||||
type: "error";
|
||||
runId: string;
|
||||
error: string;
|
||||
};
|
||||
```
|
||||
|
||||
### 5.3 Thread 生命周期
|
||||
|
||||
```
|
||||
┌──────────────────────────────────────────┐
|
||||
│ │
|
||||
trigger ──→ queued ──→ started ──→ step_complete ──→ completed
|
||||
│ ↑ │
|
||||
│ └─────┘
|
||||
│
|
||||
└──→ failed
|
||||
└──→ crashed (worker 崩溃)
|
||||
```
|
||||
|
||||
1. Reflex `StartWorkflow` 触发 → kernel 检查 concurrency
|
||||
2. 有空位 → 状态 `started`,发 `start-thread` 给 worker
|
||||
3. 超出 concurrency:
|
||||
- `overflow: drop` → 丢弃,写 `dropped` log
|
||||
- `overflow: queue` → 状态 `queued`,等空位
|
||||
4. Worker 内部循环:`moderate → execute → moderate → execute → ...`
|
||||
5. `moderate` 返回 `null` → `completed`
|
||||
6. Role `execute` 抛异常 → `failed`
|
||||
7. Worker 进程崩溃 → 所有活跃 thread 标记 `crashed`
|
||||
|
||||
### 5.4 Workflow Worker 内部实现
|
||||
|
||||
```typescript
|
||||
// workflow-worker.ts(engine 代码,不是用户代码)
|
||||
async function runThread(
|
||||
def: WorkflowDefinition,
|
||||
runId: string,
|
||||
triggerPayload: unknown,
|
||||
sendToParent: (msg: WorkerToParentMessage) => void,
|
||||
): Promise<void> {
|
||||
const state: ThreadState = { runId, events: [] };
|
||||
const ctx: WorkflowContext = {
|
||||
runId,
|
||||
workflowName,
|
||||
log: (msg) => sendToParent({
|
||||
type: "thread-event", runId, eventType: "step_complete", payload: { message: msg },
|
||||
}),
|
||||
};
|
||||
|
||||
// 初始事件
|
||||
let event: CommandEvent = { type: "thread_start", ...triggerPayload };
|
||||
|
||||
while (true) {
|
||||
state.events.push(event);
|
||||
const next = def.moderate(state, event);
|
||||
if (next === null) {
|
||||
sendToParent({ type: "thread-event", runId, eventType: "completed", payload: null });
|
||||
return;
|
||||
}
|
||||
const role = def.roles[next.role];
|
||||
if (!role) {
|
||||
sendToParent({ type: "error", runId, error: `Unknown role: ${next.role}` });
|
||||
return;
|
||||
}
|
||||
event = await role.execute(next.prompt, ctx);
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
## 6. 持久化
|
||||
|
||||
### 6.1 事件溯源
|
||||
|
||||
Thread 状态以 append-only 事件流为 source of truth,写入统一的 `logs.db`:
|
||||
|
||||
```sql
|
||||
-- logs 表(已存在于 RFC-001),source = "workflow"
|
||||
-- type 取值:queued, started, step_complete, completed, failed, crashed, dropped
|
||||
```
|
||||
|
||||
### 6.2 物化表
|
||||
|
||||
为避免每次查活跃 workflow 都扫描全表,维护一张物化表,在写 log 的同一事务中 UPSERT:
|
||||
|
||||
```sql
|
||||
CREATE TABLE workflow_runs (
|
||||
run_id TEXT PRIMARY KEY,
|
||||
workflow TEXT NOT NULL,
|
||||
status TEXT NOT NULL, -- queued, started, completed, failed, crashed
|
||||
ts INTEGER NOT NULL
|
||||
);
|
||||
|
||||
CREATE INDEX idx_workflow_runs_status ON workflow_runs(status);
|
||||
CREATE INDEX idx_workflow_runs_workflow ON workflow_runs(workflow);
|
||||
```
|
||||
|
||||
写入流程(同一事务):
|
||||
|
||||
```sql
|
||||
BEGIN;
|
||||
INSERT INTO logs (source, type, ref_id, payload, ts)
|
||||
VALUES ('workflow', 'started', 'run-7', '{}', 1001);
|
||||
INSERT OR REPLACE INTO workflow_runs (run_id, workflow, status, ts)
|
||||
VALUES ('run-7', 'cleanup', 'started', 1001);
|
||||
COMMIT;
|
||||
```
|
||||
|
||||
物化表是 logs 的派生数据——数据丢失时可从 logs 重建。
|
||||
|
||||
### 6.3 崩溃恢复
|
||||
|
||||
Worker 重启时:
|
||||
|
||||
1. 查 `workflow_runs WHERE status IN ('queued', 'started')`
|
||||
2. `queued` → 重新排队
|
||||
3. `started` → 从 logs 重建 `ThreadState`(事件列表),resume
|
||||
|
||||
幂等 workflow(`overflow: drop`)直接标记 `crashed` 重新来。
|
||||
|
||||
## 7. Kernel 扩展
|
||||
|
||||
### 7.1 WorkflowManager
|
||||
|
||||
Kernel 新增 `WorkflowManager` 组件:
|
||||
|
||||
```typescript
|
||||
export type WorkflowManager = {
|
||||
/** 触发一个 workflow(来自 Reflex) */
|
||||
startWorkflow: (workflowName: string, payload: unknown) => void;
|
||||
/** 获取活跃 thread 数 */
|
||||
activeCount: (workflowName: string) => number;
|
||||
/** 获取队列长度 */
|
||||
queueLength: (workflowName: string) => number;
|
||||
/** 停止所有 workflow */
|
||||
stop: () => Promise<void>;
|
||||
};
|
||||
```
|
||||
|
||||
### 7.2 Reflex Scheduler 扩展
|
||||
|
||||
当前 `ReflexScheduler` 只处理 `kind: "sense"` 的 reflex。扩展为同时处理 `kind: "workflow"`:
|
||||
|
||||
```typescript
|
||||
// reflex-scheduler.ts 扩展
|
||||
if (reflex.kind === "workflow") {
|
||||
const workflowName = reflex.workflow;
|
||||
if (reflex.on !== null && reflex.on.length > 0) {
|
||||
const watchedSenses = new Set(reflex.on);
|
||||
const unsub = bus.subscribe((signal) => {
|
||||
if (watchedSenses.has(signal.senseId)) {
|
||||
workflowManager.startWorkflow(workflowName, signal.payload);
|
||||
}
|
||||
});
|
||||
unsubscribers.push(unsub);
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### 7.3 热更新
|
||||
|
||||
| 变化 | 处理 |
|
||||
|------|------|
|
||||
| workflow ts 文件修改 | drain(等活跃 thread 完成,`drainTimeout` 后 force kill + 标记 crashed)→ respawn |
|
||||
| nerve.yaml workflow 配置修改 | 更新 concurrency/overflow,不重启 worker |
|
||||
| 新增 workflow | 下次触发时按需启动 worker |
|
||||
| 删除 workflow | drain + 移除 |
|
||||
|
||||
## 8. 实现计划
|
||||
|
||||
### Phase 1:核心类型与 WorkflowManager 骨架
|
||||
|
||||
- [ ] `packages/core/src/types.ts` — 扩展 workflow 相关类型(`WorkflowDefinition`、`ThreadState`、`CommandEvent`、`ModerateResult` 等)
|
||||
- [ ] `packages/daemon/src/ipc.ts` — 扩展 IPC 消息类型(`StartThreadMessage`、`ThreadEventMessage` 等)
|
||||
- [ ] `packages/daemon/src/workflow-manager.ts` — WorkflowManager 实现(并发控制、队列管理、thread 生命周期)
|
||||
- [ ] `packages/daemon/src/workflow-worker.ts` — Workflow worker 进程(加载用户代码、运行 moderate/execute 循环)
|
||||
- [ ] `packages/daemon/src/log-store.ts` — 扩展 LogStore,添加 `workflow_runs` 物化表 + 查询方法
|
||||
- [ ] 单元测试覆盖:并发控制、队列溢出、thread 生命周期
|
||||
|
||||
### Phase 2:Kernel 集成
|
||||
|
||||
- [ ] `packages/daemon/src/kernel.ts` — 集成 WorkflowManager,处理 workflow worker 的生命周期
|
||||
- [ ] `packages/daemon/src/reflex-scheduler.ts` — 扩展支持 `kind: "workflow"` 的 reflex
|
||||
- [ ] 集成测试:Sense signal → Reflex → Workflow 全链路
|
||||
|
||||
### Phase 3:崩溃恢复与热更新
|
||||
|
||||
- [ ] Worker crash recovery — 从持久化状态恢复 thread
|
||||
- [ ] 热更新 — workflow 代码变更时 drain + respawn
|
||||
- [ ] nerve.yaml workflow 配置变更的增量更新
|
||||
|
||||
### Phase 4:CLI 与用户体验
|
||||
|
||||
- [ ] `nerve workflow list` — 查看活跃 workflow
|
||||
- [ ] `nerve workflow inspect <runId>` — 查看 thread 详情
|
||||
- [ ] `nerve workflow trigger <name>` — 手动触发
|
||||
- [ ] scaffold 模板 — `nerve init workflow <name>`
|
||||
@@ -0,0 +1,70 @@
|
||||
/**
|
||||
* nerve-health — built-in sense that reports daemon health via IPC.
|
||||
*
|
||||
* When running inside a sense worker, this compute function sends a
|
||||
* "health-request" to the parent kernel process and returns the
|
||||
* health-response payload as its signal.
|
||||
*
|
||||
* Usage in nerve.yaml:
|
||||
* senses:
|
||||
* nerve-health:
|
||||
* group: internal
|
||||
* throttle: 30s
|
||||
* timeout: 5s
|
||||
*
|
||||
* reflexes:
|
||||
* - sense: nerve-health
|
||||
* interval: 30s
|
||||
*/
|
||||
|
||||
export type NerveHealth = {
|
||||
uptime: number;
|
||||
activeSenses: string[];
|
||||
inFlightCount: number;
|
||||
workerMemoryUsage: NodeJS.MemoryUsage;
|
||||
workerUptime: number;
|
||||
};
|
||||
|
||||
export async function compute(): Promise<NerveHealth | null> {
|
||||
const health = await requestHealthFromKernel();
|
||||
return health;
|
||||
}
|
||||
|
||||
function requestHealthFromKernel(): Promise<NerveHealth> {
|
||||
return new Promise((resolve, reject) => {
|
||||
if (!process.send) {
|
||||
reject(new Error("nerve-health: not running as a child process with IPC"));
|
||||
return;
|
||||
}
|
||||
|
||||
const timeout = setTimeout(() => {
|
||||
process.removeListener("message", onMessage);
|
||||
reject(new Error("nerve-health: health-request timed out"));
|
||||
}, 5000);
|
||||
|
||||
function onMessage(msg: unknown): void {
|
||||
if (
|
||||
msg !== null &&
|
||||
typeof msg === "object" &&
|
||||
(msg as Record<string, unknown>).type === "health-response"
|
||||
) {
|
||||
clearTimeout(timeout);
|
||||
process.removeListener("message", onMessage);
|
||||
const resp = msg as {
|
||||
senses: string[];
|
||||
inFlightCount: number;
|
||||
};
|
||||
resolve({
|
||||
uptime: process.uptime(),
|
||||
activeSenses: resp.senses,
|
||||
inFlightCount: resp.inFlightCount,
|
||||
workerMemoryUsage: process.memoryUsage(),
|
||||
workerUptime: process.uptime(),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
process.on("message", onMessage);
|
||||
process.send({ type: "health-request" });
|
||||
});
|
||||
}
|
||||
@@ -13,6 +13,10 @@
|
||||
},
|
||||
"dependencies": {
|
||||
"@uncaged/nerve-core": "workspace:*",
|
||||
"@uncaged/nerve-daemon": "workspace:*"
|
||||
"@uncaged/nerve-daemon": "workspace:*",
|
||||
"citty": "^0.1.6"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@types/node": "^22.0.0"
|
||||
}
|
||||
}
|
||||
|
||||
+20
-99
@@ -1,104 +1,25 @@
|
||||
#!/usr/bin/env node
|
||||
|
||||
import { readFileSync } from "node:fs";
|
||||
import { homedir } from "node:os";
|
||||
import { join } from "node:path";
|
||||
import { defineCommand, runMain } from "citty";
|
||||
|
||||
import { parseNerveConfig } from "@uncaged/nerve-core";
|
||||
import { createKernel } from "@uncaged/nerve-daemon";
|
||||
import { initCommand } from "./commands/init.js";
|
||||
import { startCommand } from "./commands/start.js";
|
||||
import { statusCommand } from "./commands/status.js";
|
||||
import { stopCommand } from "./commands/stop.js";
|
||||
import { validateCommand } from "./commands/validate.js";
|
||||
|
||||
const DEFAULT_NERVE_ROOT = join(homedir(), ".uncaged-nerve");
|
||||
|
||||
function parseArgs(argv: string[]): { root: string } {
|
||||
// Skip argv[0] (node), argv[1] (script), argv[2] ('start' subcommand)
|
||||
const args = argv.slice(3);
|
||||
let root = DEFAULT_NERVE_ROOT;
|
||||
|
||||
for (let i = 0; i < args.length; i++) {
|
||||
if (args[i] === "--root" && i + 1 < args.length) {
|
||||
root = args[i + 1];
|
||||
i++;
|
||||
} else if (!args[i].startsWith("-")) {
|
||||
process.stderr.write("Usage: nerve start [--root <path>]\n");
|
||||
process.exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
return { root };
|
||||
}
|
||||
|
||||
function readConfig(nerveRoot: string): ReturnType<typeof parseNerveConfig> {
|
||||
const configPath = join(nerveRoot, "nerve.yaml");
|
||||
let raw: string;
|
||||
try {
|
||||
raw = readFileSync(configPath, "utf8");
|
||||
} catch (e) {
|
||||
const msg = e instanceof Error ? e.message : String(e);
|
||||
return { ok: false, error: new Error(`Cannot read ${configPath}: ${msg}`) };
|
||||
}
|
||||
return parseNerveConfig(raw);
|
||||
}
|
||||
|
||||
async function main(): Promise<void> {
|
||||
const subcommand = process.argv[2];
|
||||
|
||||
if (subcommand !== "start") {
|
||||
process.stderr.write("Usage: nerve start [--root <path>]\n");
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
const { root } = parseArgs(process.argv);
|
||||
|
||||
const configResult = readConfig(root);
|
||||
if (!configResult.ok) {
|
||||
process.stderr.write(`[nerve] Config error: ${configResult.error.message}\n`);
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
const config = configResult.value;
|
||||
|
||||
const kernel = createKernel(config, root);
|
||||
|
||||
process.stderr.write(
|
||||
`[nerve] Starting — ${kernel.groups.size} group(s), ${kernel.senseCount} sense(s)\n`,
|
||||
);
|
||||
|
||||
for (const group of kernel.groups) {
|
||||
const groupSenses = Object.entries(config.senses)
|
||||
.filter(([, sc]) => sc.group === group)
|
||||
.map(([name]) => name);
|
||||
process.stderr.write(`[nerve] group "${group}": ${groupSenses.join(", ")}\n`);
|
||||
}
|
||||
|
||||
let shuttingDown = false;
|
||||
|
||||
async function shutdown(): Promise<void> {
|
||||
if (shuttingDown) return;
|
||||
shuttingDown = true;
|
||||
process.stderr.write("\n[nerve] Shutting down…\n");
|
||||
await kernel.stop();
|
||||
process.exit(0);
|
||||
}
|
||||
|
||||
process.on("SIGINT", () => {
|
||||
shutdown().catch((e: unknown) => {
|
||||
const msg = e instanceof Error ? e.message : String(e);
|
||||
process.stderr.write(`[nerve] Shutdown error: ${msg}\n`);
|
||||
process.exit(1);
|
||||
});
|
||||
});
|
||||
|
||||
process.on("SIGTERM", () => {
|
||||
shutdown().catch((e: unknown) => {
|
||||
const msg = e instanceof Error ? e.message : String(e);
|
||||
process.stderr.write(`[nerve] Shutdown error: ${msg}\n`);
|
||||
process.exit(1);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
main().catch((e: unknown) => {
|
||||
const msg = e instanceof Error ? e.message : String(e);
|
||||
process.stderr.write(`[nerve] Fatal error: ${msg}\n`);
|
||||
process.exit(1);
|
||||
const main = defineCommand({
|
||||
meta: {
|
||||
name: "nerve",
|
||||
description: "Nerve — an AI agent kernel",
|
||||
},
|
||||
subCommands: {
|
||||
init: initCommand,
|
||||
start: startCommand,
|
||||
stop: stopCommand,
|
||||
status: statusCommand,
|
||||
validate: validateCommand,
|
||||
},
|
||||
});
|
||||
|
||||
runMain(main);
|
||||
|
||||
@@ -0,0 +1,170 @@
|
||||
import { existsSync, mkdirSync, writeFileSync } from "node:fs";
|
||||
import { dirname, join } from "node:path";
|
||||
|
||||
import { defineCommand } from "citty";
|
||||
|
||||
import { getNerveRoot } from "../workspace.js";
|
||||
|
||||
const NERVE_YAML = `# nerve.yaml — Nerve workspace configuration
|
||||
senses:
|
||||
cpu-usage:
|
||||
group: system
|
||||
throttle: 5s
|
||||
timeout: 10s
|
||||
grace_period: null
|
||||
|
||||
reflexes:
|
||||
- kind: sense
|
||||
sense: cpu-usage
|
||||
interval: 10s
|
||||
`;
|
||||
|
||||
const PACKAGE_JSON = `{
|
||||
"name": "my-nerve-workspace",
|
||||
"version": "0.0.1",
|
||||
"private": true,
|
||||
"type": "module",
|
||||
"dependencies": {
|
||||
"@uncaged/nerve-core": "latest",
|
||||
"drizzle-orm": "latest"
|
||||
},
|
||||
"devDependencies": {
|
||||
"drizzle-kit": "latest"
|
||||
}
|
||||
}
|
||||
`;
|
||||
|
||||
const GITIGNORE = `data/
|
||||
node_modules/
|
||||
`;
|
||||
|
||||
const CPU_SCHEMA_TS = `import { integer, real, sqliteTable, text } from "drizzle-orm/sqlite-core";
|
||||
|
||||
export const cpuUsage = sqliteTable("cpu_usage", {
|
||||
id: integer("id").primaryKey({ autoIncrement: true }),
|
||||
ts: integer("ts").notNull(),
|
||||
model: text("model").notNull(),
|
||||
loadPercent: real("load_percent").notNull(),
|
||||
});
|
||||
`;
|
||||
|
||||
const CPU_INDEX_TS = `import { cpus } from "node:os";
|
||||
|
||||
export async function compute(): Promise<unknown> {
|
||||
const cpuList = cpus();
|
||||
|
||||
let totalIdle = 0;
|
||||
let totalTick = 0;
|
||||
for (const cpu of cpuList) {
|
||||
for (const [, time] of Object.entries(cpu.times)) {
|
||||
totalTick += time;
|
||||
}
|
||||
totalIdle += cpu.times.idle;
|
||||
}
|
||||
|
||||
const loadPercent = totalTick === 0 ? 0 : ((totalTick - totalIdle) / totalTick) * 100;
|
||||
|
||||
return {
|
||||
model: cpuList[0]?.model ?? "unknown",
|
||||
loadPercent: Math.round(loadPercent * 100) / 100,
|
||||
ts: Date.now(),
|
||||
};
|
||||
}
|
||||
`;
|
||||
|
||||
const CPU_MIGRATION_SQL = `CREATE TABLE IF NOT EXISTS cpu_usage (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
ts INTEGER NOT NULL,
|
||||
model TEXT NOT NULL,
|
||||
load_percent REAL NOT NULL
|
||||
);
|
||||
`;
|
||||
|
||||
function writeFile(filePath: string, content: string): void {
|
||||
mkdirSync(dirname(filePath), { recursive: true });
|
||||
writeFileSync(filePath, content, "utf8");
|
||||
}
|
||||
|
||||
async function runCommand(cmd: string, args: string[], cwd: string): Promise<void> {
|
||||
const { spawn } = await import("node:child_process");
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
const child = spawn(cmd, args, { cwd, stdio: "inherit" });
|
||||
child.on("close", (code) => {
|
||||
if (code === 0) resolve();
|
||||
else reject(new Error(`${cmd} exited with code ${code}`));
|
||||
});
|
||||
child.on("error", reject);
|
||||
});
|
||||
}
|
||||
|
||||
async function detectPackageManager(): Promise<{ cmd: string; args: string[] }> {
|
||||
const { execFile } = await import("node:child_process");
|
||||
const { promisify } = await import("node:util");
|
||||
const execFileAsync = promisify(execFile);
|
||||
|
||||
for (const pm of ["pnpm", "yarn", "npm"]) {
|
||||
try {
|
||||
await execFileAsync(pm, ["--version"]);
|
||||
const args = pm === "pnpm" ? ["install", "--no-cache"] : ["install"];
|
||||
return { cmd: pm, args };
|
||||
} catch {
|
||||
// not available, try next
|
||||
}
|
||||
}
|
||||
return { cmd: "npm", args: ["install"] };
|
||||
}
|
||||
|
||||
export const initCommand = defineCommand({
|
||||
meta: {
|
||||
name: "init",
|
||||
description: "Initialize the ~/.uncaged-nerve/ workspace",
|
||||
},
|
||||
args: {
|
||||
force: {
|
||||
type: "boolean",
|
||||
description: "Reinitialize even if workspace already exists (preserves data/)",
|
||||
default: false,
|
||||
},
|
||||
},
|
||||
async run({ args }) {
|
||||
const nerveRoot = getNerveRoot();
|
||||
|
||||
if (existsSync(nerveRoot) && !args.force) {
|
||||
process.stderr.write("⚠️ ~/.uncaged-nerve/ already exists. Use --force to reinitialize.\n");
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
mkdirSync(join(nerveRoot, "data"), { recursive: true });
|
||||
mkdirSync(join(nerveRoot, "senses", "cpu-usage", "migrations"), { recursive: true });
|
||||
|
||||
writeFile(join(nerveRoot, "nerve.yaml"), NERVE_YAML);
|
||||
writeFile(join(nerveRoot, "package.json"), PACKAGE_JSON);
|
||||
writeFile(join(nerveRoot, ".gitignore"), GITIGNORE);
|
||||
writeFile(join(nerveRoot, "senses", "cpu-usage", "schema.ts"), CPU_SCHEMA_TS);
|
||||
writeFile(join(nerveRoot, "senses", "cpu-usage", "index.ts"), CPU_INDEX_TS);
|
||||
writeFile(
|
||||
join(nerveRoot, "senses", "cpu-usage", "migrations", "0001_init.sql"),
|
||||
CPU_MIGRATION_SQL,
|
||||
);
|
||||
|
||||
process.stdout.write("Installing dependencies…\n");
|
||||
try {
|
||||
const { cmd, args } = await detectPackageManager();
|
||||
await runCommand(cmd, args, nerveRoot);
|
||||
} catch {
|
||||
process.stdout.write("⚠️ Install failed — you may need to install dependencies manually.\n");
|
||||
}
|
||||
|
||||
if (!existsSync(join(nerveRoot, ".git"))) {
|
||||
try {
|
||||
await runCommand("git", ["init"], nerveRoot);
|
||||
} catch {
|
||||
process.stdout.write("⚠️ git init failed — skipping.\n");
|
||||
}
|
||||
}
|
||||
|
||||
process.stdout.write(
|
||||
"✅ Workspace created at ~/.uncaged-nerve/\n 1 example sense: cpu-usage\n Run `nerve start` to launch the daemon.\n",
|
||||
);
|
||||
},
|
||||
});
|
||||
@@ -0,0 +1,145 @@
|
||||
import { createWriteStream } from "node:fs";
|
||||
import { readFileSync } from "node:fs";
|
||||
import { mkdir } from "node:fs/promises";
|
||||
import { join } from "node:path";
|
||||
import { fileURLToPath } from "node:url";
|
||||
|
||||
import { parseNerveConfig } from "@uncaged/nerve-core";
|
||||
import { createKernel } from "@uncaged/nerve-daemon";
|
||||
import { defineCommand } from "citty";
|
||||
|
||||
import { getLogPath, getNerveRoot, isRunning, readPidFile, writePidFile } from "../workspace.js";
|
||||
|
||||
function readConfig(nerveRoot: string): ReturnType<typeof parseNerveConfig> {
|
||||
const configPath = join(nerveRoot, "nerve.yaml");
|
||||
let raw: string;
|
||||
try {
|
||||
raw = readFileSync(configPath, "utf8");
|
||||
} catch (e) {
|
||||
const msg = e instanceof Error ? e.message : String(e);
|
||||
return { ok: false, error: new Error(`❌ Cannot read ${configPath}: ${msg}`) };
|
||||
}
|
||||
return parseNerveConfig(raw);
|
||||
}
|
||||
|
||||
async function runForeground(nerveRoot: string): Promise<void> {
|
||||
const configResult = readConfig(nerveRoot);
|
||||
if (!configResult.ok) {
|
||||
process.stderr.write(`${configResult.error.message}\n`);
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
const config = configResult.value;
|
||||
const kernel = createKernel(config, nerveRoot);
|
||||
|
||||
const senseNames = Object.keys(config.senses);
|
||||
const groups = [...kernel.groups];
|
||||
|
||||
process.stdout.write(
|
||||
`✅ Nerve starting — ${senseNames.length} sense(s), ${groups.length} group(s)\n`,
|
||||
);
|
||||
for (const group of groups) {
|
||||
const groupSenses = Object.entries(config.senses)
|
||||
.filter(([, sc]) => sc.group === group)
|
||||
.map(([name]) => name);
|
||||
process.stdout.write(` group "${group}": ${groupSenses.join(", ")}\n`);
|
||||
}
|
||||
process.stdout.write(" Press Ctrl+C to stop.\n");
|
||||
|
||||
let shuttingDown = false;
|
||||
|
||||
async function shutdown(): Promise<void> {
|
||||
if (shuttingDown) return;
|
||||
shuttingDown = true;
|
||||
process.stdout.write("\n[nerve] Shutting down…\n");
|
||||
await kernel.stop();
|
||||
process.exit(0);
|
||||
}
|
||||
|
||||
process.on("SIGINT", () => {
|
||||
shutdown().catch((e: unknown) => {
|
||||
const msg = e instanceof Error ? e.message : String(e);
|
||||
process.stderr.write(`[nerve] Shutdown error: ${msg}\n`);
|
||||
process.exit(1);
|
||||
});
|
||||
});
|
||||
|
||||
process.on("SIGTERM", () => {
|
||||
shutdown().catch((e: unknown) => {
|
||||
const msg = e instanceof Error ? e.message : String(e);
|
||||
process.stderr.write(`[nerve] Shutdown error: ${msg}\n`);
|
||||
process.exit(1);
|
||||
});
|
||||
});
|
||||
|
||||
await kernel.ready;
|
||||
}
|
||||
|
||||
async function runDaemon(nerveRoot: string): Promise<void> {
|
||||
if (isRunning()) {
|
||||
const pid = readPidFile();
|
||||
process.stderr.write(`⚠️ Nerve daemon is already running (pid ${pid}).\n`);
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
const configResult = readConfig(nerveRoot);
|
||||
if (!configResult.ok) {
|
||||
process.stderr.write(`${configResult.error.message}\n`);
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
const logPath = getLogPath();
|
||||
await mkdir(join(nerveRoot, "logs"), { recursive: true });
|
||||
|
||||
const { spawn } = await import("node:child_process");
|
||||
const logStream = createWriteStream(logPath, { flags: "a" });
|
||||
await new Promise<void>((resolve) => {
|
||||
if (logStream.pending) logStream.once("open", () => resolve());
|
||||
else resolve();
|
||||
});
|
||||
|
||||
const selfPath = fileURLToPath(import.meta.url);
|
||||
|
||||
const child = spawn(process.execPath, [selfPath, "start"], {
|
||||
detached: true,
|
||||
stdio: ["ignore", logStream.fd, logStream.fd],
|
||||
env: { ...process.env, NERVE_DAEMON_MODE: "1" },
|
||||
});
|
||||
|
||||
child.unref();
|
||||
|
||||
const pid = child.pid;
|
||||
if (!pid) {
|
||||
process.stderr.write("❌ Failed to spawn daemon process.\n");
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
writePidFile(pid);
|
||||
process.stdout.write(`✅ Nerve daemon started (pid ${pid}).\n`);
|
||||
process.stdout.write(` Logs: ${logPath}\n`);
|
||||
process.stdout.write(" Run `nerve stop` to stop.\n");
|
||||
}
|
||||
|
||||
export const startCommand = defineCommand({
|
||||
meta: {
|
||||
name: "start",
|
||||
description: "Start the nerve daemon",
|
||||
},
|
||||
args: {
|
||||
daemon: {
|
||||
type: "boolean",
|
||||
alias: "d",
|
||||
description: "Run as background daemon",
|
||||
default: false,
|
||||
},
|
||||
},
|
||||
async run({ args }) {
|
||||
const nerveRoot = getNerveRoot();
|
||||
|
||||
if (args.daemon) {
|
||||
await runDaemon(nerveRoot);
|
||||
} else {
|
||||
await runForeground(nerveRoot);
|
||||
}
|
||||
},
|
||||
});
|
||||
@@ -0,0 +1,79 @@
|
||||
import { readFileSync, statSync } from "node:fs";
|
||||
import { join } from "node:path";
|
||||
|
||||
import { parseNerveConfig } from "@uncaged/nerve-core";
|
||||
import { defineCommand } from "citty";
|
||||
|
||||
import { getNerveRoot, getPidPath, isRunning, readPidFile } from "../workspace.js";
|
||||
|
||||
function formatUptime(ms: number): string {
|
||||
const totalSeconds = Math.floor(ms / 1000);
|
||||
const hours = Math.floor(totalSeconds / 3600);
|
||||
const minutes = Math.floor((totalSeconds % 3600) / 60);
|
||||
const seconds = totalSeconds % 60;
|
||||
if (hours > 0) return `${hours}h ${minutes}m ${seconds}s`;
|
||||
if (minutes > 0) return `${minutes}m ${seconds}s`;
|
||||
return `${seconds}s`;
|
||||
}
|
||||
|
||||
function getUptimeMs(pid: number): number | null {
|
||||
try {
|
||||
const pidStat = readFileSync(`/proc/${pid}/stat`, "utf8").split(" ");
|
||||
const startJiffies = Number(pidStat[21]);
|
||||
const clkTck = 100;
|
||||
const uptimeRaw = readFileSync("/proc/uptime", "utf8").split(" ")[0];
|
||||
const systemUptimeSec = Number.parseFloat(uptimeRaw);
|
||||
const processStartSec = startJiffies / clkTck;
|
||||
return (systemUptimeSec - processStartSec) * 1000;
|
||||
} catch {
|
||||
// /proc not available (non-Linux); fall back to PID file mtime
|
||||
try {
|
||||
const pidMtime = statSync(getPidPath()).mtimeMs;
|
||||
return Date.now() - pidMtime;
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export const statusCommand = defineCommand({
|
||||
meta: {
|
||||
name: "status",
|
||||
description: "Show nerve daemon status",
|
||||
},
|
||||
async run() {
|
||||
if (!isRunning()) {
|
||||
process.stdout.write("😴 Nerve daemon is not running.\n");
|
||||
return;
|
||||
}
|
||||
|
||||
const pid = readPidFile() as number;
|
||||
|
||||
const configPath = join(getNerveRoot(), "nerve.yaml");
|
||||
let senseList: string[] = [];
|
||||
let workerGroups: string[] = [];
|
||||
|
||||
try {
|
||||
const raw = readFileSync(configPath, "utf8");
|
||||
const result = parseNerveConfig(raw);
|
||||
if (result.ok) {
|
||||
senseList = Object.keys(result.value.senses);
|
||||
workerGroups = [...new Set(Object.values(result.value.senses).map((s) => s.group))];
|
||||
}
|
||||
} catch {
|
||||
// config may not be readable; continue with what we have
|
||||
}
|
||||
|
||||
const uptimeMs = getUptimeMs(pid);
|
||||
const uptimeStr = uptimeMs !== null ? formatUptime(uptimeMs) : "unknown";
|
||||
|
||||
process.stdout.write("✅ Nerve daemon is running.\n");
|
||||
process.stdout.write(` pid: ${pid}\n`);
|
||||
process.stdout.write(` uptime: ${uptimeStr}\n`);
|
||||
process.stdout.write(` senses: ${senseList.length > 0 ? senseList.join(", ") : "(none)"}\n`);
|
||||
process.stdout.write(
|
||||
` workers: ${workerGroups.length > 0 ? workerGroups.join(", ") : "(none)"}\n`,
|
||||
);
|
||||
process.stdout.write(" signals: (pending SignalBus persistence)\n");
|
||||
},
|
||||
});
|
||||
@@ -0,0 +1,58 @@
|
||||
import { defineCommand } from "citty";
|
||||
|
||||
import { isRunning, readPidFile, removePidFile } from "../workspace.js";
|
||||
|
||||
async function waitForExit(pid: number, timeoutMs: number): Promise<boolean> {
|
||||
const start = Date.now();
|
||||
while (Date.now() - start < timeoutMs) {
|
||||
try {
|
||||
process.kill(pid, 0);
|
||||
} catch {
|
||||
return true;
|
||||
}
|
||||
await new Promise<void>((resolve) => setTimeout(resolve, 200));
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
export const stopCommand = defineCommand({
|
||||
meta: {
|
||||
name: "stop",
|
||||
description: "Stop the nerve daemon",
|
||||
},
|
||||
async run() {
|
||||
const pid = readPidFile();
|
||||
if (pid === null) {
|
||||
process.stdout.write("⚠️ No PID file found — daemon may not be running.\n");
|
||||
return;
|
||||
}
|
||||
|
||||
if (!isRunning()) {
|
||||
process.stdout.write("⚠️ Daemon is not running (stale PID file). Cleaning up.\n");
|
||||
removePidFile();
|
||||
return;
|
||||
}
|
||||
|
||||
process.stdout.write(`Stopping nerve daemon (pid ${pid})…\n`);
|
||||
try {
|
||||
process.kill(pid, "SIGTERM");
|
||||
} catch (e) {
|
||||
const msg = e instanceof Error ? e.message : String(e);
|
||||
process.stderr.write(`❌ Failed to send SIGTERM: ${msg}\n`);
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
const graceful = await waitForExit(pid, 10_000);
|
||||
if (!graceful) {
|
||||
process.stdout.write("⚠️ Daemon did not exit in 10s — sending SIGKILL.\n");
|
||||
try {
|
||||
process.kill(pid, "SIGKILL");
|
||||
} catch {
|
||||
// already dead
|
||||
}
|
||||
}
|
||||
|
||||
removePidFile();
|
||||
process.stdout.write("✅ Daemon stopped.\n");
|
||||
},
|
||||
});
|
||||
@@ -0,0 +1,40 @@
|
||||
import { readFileSync } from "node:fs";
|
||||
import { join } from "node:path";
|
||||
|
||||
import { parseNerveConfig } from "@uncaged/nerve-core";
|
||||
import { defineCommand } from "citty";
|
||||
|
||||
import { getNerveRoot } from "../workspace.js";
|
||||
|
||||
export const validateCommand = defineCommand({
|
||||
meta: {
|
||||
name: "validate",
|
||||
description: "Validate nerve.yaml configuration",
|
||||
},
|
||||
async run() {
|
||||
const configPath = join(getNerveRoot(), "nerve.yaml");
|
||||
let raw: string;
|
||||
try {
|
||||
raw = readFileSync(configPath, "utf8");
|
||||
} catch (e) {
|
||||
const msg = e instanceof Error ? e.message : String(e);
|
||||
process.stderr.write(`❌ Cannot read ${configPath}: ${msg}\n`);
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
const result = parseNerveConfig(raw);
|
||||
if (!result.ok) {
|
||||
process.stderr.write(`❌ Config validation failed: ${result.error.message}\n`);
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
const config = result.value;
|
||||
const senseCount = Object.keys(config.senses).length;
|
||||
const reflexCount = config.reflexes.length;
|
||||
const workflowCount = config.workflows ? Object.keys(config.workflows).length : 0;
|
||||
|
||||
process.stdout.write(
|
||||
`✅ nerve.yaml is valid — ${senseCount} sense(s), ${reflexCount} reflex(es), ${workflowCount} workflow(s)\n`,
|
||||
);
|
||||
},
|
||||
});
|
||||
@@ -1,2 +1,12 @@
|
||||
export {
|
||||
getNerveRoot,
|
||||
getPidPath,
|
||||
getLogPath,
|
||||
readPidFile,
|
||||
writePidFile,
|
||||
removePidFile,
|
||||
isRunning,
|
||||
} from "./workspace.js";
|
||||
|
||||
export { createKernel } from "@uncaged/nerve-daemon";
|
||||
export type { Kernel } from "@uncaged/nerve-daemon";
|
||||
|
||||
@@ -0,0 +1,45 @@
|
||||
import { existsSync, readFileSync, rmSync, writeFileSync } from "node:fs";
|
||||
import { homedir } from "node:os";
|
||||
import { join } from "node:path";
|
||||
|
||||
export function getNerveRoot(): string {
|
||||
return join(homedir(), ".uncaged-nerve");
|
||||
}
|
||||
|
||||
export function getPidPath(): string {
|
||||
return join(getNerveRoot(), "nerve.pid");
|
||||
}
|
||||
|
||||
export function getLogPath(): string {
|
||||
return join(getNerveRoot(), "logs", "nerve.log");
|
||||
}
|
||||
|
||||
export function readPidFile(): number | null {
|
||||
const pidPath = getPidPath();
|
||||
if (!existsSync(pidPath)) return null;
|
||||
const raw = readFileSync(pidPath, "utf8").trim();
|
||||
const pid = Number.parseInt(raw, 10);
|
||||
return Number.isNaN(pid) ? null : pid;
|
||||
}
|
||||
|
||||
export function writePidFile(pid: number): void {
|
||||
writeFileSync(getPidPath(), String(pid), "utf8");
|
||||
}
|
||||
|
||||
export function removePidFile(): void {
|
||||
const pidPath = getPidPath();
|
||||
if (existsSync(pidPath)) {
|
||||
rmSync(pidPath);
|
||||
}
|
||||
}
|
||||
|
||||
export function isRunning(): boolean {
|
||||
const pid = readPidFile();
|
||||
if (pid === null) return false;
|
||||
try {
|
||||
process.kill(pid, 0);
|
||||
return true;
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
@@ -2,7 +2,9 @@
|
||||
"extends": "../../tsconfig.json",
|
||||
"compilerOptions": {
|
||||
"outDir": "dist",
|
||||
"rootDir": "src"
|
||||
"rootDir": "src",
|
||||
"composite": false,
|
||||
"types": ["node"]
|
||||
},
|
||||
"include": ["src"]
|
||||
}
|
||||
|
||||
@@ -2,7 +2,8 @@
|
||||
"extends": "../../tsconfig.json",
|
||||
"compilerOptions": {
|
||||
"outDir": "dist",
|
||||
"rootDir": "src"
|
||||
"rootDir": "src",
|
||||
"composite": false
|
||||
},
|
||||
"include": ["src"]
|
||||
}
|
||||
|
||||
@@ -0,0 +1,126 @@
|
||||
/**
|
||||
* Unit tests for file-watcher.ts
|
||||
*/
|
||||
|
||||
import { mkdirSync, mkdtempSync, writeFileSync } from "node:fs";
|
||||
import { tmpdir } from "node:os";
|
||||
import { join } from "node:path";
|
||||
|
||||
import { afterEach, describe, expect, it } from "vitest";
|
||||
|
||||
import { createFileWatcher } from "../file-watcher.js";
|
||||
import type { FileChange, FileWatcher } from "../file-watcher.js";
|
||||
|
||||
function makeTempNerveRoot(): string {
|
||||
const dir = mkdtempSync(join(tmpdir(), "nerve-fw-test-"));
|
||||
mkdirSync(join(dir, "senses", "cpu-usage"), { recursive: true });
|
||||
writeFileSync(join(dir, "nerve.yaml"), "senses: {}\nreflexes: []\n");
|
||||
writeFileSync(
|
||||
join(dir, "senses", "cpu-usage", "index.ts"),
|
||||
"export async function compute() { return null; }",
|
||||
);
|
||||
return dir;
|
||||
}
|
||||
|
||||
async function waitFor(
|
||||
predicate: () => boolean,
|
||||
timeoutMs: number,
|
||||
intervalMs = 50,
|
||||
): Promise<void> {
|
||||
return new Promise((resolve, reject) => {
|
||||
const timer = setTimeout(
|
||||
() => reject(new Error(`waitFor timed out after ${timeoutMs}ms`)),
|
||||
timeoutMs,
|
||||
);
|
||||
const check = setInterval(() => {
|
||||
if (predicate()) {
|
||||
clearTimeout(timer);
|
||||
clearInterval(check);
|
||||
resolve();
|
||||
}
|
||||
}, intervalMs);
|
||||
});
|
||||
}
|
||||
|
||||
describe("createFileWatcher", () => {
|
||||
let watcher: FileWatcher | null = null;
|
||||
|
||||
afterEach(() => {
|
||||
if (watcher !== null) {
|
||||
watcher.close();
|
||||
watcher = null;
|
||||
}
|
||||
});
|
||||
|
||||
it("detects nerve.yaml changes", async () => {
|
||||
const root = makeTempNerveRoot();
|
||||
const changes: FileChange[] = [];
|
||||
|
||||
watcher = createFileWatcher(root, (change) => changes.push(change), 50);
|
||||
|
||||
// Wait a bit for watcher to initialize, then modify nerve.yaml
|
||||
await new Promise((r) => setTimeout(r, 100));
|
||||
writeFileSync(join(root, "nerve.yaml"), "senses: {}\nreflexes: []\n# changed\n");
|
||||
|
||||
await waitFor(() => changes.length > 0, 3000);
|
||||
|
||||
expect(changes.length).toBeGreaterThanOrEqual(1);
|
||||
expect(changes.some((c) => c.kind === "config")).toBe(true);
|
||||
}, 10_000);
|
||||
|
||||
it("detects sense .ts file changes", async () => {
|
||||
const root = makeTempNerveRoot();
|
||||
const changes: FileChange[] = [];
|
||||
|
||||
watcher = createFileWatcher(root, (change) => changes.push(change), 50);
|
||||
|
||||
await new Promise((r) => setTimeout(r, 100));
|
||||
writeFileSync(
|
||||
join(root, "senses", "cpu-usage", "index.ts"),
|
||||
"export async function compute() { return 42; }",
|
||||
);
|
||||
|
||||
await waitFor(() => changes.length > 0, 3000);
|
||||
|
||||
expect(changes.length).toBeGreaterThanOrEqual(1);
|
||||
const senseChanges = changes.filter((c) => c.kind === "sense");
|
||||
expect(senseChanges.length).toBeGreaterThanOrEqual(1);
|
||||
expect((senseChanges[0] as { senseName: string }).senseName).toBe("cpu-usage");
|
||||
}, 10_000);
|
||||
|
||||
it("close() stops the watcher", async () => {
|
||||
const root = makeTempNerveRoot();
|
||||
const changes: FileChange[] = [];
|
||||
|
||||
watcher = createFileWatcher(root, (change) => changes.push(change), 50);
|
||||
|
||||
await new Promise((r) => setTimeout(r, 100));
|
||||
watcher.close();
|
||||
watcher = null;
|
||||
|
||||
writeFileSync(join(root, "nerve.yaml"), "senses: {}\nreflexes: []\n# after close\n");
|
||||
|
||||
// Wait and verify no changes were captured
|
||||
await new Promise((r) => setTimeout(r, 500));
|
||||
expect(changes.length).toBe(0);
|
||||
}, 5_000);
|
||||
|
||||
it("debounces rapid changes", async () => {
|
||||
const root = makeTempNerveRoot();
|
||||
const changes: FileChange[] = [];
|
||||
|
||||
watcher = createFileWatcher(root, (change) => changes.push(change), 200);
|
||||
|
||||
await new Promise((r) => setTimeout(r, 100));
|
||||
|
||||
// Write rapidly
|
||||
for (let i = 0; i < 5; i++) {
|
||||
writeFileSync(join(root, "nerve.yaml"), `senses: {}\nreflexes: []\n# v${i}\n`);
|
||||
}
|
||||
|
||||
await waitFor(() => changes.length > 0, 3000);
|
||||
|
||||
// With debounce, should see only 1 change (not 5)
|
||||
expect(changes.length).toBe(1);
|
||||
}, 10_000);
|
||||
});
|
||||
@@ -0,0 +1,36 @@
|
||||
/**
|
||||
* Mock worker that crashes on first compute, then works normally after respawn.
|
||||
* Uses a marker file to track if it has already crashed.
|
||||
*
|
||||
* First invocation: sends ready, then exits with code 1 on first compute.
|
||||
* Subsequent invocations (after respawn): works like normal mock-worker.
|
||||
*/
|
||||
|
||||
import { existsSync, unlinkSync, writeFileSync } from "node:fs";
|
||||
import { tmpdir } from "node:os";
|
||||
import { join } from "node:path";
|
||||
|
||||
const markerFile = join(tmpdir(), `nerve-crash-once-${process.ppid}.marker`);
|
||||
|
||||
process.send({ type: "ready" });
|
||||
|
||||
const hasCrashed = existsSync(markerFile);
|
||||
|
||||
process.on("message", (msg) => {
|
||||
if (!msg || typeof msg !== "object") return;
|
||||
|
||||
if (msg.type === "shutdown") {
|
||||
try {
|
||||
unlinkSync(markerFile);
|
||||
} catch {}
|
||||
process.exit(0);
|
||||
}
|
||||
|
||||
if (msg.type === "compute" && typeof msg.sense === "string") {
|
||||
if (!hasCrashed) {
|
||||
writeFileSync(markerFile, "crashed", "utf8");
|
||||
process.exit(1);
|
||||
}
|
||||
process.send({ type: "signal", sense: msg.sense, payload: 42 });
|
||||
}
|
||||
});
|
||||
@@ -0,0 +1,18 @@
|
||||
/**
|
||||
* Mock worker that responds to compute with an error message.
|
||||
* Used for error isolation integration tests.
|
||||
*/
|
||||
|
||||
process.send({ type: "ready" });
|
||||
|
||||
process.on("message", (msg) => {
|
||||
if (!msg || typeof msg !== "object") return;
|
||||
|
||||
if (msg.type === "shutdown") {
|
||||
process.exit(0);
|
||||
}
|
||||
|
||||
if (msg.type === "compute" && typeof msg.sense === "string") {
|
||||
process.send({ type: "error", sense: msg.sense, error: "simulated compute error" });
|
||||
}
|
||||
});
|
||||
@@ -0,0 +1,24 @@
|
||||
/**
|
||||
* Mock worker that takes a long time to respond to compute messages.
|
||||
* Used for grace period / timeout integration tests.
|
||||
*
|
||||
* On compute: waits 10 seconds before responding (simulates hung compute).
|
||||
* On shutdown: exits cleanly.
|
||||
*/
|
||||
|
||||
process.send({ type: "ready" });
|
||||
|
||||
process.on("message", (msg) => {
|
||||
if (!msg || typeof msg !== "object") return;
|
||||
|
||||
if (msg.type === "shutdown") {
|
||||
process.exit(0);
|
||||
}
|
||||
|
||||
if (msg.type === "compute" && typeof msg.sense === "string") {
|
||||
// Intentionally slow — will be killed by grace period
|
||||
setTimeout(() => {
|
||||
process.send({ type: "signal", sense: msg.sense, payload: "late" });
|
||||
}, 10_000);
|
||||
}
|
||||
});
|
||||
@@ -0,0 +1,241 @@
|
||||
/**
|
||||
* Unit tests for kernel Phase 6 enhancements — restartGroup, reloadConfig, getHealth.
|
||||
* Uses mocked child_process.fork.
|
||||
*/
|
||||
|
||||
import { EventEmitter } from "node:events";
|
||||
|
||||
import type { NerveConfig } from "@uncaged/nerve-core";
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Mock child_process.fork
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
const mockChildren: MockChild[] = [];
|
||||
|
||||
type MockChild = EventEmitter & {
|
||||
send: ReturnType<typeof vi.fn>;
|
||||
kill: ReturnType<typeof vi.fn>;
|
||||
pid: number;
|
||||
connected: boolean;
|
||||
};
|
||||
|
||||
function makeMockChild(pid = 1): MockChild {
|
||||
const child = new EventEmitter() as MockChild;
|
||||
child.connected = true;
|
||||
child.send = vi.fn((msg: unknown) => {
|
||||
if (
|
||||
msg !== null &&
|
||||
typeof msg === "object" &&
|
||||
(msg as Record<string, unknown>).type === "shutdown"
|
||||
) {
|
||||
setImmediate(() => {
|
||||
child.connected = false;
|
||||
child.emit("exit", 0, null);
|
||||
});
|
||||
}
|
||||
});
|
||||
child.kill = vi.fn((_signal?: string) => {
|
||||
child.connected = false;
|
||||
child.emit("exit", null, _signal ?? "SIGKILL");
|
||||
});
|
||||
child.pid = pid;
|
||||
// Auto-emit ready so restartGroup's await resolves
|
||||
setImmediate(() => {
|
||||
child.emit("message", { type: "ready" });
|
||||
});
|
||||
return child;
|
||||
}
|
||||
|
||||
vi.mock("node:child_process", () => ({
|
||||
fork: vi.fn((_script: string, _args: string[], _opts: unknown) => {
|
||||
const child = makeMockChild(mockChildren.length + 100);
|
||||
mockChildren.push(child);
|
||||
return child;
|
||||
}),
|
||||
}));
|
||||
|
||||
// Must also mock file-watcher since kernel imports it
|
||||
vi.mock("../file-watcher.js", () => ({
|
||||
createFileWatcher: vi.fn(() => ({ close: vi.fn() })),
|
||||
}));
|
||||
|
||||
const { createKernel } = await import("../kernel.js");
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Helpers
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
|
||||
return {
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [],
|
||||
workflows: null,
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Tests
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
describe("kernel — getHealth", () => {
|
||||
beforeEach(() => {
|
||||
mockChildren.length = 0;
|
||||
vi.useFakeTimers({ shouldAdvanceTime: true });
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
it("returns correct health shape", async () => {
|
||||
const config = makeConfig({
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
"disk-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
"net-rx": { group: "network", throttle: null, timeout: null, gracePeriod: null },
|
||||
},
|
||||
});
|
||||
const kernel = createKernel(config, "/tmp/nerve-test");
|
||||
|
||||
const health = kernel.getHealth();
|
||||
expect(health.activeSenses).toBe(3);
|
||||
expect(health.activeGroups).toBe(2);
|
||||
expect(health.uptime).toBeGreaterThanOrEqual(0);
|
||||
expect(health.memoryUsage).toBeDefined();
|
||||
expect(typeof health.memoryUsage.heapUsed).toBe("number");
|
||||
|
||||
await kernel.stop();
|
||||
});
|
||||
});
|
||||
|
||||
describe("kernel — restartGroup", () => {
|
||||
beforeEach(() => {
|
||||
mockChildren.length = 0;
|
||||
vi.useFakeTimers({ shouldAdvanceTime: true });
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
it("sends shutdown to old worker and spawns new one", async () => {
|
||||
const config = makeConfig();
|
||||
const kernel = createKernel(config, "/tmp/nerve-test");
|
||||
|
||||
expect(mockChildren.length).toBe(1);
|
||||
const oldChild = mockChildren[0];
|
||||
|
||||
const restartPromise = kernel.restartGroup("system");
|
||||
// The shutdown message triggers exit in the mock
|
||||
await restartPromise;
|
||||
|
||||
// A new child should have been spawned
|
||||
expect(mockChildren.length).toBe(2);
|
||||
|
||||
// Old child got shutdown
|
||||
expect(oldChild.send).toHaveBeenCalledWith(expect.objectContaining({ type: "shutdown" }));
|
||||
|
||||
await kernel.stop();
|
||||
});
|
||||
|
||||
it("restartGroup on unknown group does nothing", async () => {
|
||||
const config = makeConfig();
|
||||
const kernel = createKernel(config, "/tmp/nerve-test");
|
||||
|
||||
expect(mockChildren.length).toBe(1);
|
||||
await kernel.restartGroup("nonexistent");
|
||||
// No new child spawned
|
||||
expect(mockChildren.length).toBe(1);
|
||||
|
||||
await kernel.stop();
|
||||
});
|
||||
});
|
||||
|
||||
describe("kernel — reloadConfig", () => {
|
||||
beforeEach(() => {
|
||||
mockChildren.length = 0;
|
||||
vi.useFakeTimers({ shouldAdvanceTime: true });
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
it("adds new group worker when new sense group appears", async () => {
|
||||
const config = makeConfig();
|
||||
const kernel = createKernel(config, "/tmp/nerve-test");
|
||||
|
||||
expect(mockChildren.length).toBe(1); // only system group
|
||||
expect(kernel.groups.has("network")).toBe(false);
|
||||
|
||||
kernel.reloadConfig({
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
"net-rx": { group: "network", throttle: null, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [],
|
||||
workflows: null,
|
||||
});
|
||||
|
||||
expect(kernel.groups.has("network")).toBe(true);
|
||||
expect(mockChildren.length).toBe(2); // system + network
|
||||
|
||||
await kernel.stop();
|
||||
});
|
||||
|
||||
it("removes group worker when its senses are all removed", async () => {
|
||||
const config: NerveConfig = {
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
"net-rx": { group: "network", throttle: null, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [],
|
||||
workflows: null,
|
||||
};
|
||||
const kernel = createKernel(config, "/tmp/nerve-test");
|
||||
|
||||
expect(mockChildren.length).toBe(2);
|
||||
expect(kernel.groups.has("network")).toBe(true);
|
||||
|
||||
const networkChild = mockChildren[1];
|
||||
|
||||
kernel.reloadConfig({
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [],
|
||||
workflows: null,
|
||||
});
|
||||
|
||||
expect(kernel.groups.has("network")).toBe(false);
|
||||
// Network child should have received shutdown
|
||||
expect(networkChild.send).toHaveBeenCalledWith(expect.objectContaining({ type: "shutdown" }));
|
||||
|
||||
await kernel.stop();
|
||||
});
|
||||
|
||||
it("health reflects updated sense count after reloadConfig", async () => {
|
||||
const config = makeConfig();
|
||||
const kernel = createKernel(config, "/tmp/nerve-test");
|
||||
|
||||
expect(kernel.getHealth().activeSenses).toBe(1);
|
||||
|
||||
kernel.reloadConfig({
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
"disk-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [],
|
||||
workflows: null,
|
||||
});
|
||||
|
||||
expect(kernel.getHealth().activeSenses).toBe(2);
|
||||
|
||||
await kernel.stop();
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,117 @@
|
||||
import { mkdtempSync, rmSync } from "node:fs";
|
||||
import { tmpdir } from "node:os";
|
||||
import { join } from "node:path";
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
|
||||
import type { NerveConfig, Signal } from "@uncaged/nerve-core";
|
||||
import { createLogStore } from "../log-store.js";
|
||||
import type { LogStore } from "../log-store.js";
|
||||
import { createReflexScheduler } from "../reflex-scheduler.js";
|
||||
import { createSignalBus } from "../signal-bus.js";
|
||||
|
||||
describe("LogStore + ReflexScheduler integration", () => {
|
||||
let tmpDir: string;
|
||||
let logStore: LogStore;
|
||||
|
||||
beforeEach(() => {
|
||||
tmpDir = mkdtempSync(join(tmpdir(), "nerve-log-int-"));
|
||||
logStore = createLogStore(join(tmpDir, "logs.db"));
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
logStore.close();
|
||||
rmSync(tmpDir, { recursive: true, force: true });
|
||||
});
|
||||
|
||||
it("logs run_start when reflex triggers a compute", () => {
|
||||
const config: NerveConfig = {
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: null, on: ["cpu-usage"] }],
|
||||
workflows: null,
|
||||
};
|
||||
const bus = createSignalBus();
|
||||
const triggered: string[] = [];
|
||||
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name), {
|
||||
logStore,
|
||||
});
|
||||
|
||||
const signal: Signal = { id: 1, senseId: "cpu-usage", payload: 42, ts: Date.now() };
|
||||
bus.emit(signal);
|
||||
|
||||
const logs = logStore.query({ source: "reflex", type: "run_start" });
|
||||
expect(logs).toHaveLength(1);
|
||||
expect(logs[0].refId).toBe("cpu-usage");
|
||||
expect(triggered).toHaveLength(1);
|
||||
|
||||
scheduler.stop();
|
||||
});
|
||||
|
||||
it("interval reflex produces run_start logs", () => {
|
||||
vi.useFakeTimers();
|
||||
|
||||
const config: NerveConfig = {
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: 1000, on: null }],
|
||||
workflows: null,
|
||||
};
|
||||
const bus = createSignalBus();
|
||||
const ref: { scheduler: ReturnType<typeof createReflexScheduler> | null } = { scheduler: null };
|
||||
const scheduler = createReflexScheduler(
|
||||
config,
|
||||
bus,
|
||||
(name) => {
|
||||
ref.scheduler?.onComputeComplete(name);
|
||||
},
|
||||
{ logStore },
|
||||
);
|
||||
ref.scheduler = scheduler;
|
||||
|
||||
vi.advanceTimersByTime(3000);
|
||||
|
||||
const logs = logStore.query({ source: "reflex", type: "run_start" });
|
||||
expect(logs.length).toBeGreaterThanOrEqual(3);
|
||||
expect(logs.every((l) => l.refId === "cpu-usage")).toBe(true);
|
||||
|
||||
scheduler.stop();
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
it("logs cannot trigger reflexes (architectural constraint)", () => {
|
||||
const config: NerveConfig = {
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: null, on: ["cpu-usage"] }],
|
||||
workflows: null,
|
||||
};
|
||||
const bus = createSignalBus();
|
||||
const triggered: string[] = [];
|
||||
const scheduler = createReflexScheduler(
|
||||
config,
|
||||
bus,
|
||||
(name) => {
|
||||
triggered.push(name);
|
||||
scheduler.onComputeComplete(name);
|
||||
},
|
||||
{ logStore },
|
||||
);
|
||||
|
||||
logStore.append({
|
||||
source: "reflex",
|
||||
type: "run_complete",
|
||||
refId: "cpu-usage",
|
||||
payload: '{"v":99}',
|
||||
ts: Date.now(),
|
||||
});
|
||||
|
||||
// Writing to the log store should NOT trigger any reflex.
|
||||
// Only bus.emit(signal) triggers reflexes.
|
||||
expect(triggered).toHaveLength(0);
|
||||
|
||||
scheduler.stop();
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,214 @@
|
||||
import { mkdtempSync, rmSync } from "node:fs";
|
||||
import { tmpdir } from "node:os";
|
||||
import { join } from "node:path";
|
||||
import { afterEach, beforeEach, describe, expect, it } from "vitest";
|
||||
|
||||
import { createLogStore } from "../log-store.js";
|
||||
import type { LogStore } from "../log-store.js";
|
||||
|
||||
describe("LogStore", () => {
|
||||
let tmpDir: string;
|
||||
let store: LogStore;
|
||||
|
||||
beforeEach(() => {
|
||||
tmpDir = mkdtempSync(join(tmpdir(), "nerve-log-test-"));
|
||||
store = createLogStore(join(tmpDir, "data", "logs.db"));
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
store.close();
|
||||
rmSync(tmpDir, { recursive: true, force: true });
|
||||
});
|
||||
|
||||
describe("append + query", () => {
|
||||
it("appends an entry and returns it with an id", () => {
|
||||
const entry = store.append({
|
||||
source: "system",
|
||||
type: "start",
|
||||
refId: null,
|
||||
payload: null,
|
||||
ts: 1000,
|
||||
});
|
||||
|
||||
expect(entry.id).toBe(1);
|
||||
expect(entry.source).toBe("system");
|
||||
expect(entry.type).toBe("start");
|
||||
});
|
||||
|
||||
it("auto-increments ids", () => {
|
||||
const e1 = store.append({
|
||||
source: "system",
|
||||
type: "start",
|
||||
refId: null,
|
||||
payload: null,
|
||||
ts: 1000,
|
||||
});
|
||||
const e2 = store.append({
|
||||
source: "system",
|
||||
type: "stop",
|
||||
refId: null,
|
||||
payload: null,
|
||||
ts: 2000,
|
||||
});
|
||||
|
||||
expect(e2.id).toBe((e1.id ?? 0) + 1);
|
||||
});
|
||||
|
||||
it("returns all entries when queried with no filter", () => {
|
||||
store.append({ source: "system", type: "start", refId: null, payload: null, ts: 1000 });
|
||||
store.append({ source: "reflex", type: "run_start", refId: "cpu", payload: null, ts: 2000 });
|
||||
store.append({
|
||||
source: "reflex",
|
||||
type: "run_complete",
|
||||
refId: "cpu",
|
||||
payload: '{"v":42}',
|
||||
ts: 3000,
|
||||
});
|
||||
|
||||
const all = store.query();
|
||||
expect(all).toHaveLength(3);
|
||||
});
|
||||
});
|
||||
|
||||
describe("query filters", () => {
|
||||
beforeEach(() => {
|
||||
store.append({ source: "system", type: "start", refId: null, payload: null, ts: 1000 });
|
||||
store.append({ source: "reflex", type: "run_start", refId: "cpu", payload: null, ts: 2000 });
|
||||
store.append({
|
||||
source: "reflex",
|
||||
type: "run_complete",
|
||||
refId: "cpu",
|
||||
payload: '{"v":42}',
|
||||
ts: 3000,
|
||||
});
|
||||
store.append({
|
||||
source: "system",
|
||||
type: "error",
|
||||
refId: "disk",
|
||||
payload: '{"error":"fail"}',
|
||||
ts: 4000,
|
||||
});
|
||||
store.append({ source: "system", type: "stop", refId: null, payload: null, ts: 5000 });
|
||||
});
|
||||
|
||||
it("filters by source", () => {
|
||||
const results = store.query({ source: "reflex" });
|
||||
expect(results).toHaveLength(2);
|
||||
expect(results.every((r) => r.source === "reflex")).toBe(true);
|
||||
});
|
||||
|
||||
it("filters by type", () => {
|
||||
const results = store.query({ type: "error" });
|
||||
expect(results).toHaveLength(1);
|
||||
expect(results[0].refId).toBe("disk");
|
||||
});
|
||||
|
||||
it("filters by refId", () => {
|
||||
const results = store.query({ refId: "cpu" });
|
||||
expect(results).toHaveLength(2);
|
||||
});
|
||||
|
||||
it("filters by since (inclusive)", () => {
|
||||
const results = store.query({ since: 3000 });
|
||||
expect(results).toHaveLength(3);
|
||||
expect(results[0].ts).toBe(3000);
|
||||
});
|
||||
|
||||
it("filters by until (inclusive)", () => {
|
||||
const results = store.query({ until: 2000 });
|
||||
expect(results).toHaveLength(2);
|
||||
});
|
||||
|
||||
it("filters by since + until range", () => {
|
||||
const results = store.query({ since: 2000, until: 4000 });
|
||||
expect(results).toHaveLength(3);
|
||||
});
|
||||
|
||||
it("applies limit", () => {
|
||||
const results = store.query({ limit: 2 });
|
||||
expect(results).toHaveLength(2);
|
||||
expect(results[0].type).toBe("start");
|
||||
expect(results[1].type).toBe("run_start");
|
||||
});
|
||||
|
||||
it("combines multiple filters", () => {
|
||||
const results = store.query({ source: "system", since: 4000 });
|
||||
expect(results).toHaveLength(2);
|
||||
expect(results[0].type).toBe("error");
|
||||
expect(results[1].type).toBe("stop");
|
||||
});
|
||||
|
||||
it("returns empty array when no matches", () => {
|
||||
const results = store.query({ source: "workflow" });
|
||||
expect(results).toHaveLength(0);
|
||||
});
|
||||
});
|
||||
|
||||
describe("query ordering", () => {
|
||||
it("returns entries in insertion order (ascending id)", () => {
|
||||
store.append({ source: "system", type: "start", refId: null, payload: null, ts: 5000 });
|
||||
store.append({ source: "reflex", type: "run_start", refId: "a", payload: null, ts: 1000 });
|
||||
|
||||
const all = store.query();
|
||||
expect(all[0].ts).toBe(5000);
|
||||
expect(all[1].ts).toBe(1000);
|
||||
});
|
||||
});
|
||||
|
||||
describe("meta table", () => {
|
||||
it("returns null for nonexistent key", () => {
|
||||
expect(store.getMeta("missing")).toBeNull();
|
||||
});
|
||||
|
||||
it("sets and gets a value", () => {
|
||||
store.setMeta("archived_up_to", "2026-03-22");
|
||||
expect(store.getMeta("archived_up_to")).toBe("2026-03-22");
|
||||
});
|
||||
|
||||
it("upserts on duplicate key", () => {
|
||||
store.setMeta("version", "1");
|
||||
store.setMeta("version", "2");
|
||||
expect(store.getMeta("version")).toBe("2");
|
||||
});
|
||||
|
||||
it("supports multiple keys", () => {
|
||||
store.setMeta("a", "1");
|
||||
store.setMeta("b", "2");
|
||||
expect(store.getMeta("a")).toBe("1");
|
||||
expect(store.getMeta("b")).toBe("2");
|
||||
});
|
||||
});
|
||||
|
||||
describe("append-only semantics", () => {
|
||||
it("ids are always increasing", () => {
|
||||
const entries = Array.from({ length: 10 }, (_, i) =>
|
||||
store.append({ source: "system", type: "test", refId: null, payload: null, ts: i }),
|
||||
);
|
||||
|
||||
for (let i = 1; i < entries.length; i++) {
|
||||
expect(entries[i].id).toBeGreaterThan(entries[i - 1].id ?? 0);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
describe("payload JSON round-trip", () => {
|
||||
it("preserves JSON payload", () => {
|
||||
const payload = JSON.stringify({ cpu: 95, host: "node-1" });
|
||||
store.append({ source: "reflex", type: "run_complete", refId: "cpu", payload, ts: 1000 });
|
||||
|
||||
const results = store.query({ refId: "cpu" });
|
||||
expect(results).toHaveLength(1);
|
||||
expect(JSON.parse(results[0].payload ?? "null")).toEqual({ cpu: 95, host: "node-1" });
|
||||
});
|
||||
});
|
||||
|
||||
describe("creates parent directories", () => {
|
||||
it("creates nested directory structure for db path", () => {
|
||||
const deepPath = join(tmpDir, "a", "b", "c", "test.db");
|
||||
const deepStore = createLogStore(deepPath);
|
||||
deepStore.append({ source: "system", type: "start", refId: null, payload: null, ts: 1000 });
|
||||
expect(deepStore.query()).toHaveLength(1);
|
||||
deepStore.close();
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,361 @@
|
||||
/**
|
||||
* Phase 6 integration tests — hot reload, error isolation, grace period, health.
|
||||
*/
|
||||
|
||||
import { dirname, join } from "node:path";
|
||||
import { fileURLToPath } from "node:url";
|
||||
|
||||
import type { Signal } from "@uncaged/nerve-core";
|
||||
import type { NerveConfig } from "@uncaged/nerve-core";
|
||||
import { afterEach, describe, expect, it } from "vitest";
|
||||
|
||||
import { createKernel } from "../kernel.js";
|
||||
import type { Kernel } from "../kernel.js";
|
||||
|
||||
const __filename = fileURLToPath(import.meta.url);
|
||||
const __dir = dirname(__filename);
|
||||
const MOCK_WORKER = join(__dir, "fixtures", "mock-worker.mjs");
|
||||
const ERROR_WORKER = join(__dir, "fixtures", "error-worker.mjs");
|
||||
|
||||
function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
|
||||
return {
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [],
|
||||
workflows: null,
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
|
||||
async function pollUntil(
|
||||
predicate: () => boolean,
|
||||
timeoutMs: number,
|
||||
intervalMs = 50,
|
||||
): Promise<void> {
|
||||
return new Promise((resolve, reject) => {
|
||||
const timer = setTimeout(
|
||||
() => reject(new Error(`pollUntil timed out after ${timeoutMs}ms`)),
|
||||
timeoutMs,
|
||||
);
|
||||
const check = setInterval(() => {
|
||||
if (predicate()) {
|
||||
clearTimeout(timer);
|
||||
clearInterval(check);
|
||||
resolve();
|
||||
}
|
||||
}, intervalMs);
|
||||
});
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Hot Reload — restartGroup
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
describe("phase6 — restartGroup", () => {
|
||||
let kernel: Kernel | null = null;
|
||||
|
||||
afterEach(async () => {
|
||||
if (kernel !== null) {
|
||||
await kernel.stop();
|
||||
kernel = null;
|
||||
}
|
||||
});
|
||||
|
||||
it("restartGroup stops old worker and spawns a new one", async () => {
|
||||
const config = makeConfig();
|
||||
kernel = createKernel(config, "/tmp/nerve-phase6-test", {
|
||||
workerScript: MOCK_WORKER,
|
||||
});
|
||||
|
||||
await kernel.ready;
|
||||
|
||||
const oldPid = kernel.getWorkerPid("system");
|
||||
expect(oldPid).not.toBeNull();
|
||||
|
||||
await kernel.restartGroup("system");
|
||||
|
||||
// Wait for new worker to become ready
|
||||
await pollUntil(() => {
|
||||
const newPid = kernel?.getWorkerPid("system");
|
||||
return newPid !== null && newPid !== oldPid;
|
||||
}, 5000);
|
||||
|
||||
const newPid = kernel.getWorkerPid("system");
|
||||
expect(newPid).not.toBeNull();
|
||||
expect(newPid).not.toBe(oldPid);
|
||||
|
||||
// Verify new worker is functional
|
||||
const received: Signal[] = [];
|
||||
const unsub = kernel.bus.subscribe((s) => received.push(s));
|
||||
kernel.triggerCompute("cpu-usage");
|
||||
await pollUntil(() => received.length > 0, 3000);
|
||||
expect(received[0]).toMatchObject({ senseId: "cpu-usage", payload: 42 });
|
||||
unsub();
|
||||
}, 15_000);
|
||||
|
||||
it("restartGroup on nonexistent group does nothing", async () => {
|
||||
const config = makeConfig();
|
||||
kernel = createKernel(config, "/tmp/nerve-phase6-test", {
|
||||
workerScript: MOCK_WORKER,
|
||||
});
|
||||
await kernel.ready;
|
||||
|
||||
// Should not throw
|
||||
await kernel.restartGroup("nonexistent");
|
||||
}, 5_000);
|
||||
});
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Hot Reload — reloadConfig
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
describe("phase6 — reloadConfig", () => {
|
||||
let kernel: Kernel | null = null;
|
||||
|
||||
afterEach(async () => {
|
||||
if (kernel !== null) {
|
||||
await kernel.stop();
|
||||
kernel = null;
|
||||
}
|
||||
});
|
||||
|
||||
it("adds new group when new sense group is introduced", async () => {
|
||||
const config = makeConfig();
|
||||
kernel = createKernel(config, "/tmp/nerve-phase6-test", {
|
||||
workerScript: MOCK_WORKER,
|
||||
});
|
||||
await kernel.ready;
|
||||
|
||||
expect(kernel.groups.has("network")).toBe(false);
|
||||
|
||||
const newConfig: NerveConfig = {
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
"net-rx": { group: "network", throttle: null, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [],
|
||||
workflows: null,
|
||||
};
|
||||
|
||||
kernel.reloadConfig(newConfig);
|
||||
|
||||
expect(kernel.groups.has("network")).toBe(true);
|
||||
|
||||
// Wait for the new network worker to start
|
||||
await pollUntil(() => kernel?.getWorkerPid("network") !== null, 3000);
|
||||
expect(kernel.getWorkerPid("network")).not.toBeNull();
|
||||
}, 10_000);
|
||||
|
||||
it("removes group when all its senses are removed", async () => {
|
||||
const config: NerveConfig = {
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
"net-rx": { group: "network", throttle: null, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [],
|
||||
workflows: null,
|
||||
};
|
||||
kernel = createKernel(config, "/tmp/nerve-phase6-test", {
|
||||
workerScript: MOCK_WORKER,
|
||||
});
|
||||
await kernel.ready;
|
||||
|
||||
expect(kernel.groups.has("network")).toBe(true);
|
||||
|
||||
const newConfig: NerveConfig = {
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [],
|
||||
workflows: null,
|
||||
};
|
||||
|
||||
kernel.reloadConfig(newConfig);
|
||||
|
||||
expect(kernel.groups.has("network")).toBe(false);
|
||||
}, 10_000);
|
||||
});
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Error Isolation
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
describe("phase6 — error isolation", () => {
|
||||
let kernel: Kernel | null = null;
|
||||
|
||||
afterEach(async () => {
|
||||
if (kernel !== null) {
|
||||
await kernel.stop();
|
||||
kernel = null;
|
||||
}
|
||||
});
|
||||
|
||||
it("error from one sense does not crash the worker — other senses still work", async () => {
|
||||
const config: NerveConfig = {
|
||||
senses: {
|
||||
"good-sense": { group: "mixed", throttle: null, timeout: null, gracePeriod: null },
|
||||
"bad-sense": { group: "mixed", throttle: null, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [],
|
||||
workflows: null,
|
||||
};
|
||||
|
||||
kernel = createKernel(config, "/tmp/nerve-phase6-test", {
|
||||
workerScript: MOCK_WORKER,
|
||||
});
|
||||
await kernel.ready;
|
||||
|
||||
// Both senses go through the same worker (mock-worker responds to all compute with signal)
|
||||
const received: Signal[] = [];
|
||||
const unsub = kernel.bus.subscribe((s) => received.push(s));
|
||||
|
||||
kernel.triggerCompute("good-sense");
|
||||
await pollUntil(() => received.length > 0, 3000);
|
||||
expect(received[0]).toMatchObject({ senseId: "good-sense" });
|
||||
|
||||
kernel.triggerCompute("bad-sense");
|
||||
await pollUntil(() => received.length > 1, 3000);
|
||||
expect(received[1]).toMatchObject({ senseId: "bad-sense" });
|
||||
|
||||
unsub();
|
||||
}, 10_000);
|
||||
|
||||
it("error worker sends error messages, kernel still running", async () => {
|
||||
const stderrMessages: string[] = [];
|
||||
const stderrSpy = ((original) => {
|
||||
return (chunk: string | Uint8Array) => {
|
||||
stderrMessages.push(String(chunk));
|
||||
return original.call(process.stderr, chunk);
|
||||
};
|
||||
})(process.stderr.write);
|
||||
const origWrite = process.stderr.write;
|
||||
process.stderr.write = stderrSpy as typeof process.stderr.write;
|
||||
|
||||
const config = makeConfig();
|
||||
kernel = createKernel(config, "/tmp/nerve-phase6-test", {
|
||||
workerScript: ERROR_WORKER,
|
||||
});
|
||||
await kernel.ready;
|
||||
|
||||
kernel.triggerCompute("cpu-usage");
|
||||
|
||||
// Wait for the error to be logged
|
||||
await pollUntil(() => stderrMessages.some((m) => m.includes("simulated compute error")), 3000);
|
||||
|
||||
process.stderr.write = origWrite;
|
||||
|
||||
// Kernel should still be running (not crashed)
|
||||
expect(kernel.getWorkerPid("system")).not.toBeNull();
|
||||
}, 10_000);
|
||||
});
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// getHealth
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
describe("phase6 — getHealth", () => {
|
||||
let kernel: Kernel | null = null;
|
||||
|
||||
afterEach(async () => {
|
||||
if (kernel !== null) {
|
||||
await kernel.stop();
|
||||
kernel = null;
|
||||
}
|
||||
});
|
||||
|
||||
it("returns health snapshot with correct shape", async () => {
|
||||
const config = makeConfig({
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
"disk-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
"net-rx": { group: "network", throttle: null, timeout: null, gracePeriod: null },
|
||||
},
|
||||
});
|
||||
kernel = createKernel(config, "/tmp/nerve-phase6-test", {
|
||||
workerScript: MOCK_WORKER,
|
||||
});
|
||||
await kernel.ready;
|
||||
|
||||
const health = kernel.getHealth();
|
||||
|
||||
expect(health.uptime).toBeGreaterThanOrEqual(0);
|
||||
expect(health.activeSenses).toBe(3);
|
||||
expect(health.activeGroups).toBe(2);
|
||||
expect(health.memoryUsage).toBeDefined();
|
||||
expect(typeof health.memoryUsage.heapUsed).toBe("number");
|
||||
}, 10_000);
|
||||
|
||||
it("health reflects config changes after reloadConfig", async () => {
|
||||
const config = makeConfig();
|
||||
kernel = createKernel(config, "/tmp/nerve-phase6-test", {
|
||||
workerScript: MOCK_WORKER,
|
||||
});
|
||||
await kernel.ready;
|
||||
|
||||
expect(kernel.getHealth().activeSenses).toBe(1);
|
||||
|
||||
const newConfig: NerveConfig = {
|
||||
senses: {
|
||||
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
|
||||
"net-rx": { group: "network", throttle: null, timeout: null, gracePeriod: null },
|
||||
},
|
||||
reflexes: [],
|
||||
workflows: null,
|
||||
};
|
||||
kernel.reloadConfig(newConfig);
|
||||
|
||||
expect(kernel.getHealth().activeSenses).toBe(2);
|
||||
expect(kernel.getHealth().activeGroups).toBe(2);
|
||||
}, 10_000);
|
||||
});
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Auto-respawn on crash (existing test extended)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
describe("phase6 — auto-respawn on worker crash", () => {
|
||||
let kernel: Kernel | null = null;
|
||||
|
||||
afterEach(async () => {
|
||||
if (kernel !== null) {
|
||||
await kernel.stop();
|
||||
kernel = null;
|
||||
}
|
||||
});
|
||||
|
||||
it("kernel auto-respawns worker and new worker is functional", async () => {
|
||||
const config = makeConfig();
|
||||
kernel = createKernel(config, "/tmp/nerve-phase6-test", {
|
||||
workerScript: MOCK_WORKER,
|
||||
});
|
||||
await kernel.ready;
|
||||
|
||||
const originalPid = kernel.getWorkerPid("system");
|
||||
expect(originalPid).not.toBeNull();
|
||||
|
||||
// Kill worker to simulate crash
|
||||
process.kill(originalPid as number, "SIGKILL");
|
||||
|
||||
// Wait for respawn
|
||||
await pollUntil(() => {
|
||||
const pid = kernel?.getWorkerPid("system");
|
||||
return pid !== null && pid !== originalPid;
|
||||
}, 5000);
|
||||
|
||||
const newPid = kernel.getWorkerPid("system");
|
||||
expect(newPid).not.toBeNull();
|
||||
expect(newPid).not.toBe(originalPid);
|
||||
|
||||
// Verify new worker responds
|
||||
const received: Signal[] = [];
|
||||
const unsub = kernel.bus.subscribe((s) => received.push(s));
|
||||
kernel.triggerCompute("cpu-usage");
|
||||
await pollUntil(() => received.length > 0, 5000);
|
||||
expect(received[0]).toMatchObject({ senseId: "cpu-usage", payload: 42 });
|
||||
unsub();
|
||||
|
||||
await kernel.stop();
|
||||
kernel = null;
|
||||
}, 15_000);
|
||||
});
|
||||
@@ -0,0 +1,113 @@
|
||||
/**
|
||||
* File Watcher — watches nerveRoot for file changes and signals the kernel.
|
||||
*
|
||||
* Uses Node.js fs.watch (no external dependencies).
|
||||
*
|
||||
* Watched events:
|
||||
* - .ts file under senses/ modified → callback with { kind: "sense", senseName, filePath }
|
||||
* - nerve.yaml modified → callback with { kind: "config", filePath }
|
||||
*
|
||||
* Debounces rapid changes (e.g. editor save flicker) with a configurable delay.
|
||||
*
|
||||
* Linux compatibility note: fs.watch with { recursive: true } relies on inotify, which may
|
||||
* not reliably report the filename for events in deeply nested subdirectories. On Linux,
|
||||
* the `filename` argument passed to the callback can be null for some inotify events, and
|
||||
* recursive watching is only available in Node.js ≥22 on Linux. If null-filename events
|
||||
* become a problem, consider using a dedicated watcher library (e.g. chokidar).
|
||||
*/
|
||||
|
||||
import { watch } from "node:fs";
|
||||
import type { FSWatcher } from "node:fs";
|
||||
import { join, relative, sep } from "node:path";
|
||||
|
||||
export type SenseFileChange = {
|
||||
kind: "sense";
|
||||
senseName: string;
|
||||
filePath: string;
|
||||
};
|
||||
|
||||
export type ConfigFileChange = {
|
||||
kind: "config";
|
||||
filePath: string;
|
||||
};
|
||||
|
||||
export type FileChange = SenseFileChange | ConfigFileChange;
|
||||
|
||||
export type FileChangeHandler = (change: FileChange) => void;
|
||||
|
||||
export type FileWatcher = {
|
||||
close: () => void;
|
||||
};
|
||||
|
||||
const DEFAULT_DEBOUNCE_MS = 300;
|
||||
|
||||
export function createFileWatcher(
|
||||
nerveRoot: string,
|
||||
handler: FileChangeHandler,
|
||||
debounceMs: number = DEFAULT_DEBOUNCE_MS,
|
||||
): FileWatcher {
|
||||
const watchers: FSWatcher[] = [];
|
||||
const debounceTimers = new Map<string, ReturnType<typeof setTimeout>>();
|
||||
|
||||
function debounced(key: string, fn: () => void): void {
|
||||
const existing = debounceTimers.get(key);
|
||||
if (existing !== undefined) clearTimeout(existing);
|
||||
debounceTimers.set(
|
||||
key,
|
||||
setTimeout(() => {
|
||||
debounceTimers.delete(key);
|
||||
fn();
|
||||
}, debounceMs),
|
||||
);
|
||||
}
|
||||
|
||||
function handleFsEvent(_eventType: string, filename: string | null): void {
|
||||
if (filename === null) return;
|
||||
|
||||
const normalized = filename.split(sep).join("/");
|
||||
|
||||
if (normalized === "nerve.yaml") {
|
||||
debounced("config", () => {
|
||||
handler({ kind: "config", filePath: join(nerveRoot, "nerve.yaml") });
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
if (normalized.startsWith("senses/") && normalized.endsWith(".ts")) {
|
||||
const rel = relative("senses", normalized);
|
||||
const senseName = rel.split("/")[0];
|
||||
if (senseName) {
|
||||
debounced(`sense:${senseName}`, () => {
|
||||
handler({
|
||||
kind: "sense",
|
||||
senseName,
|
||||
filePath: join(nerveRoot, filename),
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
const w = watch(nerveRoot, { recursive: true }, (eventType, filename) => {
|
||||
handleFsEvent(eventType, filename);
|
||||
});
|
||||
watchers.push(w);
|
||||
} catch (e) {
|
||||
const msg = e instanceof Error ? e.message : String(e);
|
||||
process.stderr.write(`[file-watcher] Failed to watch "${nerveRoot}": ${msg}\n`);
|
||||
}
|
||||
|
||||
function close(): void {
|
||||
for (const timer of debounceTimers.values()) {
|
||||
clearTimeout(timer);
|
||||
}
|
||||
debounceTimers.clear();
|
||||
for (const w of watchers) {
|
||||
w.close();
|
||||
}
|
||||
watchers.length = 0;
|
||||
}
|
||||
|
||||
return { close };
|
||||
}
|
||||
@@ -1,6 +1,8 @@
|
||||
export type {
|
||||
ComputeMessage,
|
||||
ShutdownMessage,
|
||||
HealthRequestMessage,
|
||||
HealthResponseMessage,
|
||||
ParentToWorkerMessage,
|
||||
SignalMessage,
|
||||
ErrorMessage,
|
||||
@@ -21,4 +23,10 @@ export {
|
||||
} from "./sense-runtime.js";
|
||||
|
||||
export { createKernel } from "./kernel.js";
|
||||
export type { Kernel, KernelOptions } from "./kernel.js";
|
||||
export type { Kernel, KernelOptions, KernelHealth } from "./kernel.js";
|
||||
|
||||
export { createFileWatcher } from "./file-watcher.js";
|
||||
export type { FileWatcher, FileChange, FileChangeHandler } from "./file-watcher.js";
|
||||
|
||||
export { createLogStore } from "./log-store.js";
|
||||
export type { LogStore, LogEntry, LogQuery } from "./log-store.js";
|
||||
|
||||
+62
-26
@@ -17,8 +17,13 @@ export type ShutdownMessage = {
|
||||
type: "shutdown";
|
||||
};
|
||||
|
||||
/** Parent → Worker: request health info from worker */
|
||||
export type HealthRequestMessage = {
|
||||
type: "health-request";
|
||||
};
|
||||
|
||||
/** Union of all messages the parent sends to a worker */
|
||||
export type ParentToWorkerMessage = ComputeMessage | ShutdownMessage;
|
||||
export type ParentToWorkerMessage = ComputeMessage | ShutdownMessage | HealthRequestMessage;
|
||||
|
||||
/** Worker → Parent: compute produced a signal */
|
||||
export type SignalMessage = {
|
||||
@@ -39,8 +44,21 @@ export type ReadyMessage = {
|
||||
type: "ready";
|
||||
};
|
||||
|
||||
/** Worker → Parent: health info response */
|
||||
export type HealthResponseMessage = {
|
||||
type: "health-response";
|
||||
senses: string[];
|
||||
inFlightCount: number;
|
||||
};
|
||||
|
||||
/** Union of all messages a worker sends to the parent */
|
||||
export type WorkerToParentMessage = SignalMessage | ErrorMessage | ReadyMessage;
|
||||
export type WorkerToParentMessage =
|
||||
| SignalMessage
|
||||
| ErrorMessage
|
||||
| ReadyMessage
|
||||
| HealthResponseMessage;
|
||||
|
||||
const PARENT_MSG_TYPES = new Set(["compute", "shutdown", "health-request"]);
|
||||
|
||||
/** Validate and parse an unknown IPC message received from the parent process. */
|
||||
export function parseParentMessage(raw: unknown): Result<ParentToWorkerMessage> {
|
||||
@@ -51,13 +69,47 @@ export function parseParentMessage(raw: unknown): Result<ParentToWorkerMessage>
|
||||
if (typeof obj.type !== "string") {
|
||||
return err(new Error("IPC message missing string 'type' field"));
|
||||
}
|
||||
const type = obj.type;
|
||||
if (type !== "compute" && type !== "shutdown") {
|
||||
return err(new Error(`Unknown IPC message type: "${type}"`));
|
||||
if (!PARENT_MSG_TYPES.has(obj.type)) {
|
||||
return err(new Error(`Unknown IPC message type: "${obj.type}"`));
|
||||
}
|
||||
return ok(raw as ParentToWorkerMessage);
|
||||
}
|
||||
|
||||
function parseSignalMsg(obj: Record<string, unknown>, raw: unknown): Result<WorkerToParentMessage> {
|
||||
if (typeof obj.sense !== "string") {
|
||||
return err(new Error("Worker 'signal' message missing string 'sense' field"));
|
||||
}
|
||||
if (!("payload" in obj)) {
|
||||
return err(new Error("Worker 'signal' message missing 'payload' field"));
|
||||
}
|
||||
return ok(raw as SignalMessage);
|
||||
}
|
||||
|
||||
function parseErrorMsg(obj: Record<string, unknown>, raw: unknown): Result<WorkerToParentMessage> {
|
||||
if (typeof obj.sense !== "string") {
|
||||
return err(new Error("Worker 'error' message missing string 'sense' field"));
|
||||
}
|
||||
if (typeof obj.error !== "string") {
|
||||
return err(new Error("Worker 'error' message missing string 'error' field"));
|
||||
}
|
||||
return ok(raw as ErrorMessage);
|
||||
}
|
||||
|
||||
function parseHealthResponseMsg(
|
||||
obj: Record<string, unknown>,
|
||||
raw: unknown,
|
||||
): Result<WorkerToParentMessage> {
|
||||
if (!Array.isArray(obj.senses)) {
|
||||
return err(new Error("Worker 'health-response' message missing 'senses' array"));
|
||||
}
|
||||
if (typeof obj.inFlightCount !== "number") {
|
||||
return err(new Error("Worker 'health-response' message missing 'inFlightCount' number"));
|
||||
}
|
||||
return ok(raw as HealthResponseMessage);
|
||||
}
|
||||
|
||||
const WORKER_MSG_TYPES = new Set(["signal", "error", "ready", "health-response"]);
|
||||
|
||||
/** Validate and parse an unknown IPC message received from a worker process. */
|
||||
export function parseWorkerMessage(raw: unknown): Result<WorkerToParentMessage> {
|
||||
if (raw === null || typeof raw !== "object") {
|
||||
@@ -67,27 +119,11 @@ export function parseWorkerMessage(raw: unknown): Result<WorkerToParentMessage>
|
||||
if (typeof obj.type !== "string") {
|
||||
return err(new Error("Worker IPC message missing string 'type' field"));
|
||||
}
|
||||
const type = obj.type;
|
||||
if (type !== "signal" && type !== "error" && type !== "ready") {
|
||||
return err(new Error(`Unknown worker IPC message type: "${type}"`));
|
||||
}
|
||||
if (type === "signal") {
|
||||
if (typeof obj.sense !== "string") {
|
||||
return err(new Error("Worker 'signal' message missing string 'sense' field"));
|
||||
}
|
||||
if (!("payload" in obj)) {
|
||||
return err(new Error("Worker 'signal' message missing 'payload' field"));
|
||||
}
|
||||
return ok(raw as SignalMessage);
|
||||
}
|
||||
if (type === "error") {
|
||||
if (typeof obj.sense !== "string") {
|
||||
return err(new Error("Worker 'error' message missing string 'sense' field"));
|
||||
}
|
||||
if (typeof obj.error !== "string") {
|
||||
return err(new Error("Worker 'error' message missing string 'error' field"));
|
||||
}
|
||||
return ok(raw as ErrorMessage);
|
||||
if (!WORKER_MSG_TYPES.has(obj.type)) {
|
||||
return err(new Error(`Unknown worker IPC message type: "${obj.type}"`));
|
||||
}
|
||||
if (obj.type === "signal") return parseSignalMsg(obj, raw);
|
||||
if (obj.type === "error") return parseErrorMsg(obj, raw);
|
||||
if (obj.type === "health-response") return parseHealthResponseMsg(obj, raw);
|
||||
return ok({ type: "ready" });
|
||||
}
|
||||
|
||||
+247
-15
@@ -8,33 +8,58 @@
|
||||
* - Route ErrorMessage from workers → stderr log
|
||||
* - Drive compute triggers via ReflexScheduler
|
||||
* - Graceful shutdown: stop scheduler, send shutdown to all workers
|
||||
* - Hot reload: restartGroup, reloadConfig, file watcher integration
|
||||
* - Health reporting: getHealth
|
||||
*/
|
||||
|
||||
import { fork } from "node:child_process";
|
||||
import type { ChildProcess } from "node:child_process";
|
||||
import { readFileSync } from "node:fs";
|
||||
import { dirname, join } from "node:path";
|
||||
import { fileURLToPath } from "node:url";
|
||||
|
||||
import type { NerveConfig, Signal } from "@uncaged/nerve-core";
|
||||
import { parseNerveConfig } from "@uncaged/nerve-core";
|
||||
|
||||
import { createFileWatcher } from "./file-watcher.js";
|
||||
import type { FileWatcher } from "./file-watcher.js";
|
||||
import type { ComputeMessage, ShutdownMessage } from "./ipc.js";
|
||||
import { parseWorkerMessage } from "./ipc.js";
|
||||
import { createLogStore } from "./log-store.js";
|
||||
import type { LogStore } from "./log-store.js";
|
||||
import { createReflexScheduler } from "./reflex-scheduler.js";
|
||||
import type { ReflexScheduler } from "./reflex-scheduler.js";
|
||||
import { createSignalBus } from "./signal-bus.js";
|
||||
import type { SignalBus } from "./signal-bus.js";
|
||||
|
||||
export type KernelHealth = {
|
||||
uptime: number;
|
||||
activeSenses: number;
|
||||
activeGroups: number;
|
||||
pendingComputes: number;
|
||||
memoryUsage: NodeJS.MemoryUsage;
|
||||
};
|
||||
|
||||
export type Kernel = {
|
||||
stop: () => Promise<void>;
|
||||
groups: Set<string>;
|
||||
senseCount: number;
|
||||
bus: SignalBus;
|
||||
logStore: LogStore;
|
||||
/** Resolves when all workers have sent their initial "ready" message. */
|
||||
ready: Promise<void>;
|
||||
/** Returns the PID of the worker process for a given group, or null if not found. */
|
||||
getWorkerPid: (group: string) => number | null;
|
||||
/** Sends a compute message to the worker responsible for the given sense. */
|
||||
triggerCompute: (senseName: string) => void;
|
||||
/** Gracefully restart a group worker (wait for exit, then respawn). */
|
||||
restartGroup: (group: string) => Promise<void>;
|
||||
/** Reload config from a new NerveConfig, incrementally updating scheduler and workers.
|
||||
* Note: any pending/throttled computes in the old scheduler are silently dropped on reload.
|
||||
* In-flight state is not preserved across reloadConfig. */
|
||||
reloadConfig: (newConfig: NerveConfig) => void;
|
||||
/** Return daemon health info. */
|
||||
getHealth: () => KernelHealth;
|
||||
};
|
||||
|
||||
type WorkerEntry = {
|
||||
@@ -66,7 +91,6 @@ function sendCompute(worker: ChildProcess, senseName: string): void {
|
||||
}
|
||||
|
||||
function sendShutdown(worker: ChildProcess): void {
|
||||
// worker.connected is false when the IPC channel has been closed (e.g. worker crashed)
|
||||
if (worker.connected === false) return;
|
||||
const msg: ShutdownMessage = { type: "shutdown" };
|
||||
try {
|
||||
@@ -84,21 +108,35 @@ function groupForSense(config: NerveConfig, senseName: string): string | null {
|
||||
|
||||
export type KernelOptions = {
|
||||
workerScript: string;
|
||||
enableFileWatcher?: boolean;
|
||||
/** Override the LogStore instance (useful for testing). */
|
||||
logStore?: LogStore;
|
||||
};
|
||||
|
||||
function defaultKernelOptions(): KernelOptions {
|
||||
return { workerScript: resolveWorkerScript() };
|
||||
return { workerScript: resolveWorkerScript(), enableFileWatcher: false };
|
||||
}
|
||||
|
||||
export function createKernel(
|
||||
config: NerveConfig,
|
||||
initialConfig: NerveConfig,
|
||||
nerveRoot: string,
|
||||
options: KernelOptions = defaultKernelOptions(),
|
||||
): Kernel {
|
||||
const bus: SignalBus = createSignalBus();
|
||||
const workerScript = options.workerScript;
|
||||
const startTime = Date.now();
|
||||
const logStore: LogStore = options.logStore ?? createLogStore(join(nerveRoot, "data", "logs.db"));
|
||||
|
||||
logStore.append({
|
||||
source: "system",
|
||||
type: "start",
|
||||
refId: null,
|
||||
payload: null,
|
||||
ts: startTime,
|
||||
});
|
||||
|
||||
let config = initialConfig;
|
||||
|
||||
// Signal ID counter is instance-scoped (fix #2)
|
||||
let _signalIdCounter = 0;
|
||||
function nextSignalId(): number {
|
||||
_signalIdCounter += 1;
|
||||
@@ -112,10 +150,9 @@ export function createKernel(
|
||||
|
||||
const workers = new Map<string, WorkerEntry>();
|
||||
let stopped = false;
|
||||
// eslint-disable-next-line prefer-const
|
||||
let scheduler: ReflexScheduler = null as unknown as ReflexScheduler;
|
||||
|
||||
let readyResolve: () => void;
|
||||
let readyResolve: (() => void) | undefined;
|
||||
const ready = new Promise<void>((resolve) => {
|
||||
readyResolve = resolve;
|
||||
});
|
||||
@@ -138,13 +175,20 @@ export function createKernel(
|
||||
if (msg.type === "ready") {
|
||||
pendingReadyCount -= 1;
|
||||
if (pendingReadyCount <= 0) {
|
||||
readyResolve();
|
||||
readyResolve?.();
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (msg.type === "error") {
|
||||
process.stderr.write(`[kernel] sense "${msg.sense}" error: ${msg.error}\n`);
|
||||
logStore.append({
|
||||
source: "system",
|
||||
type: "error",
|
||||
refId: msg.sense,
|
||||
payload: JSON.stringify({ error: msg.error }),
|
||||
ts: Date.now(),
|
||||
});
|
||||
scheduler.onComputeComplete(msg.sense);
|
||||
return;
|
||||
}
|
||||
@@ -156,24 +200,44 @@ export function createKernel(
|
||||
payload: msg.payload,
|
||||
ts: Date.now(),
|
||||
};
|
||||
logStore.append({
|
||||
source: "reflex",
|
||||
type: "run_complete",
|
||||
refId: msg.sense,
|
||||
payload: JSON.stringify(msg.payload),
|
||||
ts: signal.ts,
|
||||
});
|
||||
bus.emit(signal);
|
||||
scheduler.onComputeComplete(msg.sense);
|
||||
}
|
||||
|
||||
// health-response is handled externally by the caller; no action needed here
|
||||
}
|
||||
|
||||
function startWorker(group: string): void {
|
||||
function startWorker(group: string): Promise<void> {
|
||||
const child = spawnWorker(nerveRoot, group, workerScript);
|
||||
|
||||
child.on("message", handleWorkerMessage);
|
||||
let workerReadyResolve: (() => void) | undefined;
|
||||
const workerReady = new Promise<void>((resolve) => {
|
||||
workerReadyResolve = resolve;
|
||||
});
|
||||
|
||||
child.on("message", (raw: unknown) => {
|
||||
const result = parseWorkerMessage(raw);
|
||||
if (result.ok && result.value.type === "ready") {
|
||||
workerReadyResolve?.();
|
||||
}
|
||||
handleWorkerMessage(raw);
|
||||
});
|
||||
|
||||
child.on("exit", (code) => {
|
||||
process.stderr.write(
|
||||
`[kernel] worker for group "${group}" exited with code ${code ?? "null"}\n`,
|
||||
);
|
||||
// Respawn on unexpected exit (fix #4)
|
||||
// Resolve ready in case the worker exits before sending ready (prevents hangs)
|
||||
workerReadyResolve?.();
|
||||
if (!stopped && code !== 0) {
|
||||
process.stderr.write(`[kernel] respawning worker for group "${group}" in 1s\n`);
|
||||
// Clear any stuck in-flight state for all senses in this group
|
||||
for (const senseName of sensesForGroup(group)) {
|
||||
scheduler.onComputeComplete(senseName);
|
||||
}
|
||||
@@ -186,6 +250,7 @@ export function createKernel(
|
||||
});
|
||||
|
||||
workers.set(group, { group, process: child });
|
||||
return workerReady;
|
||||
}
|
||||
|
||||
function triggerFn(senseName: string): void {
|
||||
@@ -202,17 +267,16 @@ export function createKernel(
|
||||
sendCompute(entry.process, senseName);
|
||||
}
|
||||
|
||||
scheduler = createReflexScheduler(config, bus, triggerFn);
|
||||
scheduler = createReflexScheduler(config, bus, triggerFn, { logStore });
|
||||
|
||||
if (groups.size === 0) {
|
||||
readyResolve();
|
||||
readyResolve?.();
|
||||
}
|
||||
|
||||
for (const group of groups) {
|
||||
startWorker(group);
|
||||
}
|
||||
|
||||
// Wait for a worker process to exit, with a timeout + SIGKILL fallback (fix #5)
|
||||
function waitForExit(child: ChildProcess, timeoutMs: number): Promise<void> {
|
||||
return new Promise((resolve) => {
|
||||
const timer = setTimeout(() => {
|
||||
@@ -226,8 +290,156 @@ export function createKernel(
|
||||
});
|
||||
}
|
||||
|
||||
// --- restartGroup: gracefully stop worker, then respawn and await ready ---
|
||||
async function restartGroup(group: string): Promise<void> {
|
||||
const entry = workers.get(group);
|
||||
if (entry === undefined) return;
|
||||
|
||||
for (const senseName of sensesForGroup(group)) {
|
||||
scheduler.onComputeComplete(senseName);
|
||||
}
|
||||
|
||||
sendShutdown(entry.process);
|
||||
await waitForExit(entry.process, 5000);
|
||||
|
||||
if (!stopped) {
|
||||
await startWorker(group);
|
||||
}
|
||||
}
|
||||
|
||||
function collectGroups(cfg: NerveConfig): Set<string> {
|
||||
const result = new Set<string>();
|
||||
for (const sc of Object.values(cfg.senses)) {
|
||||
result.add(sc.group);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
function sensesForGroupInConfig(cfg: NerveConfig, group: string): Set<string> {
|
||||
const result = new Set<string>();
|
||||
for (const [name, sc] of Object.entries(cfg.senses)) {
|
||||
if (sc.group === group) result.add(name);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
function removeStaleGroups(oldGroups: Set<string>, newGroups: Set<string>): void {
|
||||
for (const g of oldGroups) {
|
||||
if (newGroups.has(g)) continue;
|
||||
const entry = workers.get(g);
|
||||
if (entry !== undefined) {
|
||||
sendShutdown(entry.process);
|
||||
workers.delete(g);
|
||||
}
|
||||
groups.delete(g);
|
||||
}
|
||||
}
|
||||
|
||||
function addNewGroups(oldGroups: Set<string>, newGroups: Set<string>): void {
|
||||
for (const g of newGroups) {
|
||||
if (oldGroups.has(g)) continue;
|
||||
groups.add(g);
|
||||
if (!stopped) startWorker(g);
|
||||
}
|
||||
}
|
||||
|
||||
function reloadConfig(newConfig: NerveConfig): void {
|
||||
const oldGroups = collectGroups(config);
|
||||
const oldConfig = config;
|
||||
config = newConfig;
|
||||
// Note: pending/throttled computes in the old scheduler are silently dropped here.
|
||||
// In-flight state is not preserved across reloadConfig.
|
||||
scheduler.stop();
|
||||
scheduler = createReflexScheduler(config, bus, triggerFn, { logStore });
|
||||
const newGroups = collectGroups(newConfig);
|
||||
removeStaleGroups(oldGroups, newGroups);
|
||||
addNewGroups(oldGroups, newGroups);
|
||||
|
||||
// Restart existing groups that gained new senses — the running worker process
|
||||
// was spawned with the old config and will report "Unknown sense" for any newly
|
||||
// added sense until it is restarted.
|
||||
for (const g of newGroups) {
|
||||
if (!oldGroups.has(g)) continue; // already handled by addNewGroups
|
||||
const oldSenses = sensesForGroupInConfig(oldConfig, g);
|
||||
const newSenses = sensesForGroupInConfig(newConfig, g);
|
||||
const gained = [...newSenses].some((s) => !oldSenses.has(s));
|
||||
if (gained) {
|
||||
restartGroup(g).catch((e) => {
|
||||
const msg = e instanceof Error ? e.message : String(e);
|
||||
process.stderr.write(`[kernel] reloadConfig restartGroup error for "${g}": ${msg}\n`);
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function getHealth(): KernelHealth {
|
||||
return {
|
||||
uptime: Date.now() - startTime,
|
||||
activeSenses: Object.keys(config.senses).length,
|
||||
activeGroups: workers.size,
|
||||
pendingComputes: 0,
|
||||
memoryUsage: process.memoryUsage(),
|
||||
};
|
||||
}
|
||||
|
||||
function handleSenseFileChange(senseName: string): void {
|
||||
const sc = config.senses[senseName];
|
||||
if (sc === undefined) return;
|
||||
process.stderr.write(
|
||||
`[kernel] sense file changed: "${senseName}", restarting group "${sc.group}"\n`,
|
||||
);
|
||||
logStore.append({
|
||||
source: "system",
|
||||
type: "sense_reload",
|
||||
refId: senseName,
|
||||
payload: null,
|
||||
ts: Date.now(),
|
||||
});
|
||||
restartGroup(sc.group).catch((e) => {
|
||||
const msg = e instanceof Error ? e.message : String(e);
|
||||
process.stderr.write(`[kernel] restartGroup error: ${msg}\n`);
|
||||
});
|
||||
}
|
||||
|
||||
function handleConfigFileChange(): void {
|
||||
process.stderr.write("[kernel] nerve.yaml changed, reloading config\n");
|
||||
logStore.append({
|
||||
source: "system",
|
||||
type: "config_reload",
|
||||
refId: null,
|
||||
payload: null,
|
||||
ts: Date.now(),
|
||||
});
|
||||
try {
|
||||
const raw = readFileSync(join(nerveRoot, "nerve.yaml"), "utf8");
|
||||
const parseResult = parseNerveConfig(raw);
|
||||
if (!parseResult.ok) {
|
||||
process.stderr.write(
|
||||
`[kernel] config parse error, keeping current config: ${parseResult.error.message}\n`,
|
||||
);
|
||||
return;
|
||||
}
|
||||
reloadConfig(parseResult.value);
|
||||
} catch (e) {
|
||||
const msg = e instanceof Error ? e.message : String(e);
|
||||
process.stderr.write(`[kernel] failed to read nerve.yaml, keeping current config: ${msg}\n`);
|
||||
}
|
||||
}
|
||||
|
||||
let fileWatcher: FileWatcher | null = null;
|
||||
if (options.enableFileWatcher) {
|
||||
fileWatcher = createFileWatcher(nerveRoot, (change) => {
|
||||
if (change.kind === "sense") handleSenseFileChange(change.senseName);
|
||||
if (change.kind === "config") handleConfigFileChange();
|
||||
});
|
||||
}
|
||||
|
||||
async function stop(): Promise<void> {
|
||||
stopped = true;
|
||||
if (fileWatcher !== null) {
|
||||
fileWatcher.close();
|
||||
fileWatcher = null;
|
||||
}
|
||||
scheduler.stop();
|
||||
const exitPromises: Promise<void>[] = [];
|
||||
for (const entry of workers.values()) {
|
||||
@@ -235,6 +447,14 @@ export function createKernel(
|
||||
exitPromises.push(waitForExit(entry.process, 5000));
|
||||
}
|
||||
await Promise.all(exitPromises);
|
||||
logStore.append({
|
||||
source: "system",
|
||||
type: "stop",
|
||||
refId: null,
|
||||
payload: null,
|
||||
ts: Date.now(),
|
||||
});
|
||||
logStore.close();
|
||||
}
|
||||
|
||||
function getWorkerPid(group: string): number | null {
|
||||
@@ -243,5 +463,17 @@ export function createKernel(
|
||||
|
||||
const senseCount = Object.keys(config.senses).length;
|
||||
|
||||
return { stop, groups, senseCount, bus, ready, getWorkerPid, triggerCompute: triggerFn };
|
||||
return {
|
||||
stop,
|
||||
groups,
|
||||
senseCount,
|
||||
bus,
|
||||
logStore,
|
||||
ready,
|
||||
getWorkerPid,
|
||||
triggerCompute: triggerFn,
|
||||
restartGroup,
|
||||
reloadConfig,
|
||||
getHealth,
|
||||
};
|
||||
}
|
||||
|
||||
@@ -0,0 +1,150 @@
|
||||
/**
|
||||
* Log Store — append-only structured log storage backed by SQLite.
|
||||
*
|
||||
* Stores system, reflex, and workflow log entries in a single table.
|
||||
* Logs are data assets for audit/analysis — they MUST NOT trigger reflexes.
|
||||
*
|
||||
* Also provides a `meta` key-value table for bookkeeping (e.g. archive watermarks).
|
||||
*/
|
||||
|
||||
import { mkdirSync } from "node:fs";
|
||||
import { dirname } from "node:path";
|
||||
import Database from "better-sqlite3";
|
||||
import type BetterSqlite3 from "better-sqlite3";
|
||||
|
||||
export type LogEntry = {
|
||||
id?: number;
|
||||
source: string;
|
||||
type: string;
|
||||
refId: string | null;
|
||||
payload: string | null;
|
||||
ts: number;
|
||||
};
|
||||
|
||||
export type LogQuery = {
|
||||
source?: string;
|
||||
type?: string;
|
||||
refId?: string;
|
||||
since?: number;
|
||||
until?: number;
|
||||
limit?: number;
|
||||
};
|
||||
|
||||
export type LogStore = {
|
||||
append: (entry: Omit<LogEntry, "id">) => LogEntry;
|
||||
query: (filter?: LogQuery) => LogEntry[];
|
||||
getMeta: (key: string) => string | null;
|
||||
setMeta: (key: string, value: string) => void;
|
||||
close: () => void;
|
||||
};
|
||||
|
||||
const SCHEMA_SQL = `
|
||||
CREATE TABLE IF NOT EXISTS logs (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
source TEXT NOT NULL,
|
||||
type TEXT NOT NULL,
|
||||
ref_id TEXT,
|
||||
payload TEXT,
|
||||
ts INTEGER NOT NULL
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_logs_source_type ON logs(source, type);
|
||||
CREATE INDEX IF NOT EXISTS idx_logs_ts ON logs(ts);
|
||||
CREATE INDEX IF NOT EXISTS idx_logs_ref_id ON logs(ref_id);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS meta (
|
||||
key TEXT PRIMARY KEY,
|
||||
value TEXT NOT NULL
|
||||
);
|
||||
`;
|
||||
|
||||
export function createLogStore(dbPath: string): LogStore {
|
||||
mkdirSync(dirname(dbPath), { recursive: true });
|
||||
|
||||
const sqlite: BetterSqlite3.Database = new Database(dbPath);
|
||||
sqlite.pragma("journal_mode = WAL");
|
||||
sqlite.exec(SCHEMA_SQL);
|
||||
|
||||
const insertStmt = sqlite.prepare(
|
||||
"INSERT INTO logs (source, type, ref_id, payload, ts) VALUES (@source, @type, @refId, @payload, @ts)",
|
||||
);
|
||||
|
||||
const getMetaStmt = sqlite.prepare("SELECT value FROM meta WHERE key = ?");
|
||||
const setMetaStmt = sqlite.prepare(
|
||||
"INSERT INTO meta (key, value) VALUES (@key, @value) ON CONFLICT(key) DO UPDATE SET value = @value",
|
||||
);
|
||||
|
||||
function append(entry: Omit<LogEntry, "id">): LogEntry {
|
||||
const info = insertStmt.run({
|
||||
source: entry.source,
|
||||
type: entry.type,
|
||||
refId: entry.refId,
|
||||
payload: entry.payload,
|
||||
ts: entry.ts,
|
||||
});
|
||||
return { ...entry, id: Number(info.lastInsertRowid) };
|
||||
}
|
||||
|
||||
function query(filter: LogQuery = {}): LogEntry[] {
|
||||
const conditions: string[] = [];
|
||||
const params: Record<string, unknown> = {};
|
||||
|
||||
if (filter.source !== undefined) {
|
||||
conditions.push("source = @source");
|
||||
params.source = filter.source;
|
||||
}
|
||||
if (filter.type !== undefined) {
|
||||
conditions.push("type = @type");
|
||||
params.type = filter.type;
|
||||
}
|
||||
if (filter.refId !== undefined) {
|
||||
conditions.push("ref_id = @refId");
|
||||
params.refId = filter.refId;
|
||||
}
|
||||
if (filter.since !== undefined) {
|
||||
conditions.push("ts >= @since");
|
||||
params.since = filter.since;
|
||||
}
|
||||
if (filter.until !== undefined) {
|
||||
conditions.push("ts <= @until");
|
||||
params.until = filter.until;
|
||||
}
|
||||
|
||||
const where = conditions.length > 0 ? `WHERE ${conditions.join(" AND ")}` : "";
|
||||
const limit = filter.limit !== undefined ? `LIMIT ${filter.limit}` : "";
|
||||
const sql = `SELECT id, source, type, ref_id, payload, ts FROM logs ${where} ORDER BY id ASC ${limit}`;
|
||||
|
||||
const rows = sqlite.prepare(sql).all(params) as Array<{
|
||||
id: number;
|
||||
source: string;
|
||||
type: string;
|
||||
ref_id: string | null;
|
||||
payload: string | null;
|
||||
ts: number;
|
||||
}>;
|
||||
|
||||
return rows.map((r) => ({
|
||||
id: r.id,
|
||||
source: r.source,
|
||||
type: r.type,
|
||||
refId: r.ref_id,
|
||||
payload: r.payload,
|
||||
ts: r.ts,
|
||||
}));
|
||||
}
|
||||
|
||||
function getMeta(key: string): string | null {
|
||||
const row = getMetaStmt.get(key) as { value: string } | undefined;
|
||||
return row?.value ?? null;
|
||||
}
|
||||
|
||||
function setMeta(key: string, value: string): void {
|
||||
setMetaStmt.run({ key, value });
|
||||
}
|
||||
|
||||
function close(): void {
|
||||
sqlite.close();
|
||||
}
|
||||
|
||||
return { append, query, getMeta, setMeta, close };
|
||||
}
|
||||
@@ -10,6 +10,7 @@
|
||||
*/
|
||||
|
||||
import type { NerveConfig } from "@uncaged/nerve-core";
|
||||
import type { LogStore } from "./log-store.js";
|
||||
import type { SignalBus, Unsubscribe } from "./signal-bus.js";
|
||||
|
||||
/** Sends a compute message to the worker responsible for the given sense. */
|
||||
@@ -34,19 +35,26 @@ function makeSenseState(): SenseState {
|
||||
return { lastComputeAt: 0, inFlight: false, pending: false, deferredTimer: null };
|
||||
}
|
||||
|
||||
export type ReflexSchedulerOptions = {
|
||||
logStore?: LogStore;
|
||||
};
|
||||
|
||||
/**
|
||||
* Create and start a reflex scheduler.
|
||||
*
|
||||
* @param config Full NerveConfig (reads senses for throttle/timeout, reflexes for schedule).
|
||||
* @param bus SignalBus to subscribe for event-driven reflexes.
|
||||
* @param triggerFn Called with the sense name when a compute should be dispatched.
|
||||
* @param opts Optional: logStore for structured logging.
|
||||
* @returns ReflexScheduler with stop() and onComputeComplete() methods.
|
||||
*/
|
||||
export function createReflexScheduler(
|
||||
config: NerveConfig,
|
||||
bus: SignalBus,
|
||||
triggerFn: TriggerFn,
|
||||
opts?: ReflexSchedulerOptions,
|
||||
): ReflexScheduler {
|
||||
const logStore = opts?.logStore;
|
||||
const intervals: ReturnType<typeof setInterval>[] = [];
|
||||
const unsubscribers: Unsubscribe[] = [];
|
||||
const states = new Map<string, SenseState>();
|
||||
@@ -65,6 +73,13 @@ export function createReflexScheduler(
|
||||
state.inFlight = true;
|
||||
state.pending = false;
|
||||
state.lastComputeAt = Date.now();
|
||||
logStore?.append({
|
||||
source: "reflex",
|
||||
type: "run_start",
|
||||
refId: senseName,
|
||||
payload: null,
|
||||
ts: Date.now(),
|
||||
});
|
||||
triggerFn(senseName);
|
||||
}
|
||||
|
||||
|
||||
@@ -79,18 +79,12 @@ async function initSense(
|
||||
|
||||
const dbResult = openSenseDb(dbPath, migrationsDir);
|
||||
if (!dbResult.ok) {
|
||||
process.stderr.write(
|
||||
`[sense-worker] Failed to init DB for "${senseName}": ${dbResult.error.message}\n`,
|
||||
);
|
||||
process.exit(1);
|
||||
throw new Error(`Failed to init DB for "${senseName}": ${dbResult.error.message}`);
|
||||
}
|
||||
|
||||
const computeResult = await loadComputeFn(senseIndexPath);
|
||||
if (!computeResult.ok) {
|
||||
process.stderr.write(
|
||||
`[sense-worker] Failed to load compute for "${senseName}": ${computeResult.error.message}\n`,
|
||||
);
|
||||
process.exit(1);
|
||||
throw new Error(`Failed to load compute for "${senseName}": ${computeResult.error.message}`);
|
||||
}
|
||||
|
||||
const { db } = dbResult.value;
|
||||
@@ -129,12 +123,75 @@ function buildPeers(
|
||||
return Object.fromEntries(entries);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Grace period: hard kill after soft timeout
|
||||
//
|
||||
// Trade-off: grace period kills the entire worker process (process.exit), which
|
||||
// terminates all senses in the group — not just the one that timed out. This is
|
||||
// intentional: a hung compute may hold locks or corrupt shared state. Restarting
|
||||
// the full worker ensures a clean slate, but other senses in the same group will
|
||||
// lose any in-flight work until the kernel respawns the process.
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
const gracePeriodTimers = new Map<string, ReturnType<typeof setTimeout>>();
|
||||
|
||||
function scheduleGracePeriodKill(senseName: string, gracePeriodMs: number): void {
|
||||
if (gracePeriodTimers.has(senseName)) return;
|
||||
process.stderr.write(`[sense-worker] grace period for "${senseName}" (${gracePeriodMs}ms)\n`);
|
||||
const timer = setTimeout(() => {
|
||||
process.stderr.write(`[sense-worker] grace period expired for "${senseName}", hard kill\n`);
|
||||
process.exit(1);
|
||||
}, gracePeriodMs);
|
||||
gracePeriodTimers.set(senseName, timer);
|
||||
}
|
||||
|
||||
function clearGracePeriodTimer(senseName: string): void {
|
||||
const timer = gracePeriodTimers.get(senseName);
|
||||
if (timer === undefined) return;
|
||||
clearTimeout(timer);
|
||||
gracePeriodTimers.delete(senseName);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Compute execution with error isolation
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
async function runCompute(
|
||||
senseName: string,
|
||||
runtime: SenseRuntime,
|
||||
peers: PeerMap,
|
||||
timeoutMs: number,
|
||||
gracePeriodMs: number | null,
|
||||
): Promise<void> {
|
||||
try {
|
||||
const result = await executeCompute(runtime, peers, timeoutMs);
|
||||
if (!result.ok) {
|
||||
sendError(senseName, result.error.message);
|
||||
if (gracePeriodMs !== null && result.error.message.includes("timed out")) {
|
||||
scheduleGracePeriodKill(senseName, gracePeriodMs);
|
||||
}
|
||||
return;
|
||||
}
|
||||
clearGracePeriodTimer(senseName);
|
||||
if (result.value !== null) {
|
||||
sendSignal(senseName, result.value);
|
||||
}
|
||||
} catch (e: unknown) {
|
||||
const errMsg = e instanceof Error ? e.message : String(e);
|
||||
sendError(senseName, errMsg);
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// IPC message dispatch
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
function handleMessage(
|
||||
raw: unknown,
|
||||
runtimes: Map<string, SenseRuntime>,
|
||||
peers: PeerMap,
|
||||
group: string,
|
||||
timeoutMs: number,
|
||||
senseConfigs: Map<string, { timeout: number | null; gracePeriod: number | null }>,
|
||||
inFlight: Map<string, Promise<void>>,
|
||||
): void {
|
||||
const parseResult = parseParentMessage(raw);
|
||||
@@ -149,33 +206,36 @@ function handleMessage(
|
||||
process.exit(0);
|
||||
}
|
||||
|
||||
if (msg.type === "compute") {
|
||||
const runtime = runtimes.get(msg.sense);
|
||||
if (!runtime) {
|
||||
sendError(msg.sense, `Unknown sense "${msg.sense}" in group "${group}"`);
|
||||
return;
|
||||
}
|
||||
|
||||
// Serialize computes for the same sense
|
||||
const previous = inFlight.get(msg.sense) ?? Promise.resolve();
|
||||
const next = previous.then(async () => {
|
||||
const result = await executeCompute(runtime, peers, timeoutMs);
|
||||
if (!result.ok) {
|
||||
sendError(msg.sense, result.error.message);
|
||||
return;
|
||||
}
|
||||
if (result.value !== null) {
|
||||
sendSignal(msg.sense, result.value);
|
||||
}
|
||||
if (msg.type === "health-request") {
|
||||
send({
|
||||
type: "health-response",
|
||||
senses: [...runtimes.keys()],
|
||||
inFlightCount: inFlight.size,
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
const tracked = next.catch((e: unknown) => {
|
||||
if (msg.type !== "compute") return;
|
||||
|
||||
const runtime = runtimes.get(msg.sense);
|
||||
if (!runtime) {
|
||||
sendError(msg.sense, `Unknown sense "${msg.sense}" in group "${group}"`);
|
||||
return;
|
||||
}
|
||||
|
||||
// Look up timeout/gracePeriod per-sense at compute time (RFC §5.3: these are per-sense)
|
||||
const sc = senseConfigs.get(msg.sense);
|
||||
const timeoutMs = sc?.timeout ?? DEFAULT_TIMEOUT_MS;
|
||||
const gracePeriodMs = sc?.gracePeriod ?? null;
|
||||
|
||||
const previous = inFlight.get(msg.sense) ?? Promise.resolve();
|
||||
const next = previous
|
||||
.then(() => runCompute(msg.sense, runtime, peers, timeoutMs, gracePeriodMs))
|
||||
.catch((e: unknown) => {
|
||||
const errMsg = e instanceof Error ? e.message : String(e);
|
||||
sendError(msg.sense, errMsg);
|
||||
});
|
||||
|
||||
inFlight.set(msg.sense, tracked);
|
||||
}
|
||||
inFlight.set(msg.sense, next);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
@@ -198,29 +258,47 @@ async function bootstrap(nerveRoot: string, group: string): Promise<void> {
|
||||
|
||||
const runtimes = new Map<string, SenseRuntime>();
|
||||
const ownDbs = new Map<string, DrizzleDB>();
|
||||
const failedSenses: string[] = [];
|
||||
|
||||
for (const senseName of groupSenses) {
|
||||
const { db, runtime } = await initSense(nerveRoot, senseName);
|
||||
ownDbs.set(senseName, db);
|
||||
runtimes.set(senseName, runtime);
|
||||
try {
|
||||
const { db, runtime } = await initSense(nerveRoot, senseName);
|
||||
ownDbs.set(senseName, db);
|
||||
runtimes.set(senseName, runtime);
|
||||
} catch (e: unknown) {
|
||||
const eMsg = e instanceof Error ? e.message : String(e);
|
||||
process.stderr.write(
|
||||
`[sense-worker] Failed to load sense "${senseName}", skipping: ${eMsg}\n`,
|
||||
);
|
||||
failedSenses.push(senseName);
|
||||
}
|
||||
}
|
||||
|
||||
// If ALL senses failed, exit with error so kernel respawns
|
||||
if (runtimes.size === 0) {
|
||||
process.stderr.write(`[sense-worker] All senses in group "${group}" failed to load, exiting\n`);
|
||||
process.exit(1);
|
||||
}
|
||||
|
||||
const groupSenseNames = new Set(groupSenses);
|
||||
const peers = buildPeers(nerveRoot, Object.keys(config.senses), ownDbs, groupSenseNames);
|
||||
|
||||
// Read timeout from config (uses first group sense's config, or default)
|
||||
const firstSenseConfig = config.senses[groupSenses[0]];
|
||||
const timeoutMs =
|
||||
typeof (firstSenseConfig as Record<string, unknown>).timeoutMs === "number"
|
||||
? ((firstSenseConfig as Record<string, unknown>).timeoutMs as number)
|
||||
: DEFAULT_TIMEOUT_MS;
|
||||
// Build per-sense timeout/gracePeriod map (RFC §5.3: these are per-sense, not per-group)
|
||||
const senseConfigs = new Map<string, { timeout: number | null; gracePeriod: number | null }>();
|
||||
for (const senseName of groupSenses) {
|
||||
const sc = config.senses[senseName];
|
||||
senseConfigs.set(senseName, {
|
||||
timeout: sc?.timeout ?? null,
|
||||
gracePeriod: sc?.gracePeriod ?? null,
|
||||
});
|
||||
}
|
||||
|
||||
const inFlight = new Map<string, Promise<void>>();
|
||||
|
||||
sendReady();
|
||||
|
||||
process.on("message", (raw: unknown) => {
|
||||
handleMessage(raw, runtimes, peers, group, timeoutMs, inFlight);
|
||||
handleMessage(raw, runtimes, peers, group, senseConfigs, inFlight);
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -2,7 +2,8 @@
|
||||
"extends": "../../tsconfig.json",
|
||||
"compilerOptions": {
|
||||
"outDir": "dist",
|
||||
"rootDir": "src"
|
||||
"rootDir": "src",
|
||||
"composite": false
|
||||
},
|
||||
"include": ["src"]
|
||||
}
|
||||
|
||||
Generated
+30
-2
@@ -26,6 +26,13 @@ importers:
|
||||
'@uncaged/nerve-daemon':
|
||||
specifier: workspace:*
|
||||
version: link:../daemon
|
||||
citty:
|
||||
specifier: ^0.1.6
|
||||
version: 0.1.6
|
||||
devDependencies:
|
||||
'@types/node':
|
||||
specifier: ^22.0.0
|
||||
version: 22.19.17
|
||||
|
||||
packages/core:
|
||||
dependencies:
|
||||
@@ -559,6 +566,9 @@ packages:
|
||||
'@types/estree@1.0.8':
|
||||
resolution: {integrity: sha512-dWHzHa2WqEXI/O1E9OjrocMTKJl2mSrEolh1Iomrv6U+JuNwaHXsXx9bLu5gG7BUWFIN0skIQJQ/L1rIex4X6w==}
|
||||
|
||||
'@types/node@22.19.17':
|
||||
resolution: {integrity: sha512-wGdMcf+vPYM6jikpS/qhg6WiqSV/OhG+jeeHT/KlVqxYfD40iYJf9/AE1uQxVWFvU7MipKRkRv8NSHiCGgPr8Q==}
|
||||
|
||||
'@types/node@25.6.0':
|
||||
resolution: {integrity: sha512-+qIYRKdNYJwY3vRCZMdJbPLJAtGjQBudzZzdzwQYkEPQd+PJGixUL5QfvCLDaULoLv+RhT3LDkwEfKaAkgSmNQ==}
|
||||
|
||||
@@ -639,6 +649,9 @@ packages:
|
||||
chownr@1.1.4:
|
||||
resolution: {integrity: sha512-jJ0bqzaylmJtVnNgzTeSOs8DPavpbYgEr/b0YL8/2GO3xJEhInFmhKMUnEJQjZumK7KXGFhUy89PrsJWlakBVg==}
|
||||
|
||||
citty@0.1.6:
|
||||
resolution: {integrity: sha512-tskPPKEs8D2KPafUypv2gxwJP8h/OaJmC82QQGGDQcHvXX43xF2VDACcJVmZ0EuSxkpO9Kc4MlrA3q0+FG58AQ==}
|
||||
|
||||
commander@4.1.1:
|
||||
resolution: {integrity: sha512-NOKm8xhkzAjzFx8B2v5OAHT+u5pRQc2UCa2Vq9jYL/31o2wi9mxBA7LIFs3sV5VSC49z6pEhfbMULvShKj26WA==}
|
||||
engines: {node: '>= 6'}
|
||||
@@ -1142,6 +1155,9 @@ packages:
|
||||
ufo@1.6.3:
|
||||
resolution: {integrity: sha512-yDJTmhydvl5lJzBmy/hyOAA0d+aqCBuwl818haVdYCRrWV84o7YyeVm4QlVHStqNrrJSTb6jKuFAVqAFsr+K3Q==}
|
||||
|
||||
undici-types@6.21.0:
|
||||
resolution: {integrity: sha512-iwDZqg0QAGrg9Rav5H4n0M64c3mkR59cJ6wQp+7C4nI0gsmExaedaYLNO44eT4AtBBwjbTiGPMlt2Md0T9H9JQ==}
|
||||
|
||||
undici-types@7.19.2:
|
||||
resolution: {integrity: sha512-qYVnV5OEm2AW8cJMCpdV20CDyaN3g0AjDlOGf1OW4iaDEx8MwdtChUp4zu4H0VP3nDRF/8RKWH+IPp9uW0YGZg==}
|
||||
|
||||
@@ -1534,7 +1550,7 @@ snapshots:
|
||||
|
||||
'@types/better-sqlite3@7.6.13':
|
||||
dependencies:
|
||||
'@types/node': 25.6.0
|
||||
'@types/node': 22.19.17
|
||||
|
||||
'@types/chai@5.2.3':
|
||||
dependencies:
|
||||
@@ -1545,9 +1561,14 @@ snapshots:
|
||||
|
||||
'@types/estree@1.0.8': {}
|
||||
|
||||
'@types/node@22.19.17':
|
||||
dependencies:
|
||||
undici-types: 6.21.0
|
||||
|
||||
'@types/node@25.6.0':
|
||||
dependencies:
|
||||
undici-types: 7.19.2
|
||||
optional: true
|
||||
|
||||
'@vitest/expect@4.1.5':
|
||||
dependencies:
|
||||
@@ -1633,6 +1654,10 @@ snapshots:
|
||||
|
||||
chownr@1.1.4: {}
|
||||
|
||||
citty@0.1.6:
|
||||
dependencies:
|
||||
consola: 3.4.2
|
||||
|
||||
commander@4.1.1: {}
|
||||
|
||||
confbox@0.1.8: {}
|
||||
@@ -2057,7 +2082,10 @@ snapshots:
|
||||
|
||||
ufo@1.6.3: {}
|
||||
|
||||
undici-types@7.19.2: {}
|
||||
undici-types@6.21.0: {}
|
||||
|
||||
undici-types@7.19.2:
|
||||
optional: true
|
||||
|
||||
util-deprecate@1.0.2: {}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user