Compare commits

..

16 Commits

Author SHA1 Message Date
xiaoju 8d92928951 fix(daemon): address PR #22 review — 6 issues fixed
Critical:
1. replayAndResume: remove double moderate() call, reuse loop result
2. drainAndRespawn: check workflow still in config before respawn
3. drain: mark in-flight runs as 'interrupted' in DB before clearing

Should fix:
4. crash recovery: dedup runId before re-queuing/re-activating
5. drain timeout: DEFAULT_DRAIN_TIMEOUT_MS > WORKER_SHUTDOWN_TIMEOUT_MS
6. crash-loop protection: max 5 crashes in 60s window, then stop respawn

5 new tests added. All 173 tests pass.

小橘 🍊(NEKO Team)
2026-04-22 13:25:35 +00:00
xiaoju 49ed65a330 feat(daemon): Phase 3 — crash recovery, hot reload & incremental config
- workflow-manager: crash detection, worker respawn, thread resume from
  persisted events, drainAndRespawn() for hot reload
- log-store: getTriggerPayload(), getThreadEvents() for crash recovery
- file-watcher: detect workflow .ts file changes under workflows/
- kernel: handleWorkflowFileChange(), incremental workflow config updates
  on reloadConfig() (add/remove/update concurrency)
- ipc: resume-thread message type for crash recovery
- workflow-worker: handle resume-thread, rebuild ThreadState from events

28 new tests across 4 test files. All 168 tests pass.

小橘 🍊(NEKO Team)
2026-04-22 13:25:35 +00:00
xiaomo b7dfe42a96 Merge pull request 'fix: init runtime bugs - missing dir, .ts/.js mismatch, TS annotations' (#26) from fix/init-runtime-bugs into main 2026-04-22 13:22:52 +00:00
xingyue a887fc04ca fix: init creates data/senses dir, generates .js templates without TS annotations
- Add mkdirSync for data/senses/ in init command (#23)
- Add defensive mkdirSync in sense-runtime before DB open (#23)
- Change init template output from index.ts to index.js (#24)
- Remove TypeScript type annotations from CPU usage template (#25)

Closes #23, closes #24, closes #25
2026-04-22 21:15:42 +08:00
xiaomo d5931a9e19 Merge pull request 'feat: Workflow Engine Phase 2 — Kernel Integration' (#21) from feat/workflow-engine-phase2 into main 2026-04-22 12:45:43 +00:00
xiaoju f12f187d8d feat(daemon): Phase 2 — kernel ↔ workflow integration
- kernel.ts: create WorkflowManager, expose on Kernel, wire into
  ReflexScheduler via workflowTriggerFn, add activeWorkflows to
  KernelHealth, graceful shutdown ordering
- reflex-scheduler.ts: handle kind:'workflow' reflexes — subscribe to
  bus signals and delegate to workflowTriggerFn
- workflow-manager.ts: add totalActiveCount(), updateConfig() for hot
  reload support

All 140 tests pass.

小橘 🍊(NEKO Team)
2026-04-22 12:40:57 +00:00
xiaomo 1512113a01 Merge pull request 'feat: Workflow Engine Phase 1' (#17) from feat/workflow-engine-phase1 into main 2026-04-22 12:20:12 +00:00
xiaoju 8f495cf92e fix: address all 8 PR #17 review issues
Critical fixes:
1. triggerPayload spread → safe field assignment with validation
2. Graceful shutdown stopping flag → no false crash warnings
3. Shutdown awaits in-flight work (10s timeout) before exit
4. Thread loop safety valve (MAX_STEPS=1000)
5. active counter: bare number → Set<string> by runId

Should-fix:
6. ThreadEventMessage.eventType → union type with validation
7. SQLite status cast → runtime validateWorkflowRunStatus()
8. getActiveWorkflowRuns → pre-compiled prepared statements

小橘 <xiaoju@shazhou.work>
2026-04-22 12:13:29 +00:00
xiaoju a68338c4e9 feat: implement Workflow Engine Phase 1 (#16)
- Core types: WorkflowDefinition, ThreadState, CommandEvent, ModerateResult, etc.
- IPC: StartThreadMessage, ResumeThreadMessage, ThreadEventMessage, WorkflowErrorMessage
- WorkflowManager: concurrency control (drop/queue overflow), worker lifecycle
- Workflow worker: moderate→execute loop, user code loading
- LogStore: workflow_runs materialized table, appendWithWorkflowUpdate
- 131 tests passing

RFC-002 Phase 1 complete.

小橘 <xiaoju@shazhou.work>
2026-04-22 11:49:50 +00:00
xiaoju 9802f68380 docs: add RFC-002 Workflow Engine
Defines the workflow execution engine design:
- Thread lifecycle with event sourcing
- Concurrency control (drop/queue overflow)
- WorkflowManager, workflow worker process model
- IPC extension, crash recovery, hot reload
- 4-phase implementation plan

小橘 <xiaoju@shazhou.work>
2026-04-22 11:26:28 +00:00
xiaomo f245224320 Merge pull request 'Phase 7: Structured Logging System' (#15) from feat/phase-7-logging into main 2026-04-22 11:21:47 +00:00
xiaoju 1b995379f9 feat: Phase 7 — structured logging system (RFC §2.4/§5.4)
- log-store.ts: SQLite append-only log store with query API and meta table
- kernel: system start/stop logs, signal/error logging, file watcher events
- reflex-scheduler: run_start/run_complete logging per compute
- Exports: createLogStore, LogStore, LogEntry, LogQuery
- Tests: log-store CRUD, query filters, meta, integration with reflex

Closes #14
小橘 🍊(NEKO Team)
2026-04-22 11:20:29 +00:00
xiaomo ea6bc5610b Merge pull request 'Phase 6: Hot Reload & Error Handling' (#13) from feat/phase-6-hot-reload into main 2026-04-22 11:12:38 +00:00
xiaoju 49d078b144 fix: address PR #13 review — per-sense timeout, reload restart, await ready
- reloadConfig: restart existing groups when new senses added
- sense-worker: per-sense timeout/gracePeriod lookup at compute time (RFC §5.3)
- restartGroup: await worker ready promise before returning
- Comments: scheduler in-flight state loss, fs.watch Linux caveats, grace period blast radius

小橘 🍊(NEKO Team)
2026-04-22 11:07:33 +00:00
xiaoju 9ca8c8ecb8 feat: Phase 6 — hot reload, error isolation, grace period, nerve-health
- file-watcher.ts: watch nerveRoot for .ts and nerve.yaml changes
- kernel.ts: restartGroup(), reloadConfig(), getHealth(), auto-respawn on crash
- sense-worker.ts: compute try/catch error isolation, grace_period hard kill
- ipc.ts: new message types for health, restart, reload
- examples/senses/nerve-health.ts: built-in daemon health sense
- Integration tests for hot reload, error isolation, grace period

小橘 🍊(NEKO Team)
2026-04-22 10:57:00 +00:00
xiaomo 00c1932144 Merge pull request 'feat: Phase 5 — CLI & User Workspace' (#12) from feat/phase-5-cli-workspace into main 2026-04-22 10:33:53 +00:00
30 changed files with 5825 additions and 85 deletions
+410
View File
@@ -0,0 +1,410 @@
# RFC-002: Workflow Engine
> Status: Draft
> Author: 小橘 🍊(NEKO Team)
> Date: 2026-04-22
## 1. 动机
RFC-001 定义了 Sense → Signal → Reflex 的无状态观测循环。当 Reflex 的 action 为 `StartWorkflow` 时,需要一个有状态的执行引擎来驱动多步骤工作流。本 RFC 定义 Workflow Engine 的完整设计。
## 2. 概念模型
```
Signal 循环 ──→ ThreadStart ──→ Thread 循环
(无状态,幂等,可合并) (有状态,顺序,Command Event 驱动)
Thread 产出 Log
(执行日志,供 retrospection)
```
### 2.1 核心概念
- **Workflow**:一个有状态工作流的定义,包含并发策略和溢出策略
- **Thread**:Workflow 的一次执行实例,有唯一 `runId`
- **Role**:执行具体动作的角色,有副作用(调 API、改文件等)
- **Moderator**:在 Role 之间递话筒的调度逻辑(纯函数)
- **CommandEvent**:Thread 内部的事件流,驱动 Role 之间的流转
Role 和 Moderator 是 Workflow 的内部细节,不跨 Workflow 共享,不作为顶层扩展点。
### 2.2 与 Sense 循环的边界
| | Signal 循环 | Thread 循环 |
|---|---|---|
| 状态 | 无状态 | 有状态(事件溯源) |
| 幂等性 | 幂等、可合并、可丢弃 | 严格顺序、每个 event 必须响应 |
| 触发 | Reflex | Moderator 递话筒 |
| 产出 | Signal → Bus | Log → logs.db |
**关键约束**:Thread 产出的 Log 不能触发 Reflex(见 RFC-001 §2.4),只能被 Sense 的 compute 查询用于 retrospection。
## 3. 配置
```yaml
# nerve.yaml
workflows:
cleanup:
concurrency: 1 # 同时最多 1 个 thread
overflow: drop # 已有活跃 thread 时丢弃新请求
execute-task:
concurrency: 10 # 可并行 10 个 thread
overflow: queue # 超出时排队
code-review:
concurrency: 3
overflow: queue
maxQueue: 20 # 队列上限,超出丢弃最旧请求
```
- **concurrency**:同时允许的最大活跃 Thread 数
- **overflow**:
- `drop`:丢弃,适用于幂等操作(如 cleanup)
- `queue`:排队等待,适用于每次需要执行的操作
- **maxQueue**(仅 `overflow: queue`):队列上限,默认 100,超出丢弃最旧请求
不需要 throttle——触发频率由上游 Sense 的 throttle 控制。
## 4. 用户代码结构
```
~/.uncaged-nerve/
workflows/
cleanup/
index.ts # workflow 定义
code-review/
index.ts
```
### 4.1 Workflow 定义(用户代码)
```typescript
// workflows/cleanup/index.ts
import type { WorkflowDefinition } from "@uncaged/nerve-daemon";
const workflow: WorkflowDefinition = {
roles: {
scanner: {
execute: async (prompt, ctx) => {
// 扫描需要清理的资源
const stale = await findStaleResources();
return { type: "scan_complete", resources: stale };
},
},
cleaner: {
execute: async (prompt, ctx) => {
// 执行清理
await deleteResources(prompt.resources);
return { type: "cleanup_done", count: prompt.resources.length };
},
},
},
moderate(thread, event) {
// 纯函数:决定下一步交给谁
if (event.type === "thread_start") {
return { role: "scanner", prompt: {} };
}
if (event.type === "scan_complete") {
if (event.resources.length === 0) return null; // 结束
return { role: "cleaner", prompt: { resources: event.resources } };
}
return null; // 结束
},
};
export default workflow;
```
### 4.2 类型定义
```typescript
// 用户需要实现的接口
export type RoleExecuteFn = (
prompt: unknown,
ctx: WorkflowContext,
) => Promise<CommandEvent>;
export type Role = {
execute: RoleExecuteFn;
};
export type ModerateFn = (
thread: ThreadState,
event: CommandEvent,
) => ModerateResult | null; // null = thread 完成
export type ModerateResult = {
role: string;
prompt: unknown;
};
export type WorkflowDefinition = {
roles: Record<string, Role>;
moderate: ModerateFn;
};
export type WorkflowContext = {
runId: string;
workflowName: string;
log: (message: string) => void;
};
export type CommandEvent = {
type: string;
[key: string]: unknown;
};
export type ThreadState = {
runId: string;
events: CommandEvent[]; // 历史事件,供 moderate 做决策
};
```
## 5. 运行时架构
### 5.1 进程模型
同一个 workflow 的所有 thread 共享一个 worker 进程。`concurrency` 控制进程内的并发 async task 数。
```
nerve-engine (kernel)
├─ nerve-worker sense --group system (永驻)
├─ nerve-worker sense --group tasks (永驻)
├─ nerve-worker workflow --name cleanup (按需启动)
└─ nerve-worker workflow --name code-review (按需启动)
```
- 进程数 = workflow 种类数,不会膨胀
- 有活跃 thread 时启动,所有 thread 完成后退出(或保持待命 `idleTimeout`,默认 30s)
- 崩溃后 engine respawn worker,从持久化状态恢复 thread
### 5.2 IPC 扩展
在现有 IPC 协议基础上扩展 workflow 相关消息:
```typescript
// Parent → Workflow Worker
type StartThreadMessage = {
type: "start-thread";
runId: string;
workflow: string;
triggerPayload: unknown; // Reflex 传入的 signal payload
};
type ResumeThreadMessage = {
type: "resume-thread";
runId: string;
};
type ShutdownMessage = {
type: "shutdown";
};
// Workflow Worker → Parent
type ThreadEventMessage = {
type: "thread-event";
runId: string;
eventType: string; // queued, started, step_complete, completed, failed
payload: unknown;
};
type WorkflowReadyMessage = {
type: "ready";
};
type WorkflowErrorMessage = {
type: "error";
runId: string;
error: string;
};
```
### 5.3 Thread 生命周期
```
┌──────────────────────────────────────────┐
│ │
trigger ──→ queued ──→ started ──→ step_complete ──→ completed
│ ↑ │
│ └─────┘
└──→ failed
└──→ crashed (worker 崩溃)
```
1. Reflex `StartWorkflow` 触发 → kernel 检查 concurrency
2. 有空位 → 状态 `started`,发 `start-thread` 给 worker
3. 超出 concurrency:
- `overflow: drop` → 丢弃,写 `dropped` log
- `overflow: queue` → 状态 `queued`,等空位
4. Worker 内部循环:`moderate → execute → moderate → execute → ...`
5. `moderate` 返回 `null``completed`
6. Role `execute` 抛异常 → `failed`
7. Worker 进程崩溃 → 所有活跃 thread 标记 `crashed`
### 5.4 Workflow Worker 内部实现
```typescript
// workflow-worker.ts(engine 代码,不是用户代码)
async function runThread(
def: WorkflowDefinition,
runId: string,
triggerPayload: unknown,
sendToParent: (msg: WorkerToParentMessage) => void,
): Promise<void> {
const state: ThreadState = { runId, events: [] };
const ctx: WorkflowContext = {
runId,
workflowName,
log: (msg) => sendToParent({
type: "thread-event", runId, eventType: "step_complete", payload: { message: msg },
}),
};
// 初始事件
let event: CommandEvent = { type: "thread_start", ...triggerPayload };
while (true) {
state.events.push(event);
const next = def.moderate(state, event);
if (next === null) {
sendToParent({ type: "thread-event", runId, eventType: "completed", payload: null });
return;
}
const role = def.roles[next.role];
if (!role) {
sendToParent({ type: "error", runId, error: `Unknown role: ${next.role}` });
return;
}
event = await role.execute(next.prompt, ctx);
}
}
```
## 6. 持久化
### 6.1 事件溯源
Thread 状态以 append-only 事件流为 source of truth,写入统一的 `logs.db`
```sql
-- logs 表(已存在于 RFC-001),source = "workflow"
-- type 取值:queued, started, step_complete, completed, failed, crashed, dropped
```
### 6.2 物化表
为避免每次查活跃 workflow 都扫描全表,维护一张物化表,在写 log 的同一事务中 UPSERT:
```sql
CREATE TABLE workflow_runs (
run_id TEXT PRIMARY KEY,
workflow TEXT NOT NULL,
status TEXT NOT NULL, -- queued, started, completed, failed, crashed
ts INTEGER NOT NULL
);
CREATE INDEX idx_workflow_runs_status ON workflow_runs(status);
CREATE INDEX idx_workflow_runs_workflow ON workflow_runs(workflow);
```
写入流程(同一事务):
```sql
BEGIN;
INSERT INTO logs (source, type, ref_id, payload, ts)
VALUES ('workflow', 'started', 'run-7', '{}', 1001);
INSERT OR REPLACE INTO workflow_runs (run_id, workflow, status, ts)
VALUES ('run-7', 'cleanup', 'started', 1001);
COMMIT;
```
物化表是 logs 的派生数据——数据丢失时可从 logs 重建。
### 6.3 崩溃恢复
Worker 重启时:
1.`workflow_runs WHERE status IN ('queued', 'started')`
2. `queued` → 重新排队
3. `started` → 从 logs 重建 `ThreadState`(事件列表),resume
幂等 workflow(`overflow: drop`)直接标记 `crashed` 重新来。
## 7. Kernel 扩展
### 7.1 WorkflowManager
Kernel 新增 `WorkflowManager` 组件:
```typescript
export type WorkflowManager = {
/** 触发一个 workflow(来自 Reflex) */
startWorkflow: (workflowName: string, payload: unknown) => void;
/** 获取活跃 thread 数 */
activeCount: (workflowName: string) => number;
/** 获取队列长度 */
queueLength: (workflowName: string) => number;
/** 停止所有 workflow */
stop: () => Promise<void>;
};
```
### 7.2 Reflex Scheduler 扩展
当前 `ReflexScheduler` 只处理 `kind: "sense"` 的 reflex。扩展为同时处理 `kind: "workflow"`
```typescript
// reflex-scheduler.ts 扩展
if (reflex.kind === "workflow") {
const workflowName = reflex.workflow;
if (reflex.on !== null && reflex.on.length > 0) {
const watchedSenses = new Set(reflex.on);
const unsub = bus.subscribe((signal) => {
if (watchedSenses.has(signal.senseId)) {
workflowManager.startWorkflow(workflowName, signal.payload);
}
});
unsubscribers.push(unsub);
}
}
```
### 7.3 热更新
| 变化 | 处理 |
|------|------|
| workflow ts 文件修改 | drain(等活跃 thread 完成,`drainTimeout` 后 force kill + 标记 crashed)→ respawn |
| nerve.yaml workflow 配置修改 | 更新 concurrency/overflow,不重启 worker |
| 新增 workflow | 下次触发时按需启动 worker |
| 删除 workflow | drain + 移除 |
## 8. 实现计划
### Phase 1:核心类型与 WorkflowManager 骨架
- [ ] `packages/core/src/types.ts` — 扩展 workflow 相关类型(`WorkflowDefinition``ThreadState``CommandEvent``ModerateResult` 等)
- [ ] `packages/daemon/src/ipc.ts` — 扩展 IPC 消息类型(`StartThreadMessage``ThreadEventMessage` 等)
- [ ] `packages/daemon/src/workflow-manager.ts` — WorkflowManager 实现(并发控制、队列管理、thread 生命周期)
- [ ] `packages/daemon/src/workflow-worker.ts` — Workflow worker 进程(加载用户代码、运行 moderate/execute 循环)
- [ ] `packages/daemon/src/log-store.ts` — 扩展 LogStore,添加 `workflow_runs` 物化表 + 查询方法
- [ ] 单元测试覆盖:并发控制、队列溢出、thread 生命周期
### Phase 2:Kernel 集成
- [ ] `packages/daemon/src/kernel.ts` — 集成 WorkflowManager,处理 workflow worker 的生命周期
- [ ] `packages/daemon/src/reflex-scheduler.ts` — 扩展支持 `kind: "workflow"` 的 reflex
- [ ] 集成测试:Sense signal → Reflex → Workflow 全链路
### Phase 3:崩溃恢复与热更新
- [ ] Worker crash recovery — 从持久化状态恢复 thread
- [ ] 热更新 — workflow 代码变更时 drain + respawn
- [ ] nerve.yaml workflow 配置变更的增量更新
### Phase 4:CLI 与用户体验
- [ ] `nerve workflow list` — 查看活跃 workflow
- [ ] `nerve workflow inspect <runId>` — 查看 thread 详情
- [ ] `nerve workflow trigger <name>` — 手动触发
- [ ] scaffold 模板 — `nerve init workflow <name>`
+70
View File
@@ -0,0 +1,70 @@
/**
* nerve-health — built-in sense that reports daemon health via IPC.
*
* When running inside a sense worker, this compute function sends a
* "health-request" to the parent kernel process and returns the
* health-response payload as its signal.
*
* Usage in nerve.yaml:
* senses:
* nerve-health:
* group: internal
* throttle: 30s
* timeout: 5s
*
* reflexes:
* - sense: nerve-health
* interval: 30s
*/
export type NerveHealth = {
uptime: number;
activeSenses: string[];
inFlightCount: number;
workerMemoryUsage: NodeJS.MemoryUsage;
workerUptime: number;
};
export async function compute(): Promise<NerveHealth | null> {
const health = await requestHealthFromKernel();
return health;
}
function requestHealthFromKernel(): Promise<NerveHealth> {
return new Promise((resolve, reject) => {
if (!process.send) {
reject(new Error("nerve-health: not running as a child process with IPC"));
return;
}
const timeout = setTimeout(() => {
process.removeListener("message", onMessage);
reject(new Error("nerve-health: health-request timed out"));
}, 5000);
function onMessage(msg: unknown): void {
if (
msg !== null &&
typeof msg === "object" &&
(msg as Record<string, unknown>).type === "health-response"
) {
clearTimeout(timeout);
process.removeListener("message", onMessage);
const resp = msg as {
senses: string[];
inFlightCount: number;
};
resolve({
uptime: process.uptime(),
activeSenses: resp.senses,
inFlightCount: resp.inFlightCount,
workerMemoryUsage: process.memoryUsage(),
workerUptime: process.uptime(),
});
}
}
process.on("message", onMessage);
process.send({ type: "health-request" });
});
}
+4 -3
View File
@@ -48,9 +48,9 @@ export const cpuUsage = sqliteTable("cpu_usage", {
});
`;
const CPU_INDEX_TS = `import { cpus } from "node:os";
const CPU_INDEX_JS = `import { cpus } from "node:os";
export async function compute(): Promise<unknown> {
export async function compute() {
const cpuList = cpus();
let totalIdle = 0;
@@ -135,13 +135,14 @@ export const initCommand = defineCommand({
}
mkdirSync(join(nerveRoot, "data"), { recursive: true });
mkdirSync(join(nerveRoot, "data", "senses"), { recursive: true });
mkdirSync(join(nerveRoot, "senses", "cpu-usage", "migrations"), { recursive: true });
writeFile(join(nerveRoot, "nerve.yaml"), NERVE_YAML);
writeFile(join(nerveRoot, "package.json"), PACKAGE_JSON);
writeFile(join(nerveRoot, ".gitignore"), GITIGNORE);
writeFile(join(nerveRoot, "senses", "cpu-usage", "schema.ts"), CPU_SCHEMA_TS);
writeFile(join(nerveRoot, "senses", "cpu-usage", "index.ts"), CPU_INDEX_TS);
writeFile(join(nerveRoot, "senses", "cpu-usage", "index.js"), CPU_INDEX_JS);
writeFile(
join(nerveRoot, "senses", "cpu-usage", "migrations", "0001_init.sql"),
CPU_MIGRATION_SQL,
+8
View File
@@ -8,6 +8,14 @@ export type {
QueueOverflowConfig,
WorkflowConfig,
NerveConfig,
CommandEvent,
ThreadState,
ModerateResult,
WorkflowContext,
RoleExecuteFn,
Role,
ModerateFn,
WorkflowDefinition,
} from "./types.js";
export type { Result } from "./result.js";
export { ok, err } from "./result.js";
+55
View File
@@ -45,3 +45,58 @@ export type NerveConfig = {
reflexes: ReflexConfig[];
workflows: Record<string, WorkflowConfig> | null;
};
// ---------------------------------------------------------------------------
// Workflow Engine types (RFC-002)
// ---------------------------------------------------------------------------
/** A single event in the command event stream that drives a workflow thread. */
export type CommandEvent = {
type: string;
[key: string]: unknown;
};
/** Accumulated state of a running thread — the event history for moderate(). */
export type ThreadState = {
runId: string;
/** All events so far, including the initial thread_start event. */
events: CommandEvent[];
};
/** The result of moderate() — which role to hand to next, and what prompt to pass. */
export type ModerateResult = {
role: string;
prompt: unknown;
};
/** Context injected into every role execute() call. */
export type WorkflowContext = {
runId: string;
workflowName: string;
/** Emit a log message back to the parent process. */
log: (message: string) => void;
};
/**
* A role's execute function. Has side effects (API calls, file I/O, etc.).
* Returns a CommandEvent that is fed back into moderate().
*/
export type RoleExecuteFn = (prompt: unknown, ctx: WorkflowContext) => Promise<CommandEvent>;
/** A role in a workflow — a named unit of execution with side effects. */
export type Role = {
execute: RoleExecuteFn;
};
/**
* The moderator function — pure, no side effects.
* Decides which role to pass control to next.
* Returns null to signal thread completion.
*/
export type ModerateFn = (thread: ThreadState, event: CommandEvent) => ModerateResult | null;
/** The complete definition of a workflow, as authored by users. */
export type WorkflowDefinition = {
roles: Record<string, Role>;
moderate: ModerateFn;
};
@@ -0,0 +1,429 @@
/**
* Phase 3 — Worker crash recovery tests.
*
* Verifies that WorkflowManager correctly:
* - Marks in-flight threads as "crashed" in the DB when a worker exits unexpectedly
* - Respawns the worker after a crash
* - Resumes "started" threads from persisted event history (resume-thread IPC)
* - Re-queues "queued" threads so they are dispatched on the new worker
*/
import { EventEmitter } from "node:events";
import type { NerveConfig, WorkflowConfig } from "@uncaged/nerve-core";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
type MockChild = EventEmitter & {
send: ReturnType<typeof vi.fn>;
kill: ReturnType<typeof vi.fn>;
connected: boolean;
exitCode: number | null;
pid: number;
};
const mockChildren: MockChild[] = [];
function makeMockChild(pid = 1): MockChild {
const child = new EventEmitter() as MockChild;
child.connected = true;
child.exitCode = null;
child.pid = pid;
child.send = vi.fn((msg: unknown) => {
if (
msg !== null &&
typeof msg === "object" &&
(msg as Record<string, unknown>).type === "shutdown"
) {
setImmediate(() => {
child.exitCode = 0;
child.connected = false;
child.emit("exit", 0, null);
});
}
});
child.kill = vi.fn((_signal?: string) => {
child.exitCode = 1;
child.connected = false;
child.emit("exit", null, _signal ?? "SIGKILL");
});
return child;
}
vi.mock("node:child_process", () => ({
fork: vi.fn((_script: string, _args: string[], _opts: unknown) => {
const child = makeMockChild(mockChildren.length + 1);
mockChildren.push(child);
return child;
}),
}));
const { createWorkflowManager } = await import("../workflow-manager.js");
function makeConfig(workflows: Record<string, WorkflowConfig> = {}): NerveConfig {
return {
senses: {},
reflexes: [],
workflows,
};
}
function makeLogStore(
activeRuns: Array<{
runId: string;
workflow: string;
status: "queued" | "started";
ts: number;
}> = [],
) {
const store = {
append: vi.fn(),
query: vi.fn(() => []),
getMeta: vi.fn(() => null),
setMeta: vi.fn(),
upsertWorkflowRun: vi.fn(),
appendWithWorkflowUpdate: vi.fn(),
getWorkflowRun: vi.fn(() => null),
getActiveWorkflowRuns: vi.fn((_workflowName?: string) => {
if (_workflowName !== undefined) {
return activeRuns.filter((r) => r.workflow === _workflowName);
}
return activeRuns;
}),
getTriggerPayload: vi.fn(() => ({ value: 42 })),
getThreadEvents: vi.fn(() => [{ type: "thread_start", triggerPayload: {} }]),
close: vi.fn(),
};
return store;
}
describe("WorkflowManager — crash recovery (Phase 3)", () => {
beforeEach(() => {
mockChildren.length = 0;
vi.useFakeTimers({ shouldAdvanceTime: true });
});
afterEach(() => {
vi.useRealTimers();
vi.clearAllMocks();
});
describe("worker crash marks active threads as crashed", () => {
it("logs 'crashed' status for each active thread when worker exits unexpectedly", async () => {
const logStore = makeLogStore();
const config = makeConfig({
"my-wf": { concurrency: 2, overflow: "drop" },
});
const mgr = createWorkflowManager("/nerve-root", config, logStore);
mgr.startWorkflow("my-wf", { n: 1 });
mgr.startWorkflow("my-wf", { n: 2 });
expect(mgr.activeCount("my-wf")).toBe(2);
// Simulate unexpected exit (not shutdown)
const child = mockChildren[0];
child.exitCode = 1;
child.connected = false;
child.emit("exit", 1, null);
const crashedCalls = logStore.upsertWorkflowRun.mock.calls.filter(
([entry]: [{ type: string }]) => entry.type === "crashed",
);
expect(crashedCalls).toHaveLength(2);
const stopPromise = mgr.stop();
await vi.runAllTimersAsync();
await stopPromise;
});
it("clears active count after crash", async () => {
const logStore = makeLogStore();
const config = makeConfig({
"my-wf": { concurrency: 3, overflow: "drop" },
});
const mgr = createWorkflowManager("/nerve-root", config, logStore);
mgr.startWorkflow("my-wf", {});
mgr.startWorkflow("my-wf", {});
expect(mgr.activeCount("my-wf")).toBe(2);
const child = mockChildren[0];
child.exitCode = 1;
child.connected = false;
child.emit("exit", 1, null);
expect(mgr.activeCount("my-wf")).toBe(0);
const stopPromise = mgr.stop();
await vi.runAllTimersAsync();
await stopPromise;
});
});
describe("worker crash triggers respawn", () => {
it("spawns a new worker after crash", async () => {
const logStore = makeLogStore();
const config = makeConfig({
"my-wf": { concurrency: 1, overflow: "drop" },
});
const mgr = createWorkflowManager("/nerve-root", config, logStore);
mgr.startWorkflow("my-wf", {});
expect(mockChildren).toHaveLength(1);
const child = mockChildren[0];
child.exitCode = 1;
child.connected = false;
child.emit("exit", 1, null);
// setImmediate to allow respawn
await vi.runAllTimersAsync();
expect(mockChildren).toHaveLength(2);
const stopPromise = mgr.stop();
await vi.runAllTimersAsync();
await stopPromise;
});
it("sends resume-thread for 'started' runs from DB after respawn", async () => {
const activeRuns = [
{ runId: "run-started-1", workflow: "my-wf", status: "started" as const, ts: 1000 },
];
const logStore = makeLogStore(activeRuns);
logStore.getThreadEvents.mockReturnValue([
{ type: "thread_start", triggerPayload: {} },
{ type: "scan_complete", items: ["a"] },
]);
logStore.getTriggerPayload.mockReturnValue({ trigger: "initial" });
const config = makeConfig({
"my-wf": { concurrency: 2, overflow: "drop" },
});
const mgr = createWorkflowManager("/nerve-root", config, logStore);
mgr.startWorkflow("my-wf", {});
const firstChild = mockChildren[0];
firstChild.exitCode = 1;
firstChild.connected = false;
firstChild.emit("exit", 1, null);
await vi.runAllTimersAsync();
// New worker should have been spawned
const secondChild = mockChildren[1];
expect(secondChild).toBeDefined();
// resume-thread should have been sent
const resumeCalls = (secondChild.send as ReturnType<typeof vi.fn>).mock.calls.filter(
([msg]: [unknown]) =>
msg !== null &&
typeof msg === "object" &&
(msg as Record<string, unknown>).type === "resume-thread",
);
expect(resumeCalls).toHaveLength(1);
expect(resumeCalls[0][0]).toMatchObject({
type: "resume-thread",
runId: "run-started-1",
triggerPayload: { trigger: "initial" },
});
expect(Array.isArray((resumeCalls[0][0] as Record<string, unknown>).events)).toBe(true);
const stopPromise = mgr.stop();
await vi.runAllTimersAsync();
await stopPromise;
});
it("re-queues 'queued' runs from DB after respawn", async () => {
const activeRuns = [
{ runId: "run-queued-1", workflow: "my-wf", status: "queued" as const, ts: 900 },
];
const logStore = makeLogStore(activeRuns);
logStore.getTriggerPayload.mockReturnValue({ queued: "payload" });
const config = makeConfig({
"my-wf": { concurrency: 1, overflow: "queue", maxQueue: 5 },
});
const mgr = createWorkflowManager("/nerve-root", config, logStore);
// Start one thread to fill the concurrency slot (so queued run stays queued on respawn)
mgr.startWorkflow("my-wf", {});
const firstChild = mockChildren[0];
firstChild.exitCode = 1;
firstChild.connected = false;
firstChild.emit("exit", 1, null);
await vi.runAllTimersAsync();
// After respawn, the queue should contain the recovered run
expect(mgr.queueLength("my-wf")).toBeGreaterThanOrEqual(1);
const stopPromise = mgr.stop();
await vi.runAllTimersAsync();
await stopPromise;
});
});
describe("command events are persisted (for crash recovery replay)", () => {
it("persists thread_command_event when worker sends thread-command-event IPC", async () => {
const logStore = makeLogStore();
const config = makeConfig({
"my-wf": { concurrency: 1, overflow: "drop" },
});
const mgr = createWorkflowManager("/nerve-root", config, logStore);
mgr.startWorkflow("my-wf", { x: 1 });
const child = mockChildren[0];
const startCall = (child.send as ReturnType<typeof vi.fn>).mock.calls[0];
const runId = (startCall[0] as Record<string, unknown>).runId as string;
// Simulate worker sending a command event back
child.emit("message", {
type: "thread-command-event",
runId,
event: { type: "scan_complete", items: ["a", "b"] },
});
const appendCalls = logStore.append.mock.calls.filter(
([entry]: [{ type: string }]) => entry.type === "thread_command_event",
);
expect(appendCalls).toHaveLength(1);
expect(appendCalls[0][0]).toMatchObject({
source: "workflow",
type: "thread_command_event",
refId: runId,
});
const stopPromise = mgr.stop();
await vi.runAllTimersAsync();
await stopPromise;
});
});
describe("triggerPayload is persisted in 'started' log entry", () => {
it("stores triggerPayload in the payload field of the started log entry", async () => {
const logStore = makeLogStore();
const config = makeConfig({
"my-wf": { concurrency: 1, overflow: "drop" },
});
const mgr = createWorkflowManager("/nerve-root", config, logStore);
const payload = { task: "build-docker", repo: "myrepo" };
mgr.startWorkflow("my-wf", payload);
const startedCall = logStore.upsertWorkflowRun.mock.calls.find(
([entry]: [{ type: string }]) => entry.type === "started",
);
expect(startedCall).toBeDefined();
const logEntry = startedCall?.[0] as { payload: string | null };
expect(logEntry.payload).not.toBeNull();
const parsed = JSON.parse(logEntry.payload as string) as Record<string, unknown>;
expect(parsed.triggerPayload).toMatchObject(payload);
const stopPromise = mgr.stop();
await vi.runAllTimersAsync();
await stopPromise;
});
});
describe("runId deduplication in crash recovery", () => {
it("does not push duplicate runIds into the queue during crash recovery", async () => {
const activeRuns = [
{ runId: "run-queued-dup", workflow: "my-wf", status: "queued" as const, ts: 900 },
];
const logStore = makeLogStore(activeRuns);
logStore.getTriggerPayload.mockReturnValue({ q: 1 });
const config = makeConfig({
"my-wf": { concurrency: 1, overflow: "queue", maxQueue: 5 },
});
const mgr = createWorkflowManager("/nerve-root", config, logStore);
// Start one thread to fill the concurrency slot
mgr.startWorkflow("my-wf", {});
const firstChild = mockChildren[0];
// Crash once → respawn → crash again → second respawn
firstChild.exitCode = 1;
firstChild.connected = false;
firstChild.emit("exit", 1, null);
await vi.runAllTimersAsync();
const secondChild = mockChildren[1];
secondChild.exitCode = 1;
secondChild.connected = false;
secondChild.emit("exit", 1, null);
await vi.runAllTimersAsync();
// The recovered queued run should appear at most once in the queue
expect(mgr.queueLength("my-wf")).toBeLessThanOrEqual(1);
const stopPromise = mgr.stop();
await vi.runAllTimersAsync();
await stopPromise;
});
it("does not add duplicate active runIds during crash recovery", async () => {
const activeRuns = [
{ runId: "run-started-dup", workflow: "my-wf", status: "started" as const, ts: 1000 },
];
const logStore = makeLogStore(activeRuns);
logStore.getThreadEvents.mockReturnValue([{ type: "thread_start", triggerPayload: {} }]);
logStore.getTriggerPayload.mockReturnValue({ s: 1 });
const config = makeConfig({
"my-wf": { concurrency: 2, overflow: "drop" },
});
const mgr = createWorkflowManager("/nerve-root", config, logStore);
mgr.startWorkflow("my-wf", {});
const firstChild = mockChildren[0];
firstChild.exitCode = 1;
firstChild.connected = false;
firstChild.emit("exit", 1, null);
await vi.runAllTimersAsync();
const secondChild = mockChildren[1];
secondChild.exitCode = 1;
secondChild.connected = false;
secondChild.emit("exit", 1, null);
await vi.runAllTimersAsync();
// The active set should not double-count the recovered run
expect(mgr.activeCount("my-wf")).toBeLessThanOrEqual(1);
const stopPromise = mgr.stop();
await vi.runAllTimersAsync();
await stopPromise;
});
});
describe("crash-loop backoff", () => {
it("stops respawning after exceeding max crashes in the window", async () => {
const logStore = makeLogStore();
const config = makeConfig({
"crash-wf": { concurrency: 1, overflow: "drop" },
});
const mgr = createWorkflowManager("/nerve-root", config, logStore);
mgr.startWorkflow("crash-wf", {});
// Crash the worker 6 times in rapid succession (within CRASH_WINDOW_MS = 60s)
for (let i = 0; i < 6; i++) {
const child = mockChildren[mockChildren.length - 1];
child.exitCode = 1;
child.connected = false;
child.emit("exit", 1, null);
await vi.runAllTimersAsync();
}
// After 6 crashes, no new worker should be spawned
// The 1st crash spawns child[1], ..., 5th crash spawns child[5], 6th should NOT spawn
expect(mockChildren.length).toBeLessThanOrEqual(6);
const stopPromise = mgr.stop();
await vi.runAllTimersAsync();
await stopPromise;
});
});
});
@@ -0,0 +1,119 @@
/**
* Phase 3 — FileWatcher workflow change detection tests.
*
* Verifies that file-watcher.ts detects .ts file changes under workflows/.
*/
import { mkdirSync, mkdtempSync, rmSync, writeFileSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { afterEach, describe, expect, it } from "vitest";
import { createFileWatcher } from "../file-watcher.js";
import type { FileChange, FileWatcher } from "../file-watcher.js";
function makeTempNerveRoot(): string {
const dir = mkdtempSync(join(tmpdir(), "nerve-fw-wf-test-"));
mkdirSync(join(dir, "workflows", "my-workflow"), { recursive: true });
writeFileSync(join(dir, "nerve.yaml"), "senses: {}\nreflexes: []\n");
writeFileSync(
join(dir, "workflows", "my-workflow", "index.ts"),
"export default { roles: {}, moderate: () => null };",
);
return dir;
}
async function waitFor(
predicate: () => boolean,
timeoutMs: number,
intervalMs = 50,
): Promise<void> {
return new Promise((resolve, reject) => {
const timer = setTimeout(
() => reject(new Error(`waitFor timed out after ${timeoutMs}ms`)),
timeoutMs,
);
const check = setInterval(() => {
if (predicate()) {
clearTimeout(timer);
clearInterval(check);
resolve();
}
}, intervalMs);
});
}
describe("createFileWatcher — workflow file changes (Phase 3)", () => {
let watcher: FileWatcher | null = null;
afterEach(() => {
if (watcher !== null) {
watcher.close();
watcher = null;
}
});
it("detects workflow .ts file changes and emits kind=workflow", async () => {
const root = makeTempNerveRoot();
const changes: FileChange[] = [];
watcher = createFileWatcher(root, (change) => changes.push(change), 50);
await new Promise((r) => setTimeout(r, 100));
writeFileSync(
join(root, "workflows", "my-workflow", "index.ts"),
"export default { roles: {}, moderate: () => null }; // updated",
);
await waitFor(() => changes.some((c) => c.kind === "workflow"), 3000);
const wfChanges = changes.filter((c) => c.kind === "workflow");
expect(wfChanges.length).toBeGreaterThanOrEqual(1);
const wfChange = wfChanges[0] as { workflowName: string; filePath: string };
expect(wfChange.workflowName).toBe("my-workflow");
}, 10_000);
it("does NOT emit workflow change for nerve.yaml", async () => {
const root = makeTempNerveRoot();
const changes: FileChange[] = [];
watcher = createFileWatcher(root, (change) => changes.push(change), 50);
await new Promise((r) => setTimeout(r, 100));
writeFileSync(join(root, "nerve.yaml"), "senses: {}\nreflexes: []\n# changed\n");
await waitFor(() => changes.some((c) => c.kind === "config"), 3000);
const wfChanges = changes.filter((c) => c.kind === "workflow");
expect(wfChanges).toHaveLength(0);
}, 10_000);
it("debounces rapid workflow file changes", async () => {
const root = makeTempNerveRoot();
const changes: FileChange[] = [];
watcher = createFileWatcher(root, (change) => changes.push(change), 200);
await new Promise((r) => setTimeout(r, 100));
for (let i = 0; i < 5; i++) {
writeFileSync(
join(root, "workflows", "my-workflow", "index.ts"),
`export default {}; // v${i}`,
);
}
await waitFor(() => changes.some((c) => c.kind === "workflow"), 3000);
// Allow debounce window to pass
await new Promise((r) => setTimeout(r, 300));
const wfChanges = changes.filter((c) => c.kind === "workflow");
expect(wfChanges.length).toBe(1);
}, 10_000);
it("cleans up temp dir after test", () => {
const root = makeTempNerveRoot();
rmSync(root, { recursive: true, force: true });
});
});
@@ -0,0 +1,126 @@
/**
* Unit tests for file-watcher.ts
*/
import { mkdirSync, mkdtempSync, writeFileSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { afterEach, describe, expect, it } from "vitest";
import { createFileWatcher } from "../file-watcher.js";
import type { FileChange, FileWatcher } from "../file-watcher.js";
function makeTempNerveRoot(): string {
const dir = mkdtempSync(join(tmpdir(), "nerve-fw-test-"));
mkdirSync(join(dir, "senses", "cpu-usage"), { recursive: true });
writeFileSync(join(dir, "nerve.yaml"), "senses: {}\nreflexes: []\n");
writeFileSync(
join(dir, "senses", "cpu-usage", "index.ts"),
"export async function compute() { return null; }",
);
return dir;
}
async function waitFor(
predicate: () => boolean,
timeoutMs: number,
intervalMs = 50,
): Promise<void> {
return new Promise((resolve, reject) => {
const timer = setTimeout(
() => reject(new Error(`waitFor timed out after ${timeoutMs}ms`)),
timeoutMs,
);
const check = setInterval(() => {
if (predicate()) {
clearTimeout(timer);
clearInterval(check);
resolve();
}
}, intervalMs);
});
}
describe("createFileWatcher", () => {
let watcher: FileWatcher | null = null;
afterEach(() => {
if (watcher !== null) {
watcher.close();
watcher = null;
}
});
it("detects nerve.yaml changes", async () => {
const root = makeTempNerveRoot();
const changes: FileChange[] = [];
watcher = createFileWatcher(root, (change) => changes.push(change), 50);
// Wait a bit for watcher to initialize, then modify nerve.yaml
await new Promise((r) => setTimeout(r, 100));
writeFileSync(join(root, "nerve.yaml"), "senses: {}\nreflexes: []\n# changed\n");
await waitFor(() => changes.length > 0, 3000);
expect(changes.length).toBeGreaterThanOrEqual(1);
expect(changes.some((c) => c.kind === "config")).toBe(true);
}, 10_000);
it("detects sense .ts file changes", async () => {
const root = makeTempNerveRoot();
const changes: FileChange[] = [];
watcher = createFileWatcher(root, (change) => changes.push(change), 50);
await new Promise((r) => setTimeout(r, 100));
writeFileSync(
join(root, "senses", "cpu-usage", "index.ts"),
"export async function compute() { return 42; }",
);
await waitFor(() => changes.length > 0, 3000);
expect(changes.length).toBeGreaterThanOrEqual(1);
const senseChanges = changes.filter((c) => c.kind === "sense");
expect(senseChanges.length).toBeGreaterThanOrEqual(1);
expect((senseChanges[0] as { senseName: string }).senseName).toBe("cpu-usage");
}, 10_000);
it("close() stops the watcher", async () => {
const root = makeTempNerveRoot();
const changes: FileChange[] = [];
watcher = createFileWatcher(root, (change) => changes.push(change), 50);
await new Promise((r) => setTimeout(r, 100));
watcher.close();
watcher = null;
writeFileSync(join(root, "nerve.yaml"), "senses: {}\nreflexes: []\n# after close\n");
// Wait and verify no changes were captured
await new Promise((r) => setTimeout(r, 500));
expect(changes.length).toBe(0);
}, 5_000);
it("debounces rapid changes", async () => {
const root = makeTempNerveRoot();
const changes: FileChange[] = [];
watcher = createFileWatcher(root, (change) => changes.push(change), 200);
await new Promise((r) => setTimeout(r, 100));
// Write rapidly
for (let i = 0; i < 5; i++) {
writeFileSync(join(root, "nerve.yaml"), `senses: {}\nreflexes: []\n# v${i}\n`);
}
await waitFor(() => changes.length > 0, 3000);
// With debounce, should see only 1 change (not 5)
expect(changes.length).toBe(1);
}, 10_000);
});
@@ -0,0 +1,36 @@
/**
* Mock worker that crashes on first compute, then works normally after respawn.
* Uses a marker file to track if it has already crashed.
*
* First invocation: sends ready, then exits with code 1 on first compute.
* Subsequent invocations (after respawn): works like normal mock-worker.
*/
import { existsSync, unlinkSync, writeFileSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
const markerFile = join(tmpdir(), `nerve-crash-once-${process.ppid}.marker`);
process.send({ type: "ready" });
const hasCrashed = existsSync(markerFile);
process.on("message", (msg) => {
if (!msg || typeof msg !== "object") return;
if (msg.type === "shutdown") {
try {
unlinkSync(markerFile);
} catch {}
process.exit(0);
}
if (msg.type === "compute" && typeof msg.sense === "string") {
if (!hasCrashed) {
writeFileSync(markerFile, "crashed", "utf8");
process.exit(1);
}
process.send({ type: "signal", sense: msg.sense, payload: 42 });
}
});
@@ -0,0 +1,18 @@
/**
* Mock worker that responds to compute with an error message.
* Used for error isolation integration tests.
*/
process.send({ type: "ready" });
process.on("message", (msg) => {
if (!msg || typeof msg !== "object") return;
if (msg.type === "shutdown") {
process.exit(0);
}
if (msg.type === "compute" && typeof msg.sense === "string") {
process.send({ type: "error", sense: msg.sense, error: "simulated compute error" });
}
});
@@ -0,0 +1,24 @@
/**
* Mock worker that takes a long time to respond to compute messages.
* Used for grace period / timeout integration tests.
*
* On compute: waits 10 seconds before responding (simulates hung compute).
* On shutdown: exits cleanly.
*/
process.send({ type: "ready" });
process.on("message", (msg) => {
if (!msg || typeof msg !== "object") return;
if (msg.type === "shutdown") {
process.exit(0);
}
if (msg.type === "compute" && typeof msg.sense === "string") {
// Intentionally slow — will be killed by grace period
setTimeout(() => {
process.send({ type: "signal", sense: msg.sense, payload: "late" });
}, 10_000);
}
});
@@ -0,0 +1,353 @@
/**
* Phase 3 — Hot reload tests.
*
* Verifies that:
* - drainAndRespawn() sends shutdown, waits for exit, then respawns the worker
* - Kernel dispatches handleWorkflowFileChange when file-watcher emits a workflow change
* - Kernel logs a workflow_reload system event on hot reload
* - drainAndRespawn on a non-existent worker is a no-op
* - drainAndRespawn after the drain sends a fresh worker (not crash-recovery)
*/
import { EventEmitter } from "node:events";
import type { NerveConfig, WorkflowConfig } from "@uncaged/nerve-core";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
type MockChild = EventEmitter & {
send: ReturnType<typeof vi.fn>;
kill: ReturnType<typeof vi.fn>;
connected: boolean;
exitCode: number | null;
pid: number;
};
const mockChildren: MockChild[] = [];
function makeMockChild(pid = 1): MockChild {
const child = new EventEmitter() as MockChild;
child.connected = true;
child.exitCode = null;
child.pid = pid;
child.send = vi.fn((msg: unknown) => {
if (
msg !== null &&
typeof msg === "object" &&
(msg as Record<string, unknown>).type === "shutdown"
) {
setImmediate(() => {
child.exitCode = 0;
child.connected = false;
child.emit("exit", 0, null);
});
}
});
child.kill = vi.fn((_signal?: string) => {
child.exitCode = 1;
child.connected = false;
child.emit("exit", null, _signal ?? "SIGKILL");
});
return child;
}
vi.mock("node:child_process", () => ({
fork: vi.fn((_script: string, _args: string[], _opts: unknown) => {
const child = makeMockChild(mockChildren.length + 1);
mockChildren.push(child);
return child;
}),
}));
const { createWorkflowManager } = await import("../workflow-manager.js");
const { createKernel } = await import("../kernel.js");
function makeWfConfig(workflows: Record<string, WorkflowConfig> = {}): NerveConfig {
return { senses: {}, reflexes: [], workflows };
}
function makeLogStore() {
return {
append: vi.fn(),
query: vi.fn(() => []),
getMeta: vi.fn(() => null),
setMeta: vi.fn(),
upsertWorkflowRun: vi.fn(),
appendWithWorkflowUpdate: vi.fn(),
getWorkflowRun: vi.fn(() => null),
getActiveWorkflowRuns: vi.fn(() => []),
getTriggerPayload: vi.fn(() => null),
getThreadEvents: vi.fn(() => []),
close: vi.fn(),
};
}
describe("WorkflowManager — drainAndRespawn (Phase 3 hot reload)", () => {
beforeEach(() => {
mockChildren.length = 0;
vi.useFakeTimers({ shouldAdvanceTime: true });
});
afterEach(() => {
vi.useRealTimers();
vi.clearAllMocks();
});
it("drainAndRespawn does NOT respawn when workflow is removed from config", async () => {
const logStore = makeLogStore();
const config = makeWfConfig({ "my-wf": { concurrency: 1, overflow: "drop" } });
const mgr = createWorkflowManager("/nerve-root", config, logStore);
mgr.startWorkflow("my-wf", {});
expect(mockChildren).toHaveLength(1);
// Remove workflow from config before drain completes
mgr.updateConfig(makeWfConfig({}));
const drainPromise = mgr.drainAndRespawn("my-wf", 5000);
await vi.runAllTimersAsync();
await drainPromise;
// No new worker should have been spawned (workflow was removed)
expect(mockChildren).toHaveLength(1);
});
it("drainAndRespawn marks in-flight runs as interrupted in DB", async () => {
const logStore = makeLogStore();
const config = makeWfConfig({ "my-wf": { concurrency: 2, overflow: "drop" } });
const mgr = createWorkflowManager("/nerve-root", config, logStore);
mgr.startWorkflow("my-wf", { n: 1 });
mgr.startWorkflow("my-wf", { n: 2 });
expect(mgr.activeCount("my-wf")).toBe(2);
const drainPromise = mgr.drainAndRespawn("my-wf", 5000);
await vi.runAllTimersAsync();
await drainPromise;
const interruptedCalls = logStore.upsertWorkflowRun.mock.calls.filter(
([entry]: [{ type: string }]) => entry.type === "interrupted",
);
expect(interruptedCalls).toHaveLength(2);
const stopPromise = mgr.stop();
await vi.runAllTimersAsync();
await stopPromise;
});
it("drainAndRespawn on an unknown workflow (no worker) resolves immediately", async () => {
const logStore = makeLogStore();
const config = makeWfConfig({ "my-wf": { concurrency: 1, overflow: "drop" } });
const mgr = createWorkflowManager("/nerve-root", config, logStore);
// No thread started — no worker spawned
await expect(mgr.drainAndRespawn("my-wf")).resolves.toBeUndefined();
});
it("drainAndRespawn sends shutdown to existing worker and waits for exit", async () => {
const logStore = makeLogStore();
const config = makeWfConfig({ "my-wf": { concurrency: 1, overflow: "drop" } });
const mgr = createWorkflowManager("/nerve-root", config, logStore);
mgr.startWorkflow("my-wf", {});
expect(mockChildren).toHaveLength(1);
const drainPromise = mgr.drainAndRespawn("my-wf", 5000);
await vi.runAllTimersAsync();
await drainPromise;
const firstChild = mockChildren[0];
expect(firstChild.send).toHaveBeenCalledWith(expect.objectContaining({ type: "shutdown" }));
});
it("drainAndRespawn spawns a fresh worker after the old one exits", async () => {
const logStore = makeLogStore();
const config = makeWfConfig({ "my-wf": { concurrency: 1, overflow: "drop" } });
const mgr = createWorkflowManager("/nerve-root", config, logStore);
mgr.startWorkflow("my-wf", {});
expect(mockChildren).toHaveLength(1);
const drainPromise = mgr.drainAndRespawn("my-wf", 5000);
await vi.runAllTimersAsync();
await drainPromise;
// A new worker should have been spawned (not crash-recovery, just fresh)
expect(mockChildren).toHaveLength(2);
});
it("fresh worker after drainAndRespawn does NOT receive resume-thread messages", async () => {
const logStore = makeLogStore();
// Even if there are active runs in DB, after drain the worker should NOT get resume
const config = makeWfConfig({ "my-wf": { concurrency: 1, overflow: "drop" } });
const mgr = createWorkflowManager("/nerve-root", config, logStore);
mgr.startWorkflow("my-wf", {});
const drainPromise = mgr.drainAndRespawn("my-wf", 5000);
await vi.runAllTimersAsync();
await drainPromise;
const newChild = mockChildren[1];
const resumeCalls = (newChild.send as ReturnType<typeof vi.fn>).mock.calls.filter(
([msg]: [unknown]) =>
msg !== null &&
typeof msg === "object" &&
(msg as Record<string, unknown>).type === "resume-thread",
);
expect(resumeCalls).toHaveLength(0);
const stopPromise = mgr.stop();
await vi.runAllTimersAsync();
await stopPromise;
});
it("new threads can be started on the fresh worker after drainAndRespawn", async () => {
const logStore = makeLogStore();
const config = makeWfConfig({ "my-wf": { concurrency: 1, overflow: "drop" } });
const mgr = createWorkflowManager("/nerve-root", config, logStore);
mgr.startWorkflow("my-wf", { first: true });
const drainPromise = mgr.drainAndRespawn("my-wf", 5000);
await vi.runAllTimersAsync();
await drainPromise;
// Start a new thread on the fresh worker
mgr.startWorkflow("my-wf", { second: true });
const newChild = mockChildren[1];
const startCalls = (newChild.send as ReturnType<typeof vi.fn>).mock.calls.filter(
([msg]: [unknown]) =>
msg !== null &&
typeof msg === "object" &&
(msg as Record<string, unknown>).type === "start-thread",
);
expect(startCalls).toHaveLength(1);
const stopPromise = mgr.stop();
await vi.runAllTimersAsync();
await stopPromise;
});
});
describe("Kernel — workflow hot reload via file-watcher (Phase 3)", () => {
beforeEach(() => {
mockChildren.length = 0;
vi.useFakeTimers({ shouldAdvanceTime: true });
});
afterEach(async () => {
vi.useRealTimers();
vi.clearAllMocks();
});
it("handleWorkflowFileChange logs workflow_reload system event", async () => {
const logStore = makeLogStore();
const config: NerveConfig = {
senses: {},
reflexes: [{ kind: "workflow", workflow: "my-wf", on: null }],
workflows: { "my-wf": { concurrency: 1, overflow: "drop" } },
};
const kernel = createKernel(config, "/tmp/nerve-hot-reload-test", {
workerScript: "fake-worker.js",
logStore,
});
// Trigger a workflow thread so a worker is spawned
kernel.workflowManager.startWorkflow("my-wf", {});
// Manually call drainAndRespawn (simulating what kernel does on workflow file change)
const drainPromise = kernel.workflowManager.drainAndRespawn("my-wf", 1000);
await vi.runAllTimersAsync();
await drainPromise;
// Kernel's handleWorkflowFileChange should log a workflow_reload event
// We test this via the kernel itself
const appendCalls = logStore.append.mock.calls;
const startCall = appendCalls.find(([e]: [{ type: string }]) => e.type === "start");
expect(startCall).toBeDefined();
const stopPromise = kernel.stop();
await vi.runAllTimersAsync();
await stopPromise;
});
it("reloadConfig drains worker for removed workflows", async () => {
const logStore = makeLogStore();
const initialConfig: NerveConfig = {
senses: {},
reflexes: [{ kind: "workflow", workflow: "old-wf", on: null }],
workflows: { "old-wf": { concurrency: 1, overflow: "drop" } },
};
const kernel = createKernel(initialConfig, "/tmp/nerve-hot-reload-test", {
workerScript: "fake-worker.js",
logStore,
});
// Spawn a worker for old-wf
kernel.workflowManager.startWorkflow("old-wf", {});
expect(mockChildren).toHaveLength(1);
// Reload config without old-wf
const newConfig: NerveConfig = {
senses: {},
reflexes: [],
workflows: null,
};
kernel.reloadConfig(newConfig);
await vi.runAllTimersAsync();
// The old worker should have received a shutdown (drain)
expect(mockChildren[0].send).toHaveBeenCalledWith(
expect.objectContaining({ type: "shutdown" }),
);
const stopPromise = kernel.stop();
await vi.runAllTimersAsync();
await stopPromise;
});
it("reloadConfig updates concurrency/overflow without restarting worker", async () => {
const logStore = makeLogStore();
const initialConfig: NerveConfig = {
senses: {},
reflexes: [{ kind: "workflow", workflow: "my-wf", on: null }],
workflows: { "my-wf": { concurrency: 1, overflow: "drop" } },
};
const kernel = createKernel(initialConfig, "/tmp/nerve-hot-reload-test", {
workerScript: "fake-worker.js",
logStore,
});
kernel.workflowManager.startWorkflow("my-wf", {});
const workersBefore = mockChildren.length;
// Reload with updated concurrency — should NOT spawn a new workflow worker
const newConfig: NerveConfig = {
senses: {},
reflexes: [{ kind: "workflow", workflow: "my-wf", on: null }],
workflows: { "my-wf": { concurrency: 5, overflow: "queue", maxQueue: 50 } },
};
kernel.reloadConfig(newConfig);
// No extra workflow worker spawn (the config update is in-place)
// The worker count may increase if senses change, but the workflow worker should not be respawned
expect(mockChildren).toHaveLength(workersBefore);
// After reload, the new concurrency should be respected
expect(kernel.workflowManager.activeCount("my-wf")).toBe(1);
// Can now start up to 5 concurrent threads (previously only 1)
kernel.workflowManager.startWorkflow("my-wf", { n: 2 });
kernel.workflowManager.startWorkflow("my-wf", { n: 3 });
expect(kernel.workflowManager.activeCount("my-wf")).toBe(3);
const stopPromise = kernel.stop();
await vi.runAllTimersAsync();
await stopPromise;
});
});
@@ -0,0 +1,241 @@
/**
* Unit tests for kernel Phase 6 enhancements — restartGroup, reloadConfig, getHealth.
* Uses mocked child_process.fork.
*/
import { EventEmitter } from "node:events";
import type { NerveConfig } from "@uncaged/nerve-core";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
// ---------------------------------------------------------------------------
// Mock child_process.fork
// ---------------------------------------------------------------------------
const mockChildren: MockChild[] = [];
type MockChild = EventEmitter & {
send: ReturnType<typeof vi.fn>;
kill: ReturnType<typeof vi.fn>;
pid: number;
connected: boolean;
};
function makeMockChild(pid = 1): MockChild {
const child = new EventEmitter() as MockChild;
child.connected = true;
child.send = vi.fn((msg: unknown) => {
if (
msg !== null &&
typeof msg === "object" &&
(msg as Record<string, unknown>).type === "shutdown"
) {
setImmediate(() => {
child.connected = false;
child.emit("exit", 0, null);
});
}
});
child.kill = vi.fn((_signal?: string) => {
child.connected = false;
child.emit("exit", null, _signal ?? "SIGKILL");
});
child.pid = pid;
// Auto-emit ready so restartGroup's await resolves
setImmediate(() => {
child.emit("message", { type: "ready" });
});
return child;
}
vi.mock("node:child_process", () => ({
fork: vi.fn((_script: string, _args: string[], _opts: unknown) => {
const child = makeMockChild(mockChildren.length + 100);
mockChildren.push(child);
return child;
}),
}));
// Must also mock file-watcher since kernel imports it
vi.mock("../file-watcher.js", () => ({
createFileWatcher: vi.fn(() => ({ close: vi.fn() })),
}));
const { createKernel } = await import("../kernel.js");
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
return {
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [],
workflows: null,
...overrides,
};
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
describe("kernel — getHealth", () => {
beforeEach(() => {
mockChildren.length = 0;
vi.useFakeTimers({ shouldAdvanceTime: true });
});
afterEach(() => {
vi.useRealTimers();
});
it("returns correct health shape", async () => {
const config = makeConfig({
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
"disk-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
"net-rx": { group: "network", throttle: null, timeout: null, gracePeriod: null },
},
});
const kernel = createKernel(config, "/tmp/nerve-test");
const health = kernel.getHealth();
expect(health.activeSenses).toBe(3);
expect(health.activeGroups).toBe(2);
expect(health.uptime).toBeGreaterThanOrEqual(0);
expect(health.memoryUsage).toBeDefined();
expect(typeof health.memoryUsage.heapUsed).toBe("number");
await kernel.stop();
});
});
describe("kernel — restartGroup", () => {
beforeEach(() => {
mockChildren.length = 0;
vi.useFakeTimers({ shouldAdvanceTime: true });
});
afterEach(() => {
vi.useRealTimers();
});
it("sends shutdown to old worker and spawns new one", async () => {
const config = makeConfig();
const kernel = createKernel(config, "/tmp/nerve-test");
expect(mockChildren.length).toBe(1);
const oldChild = mockChildren[0];
const restartPromise = kernel.restartGroup("system");
// The shutdown message triggers exit in the mock
await restartPromise;
// A new child should have been spawned
expect(mockChildren.length).toBe(2);
// Old child got shutdown
expect(oldChild.send).toHaveBeenCalledWith(expect.objectContaining({ type: "shutdown" }));
await kernel.stop();
});
it("restartGroup on unknown group does nothing", async () => {
const config = makeConfig();
const kernel = createKernel(config, "/tmp/nerve-test");
expect(mockChildren.length).toBe(1);
await kernel.restartGroup("nonexistent");
// No new child spawned
expect(mockChildren.length).toBe(1);
await kernel.stop();
});
});
describe("kernel — reloadConfig", () => {
beforeEach(() => {
mockChildren.length = 0;
vi.useFakeTimers({ shouldAdvanceTime: true });
});
afterEach(() => {
vi.useRealTimers();
});
it("adds new group worker when new sense group appears", async () => {
const config = makeConfig();
const kernel = createKernel(config, "/tmp/nerve-test");
expect(mockChildren.length).toBe(1); // only system group
expect(kernel.groups.has("network")).toBe(false);
kernel.reloadConfig({
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
"net-rx": { group: "network", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [],
workflows: null,
});
expect(kernel.groups.has("network")).toBe(true);
expect(mockChildren.length).toBe(2); // system + network
await kernel.stop();
});
it("removes group worker when its senses are all removed", async () => {
const config: NerveConfig = {
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
"net-rx": { group: "network", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [],
workflows: null,
};
const kernel = createKernel(config, "/tmp/nerve-test");
expect(mockChildren.length).toBe(2);
expect(kernel.groups.has("network")).toBe(true);
const networkChild = mockChildren[1];
kernel.reloadConfig({
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [],
workflows: null,
});
expect(kernel.groups.has("network")).toBe(false);
// Network child should have received shutdown
expect(networkChild.send).toHaveBeenCalledWith(expect.objectContaining({ type: "shutdown" }));
await kernel.stop();
});
it("health reflects updated sense count after reloadConfig", async () => {
const config = makeConfig();
const kernel = createKernel(config, "/tmp/nerve-test");
expect(kernel.getHealth().activeSenses).toBe(1);
kernel.reloadConfig({
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
"disk-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [],
workflows: null,
});
expect(kernel.getHealth().activeSenses).toBe(2);
await kernel.stop();
});
});
@@ -0,0 +1,420 @@
/**
* Integration tests for Kernel + WorkflowManager integration.
*
* Verifies that sense signals trigger workflow runs via workflow reflexes,
* that workflow events are logged, that reloadConfig handles workflow changes,
* and that graceful shutdown stops workflow workers.
*
* Uses mocked child_process.fork to avoid real subprocesses.
*/
import { EventEmitter } from "node:events";
import type { NerveConfig } from "@uncaged/nerve-core";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
// ---------------------------------------------------------------------------
// Mock child_process.fork before importing kernel
// ---------------------------------------------------------------------------
type MockChild = EventEmitter & {
send: ReturnType<typeof vi.fn>;
kill: ReturnType<typeof vi.fn>;
connected: boolean;
exitCode: number | null;
pid: number;
};
const mockChildren: MockChild[] = [];
function makeMockChild(pid = 1): MockChild {
const child = new EventEmitter() as MockChild;
child.connected = true;
child.exitCode = null;
child.pid = pid;
child.send = vi.fn((msg: unknown) => {
if (
msg !== null &&
typeof msg === "object" &&
(msg as Record<string, unknown>).type === "shutdown"
) {
setImmediate(() => {
child.exitCode = 0;
child.connected = false;
child.emit("exit", 0, null);
});
}
});
child.kill = vi.fn((_signal?: string) => {
child.exitCode = 1;
child.connected = false;
child.emit("exit", null, _signal ?? "SIGKILL");
});
return child;
}
vi.mock("node:child_process", () => ({
fork: vi.fn((_script: string, _args: string[], _opts: unknown) => {
const child = makeMockChild(mockChildren.length + 1);
mockChildren.push(child);
return child;
}),
}));
// Import after mock is set up
const { createKernel } = await import("../kernel.js");
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
function makeLogStore() {
return {
append: vi.fn(),
query: vi.fn(() => []),
getMeta: vi.fn(() => null),
setMeta: vi.fn(),
upsertWorkflowRun: vi.fn(),
appendWithWorkflowUpdate: vi.fn(),
getWorkflowRun: vi.fn(() => null),
getActiveWorkflowRuns: vi.fn(() => []),
getTriggerPayload: vi.fn(() => null),
getThreadEvents: vi.fn(() => []),
close: vi.fn(),
};
}
function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
return {
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [],
workflows: null,
...overrides,
};
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
describe("kernel + workflowManager integration", () => {
beforeEach(() => {
mockChildren.length = 0;
vi.useFakeTimers({ shouldAdvanceTime: true });
});
afterEach(async () => {
vi.useRealTimers();
vi.clearAllMocks();
});
describe("sense signal triggers workflow via reflex", () => {
it("calls workflowManager.startWorkflow when a sense signal fires on a workflow reflex", async () => {
const logStore = makeLogStore();
const config = makeConfig({
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [{ kind: "workflow", workflow: "my-workflow", on: ["cpu-usage"] }],
workflows: { "my-workflow": { concurrency: 2, overflow: "drop" } },
});
const kernel = createKernel(config, "/tmp/nerve-test", {
workerScript: "fake-worker.js",
logStore,
});
// Emit a signal from "cpu-usage" on the bus
const { createSignalBus } = await import("../signal-bus.js");
void createSignalBus; // ensure import resolves
kernel.bus.emit({ id: 1, senseId: "cpu-usage", payload: { value: 80 }, ts: Date.now() });
// The workflow worker should be spawned (one for the sense group, one for workflow)
// The sense group worker is mockChildren[0]; the workflow worker is mockChildren[1]
// We need to check that a start-thread message was sent to the workflow worker
const workflowWorker = mockChildren.find((c) =>
(c.send as ReturnType<typeof vi.fn>).mock.calls.some(
([msg]: [unknown]) =>
msg !== null &&
typeof msg === "object" &&
(msg as Record<string, unknown>).type === "start-thread",
),
);
expect(workflowWorker).toBeDefined();
const stopPromise = kernel.stop();
await vi.runAllTimersAsync();
await stopPromise;
});
it("passes the signal payload as triggerPayload to the workflow", async () => {
const logStore = makeLogStore();
const config = makeConfig({
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [{ kind: "workflow", workflow: "alert-workflow", on: ["cpu-usage"] }],
workflows: { "alert-workflow": { concurrency: 1, overflow: "drop" } },
});
const kernel = createKernel(config, "/tmp/nerve-test", {
workerScript: "fake-worker.js",
logStore,
});
const payload = { level: "critical", value: 99 };
kernel.bus.emit({ id: 1, senseId: "cpu-usage", payload, ts: Date.now() });
// Find the start-thread call and verify triggerPayload
const startThreadCall = mockChildren
.flatMap((c) => (c.send as ReturnType<typeof vi.fn>).mock.calls as [unknown][])
.find(
([msg]) =>
msg !== null &&
typeof msg === "object" &&
(msg as Record<string, unknown>).type === "start-thread",
);
expect(startThreadCall).toBeDefined();
expect(startThreadCall?.[0]).toMatchObject({
type: "start-thread",
workflow: "alert-workflow",
triggerPayload: payload,
});
const stopPromise = kernel.stop();
await vi.runAllTimersAsync();
await stopPromise;
});
it("does not trigger workflow when signal senseId is not in 'on' list", async () => {
const logStore = makeLogStore();
const config = makeConfig({
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
"disk-io": { group: "system", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [{ kind: "workflow", workflow: "my-workflow", on: ["disk-io"] }],
workflows: { "my-workflow": { concurrency: 1, overflow: "drop" } },
});
const kernel = createKernel(config, "/tmp/nerve-test", {
workerScript: "fake-worker.js",
logStore,
});
// Emit signal from cpu-usage — NOT in the workflow's "on" list
kernel.bus.emit({ id: 1, senseId: "cpu-usage", payload: 50, ts: Date.now() });
// No workflow worker should have been spawned (only the sense group worker)
const workflowWorkerSpawned = mockChildren.some((c) =>
(c.send as ReturnType<typeof vi.fn>).mock.calls.some(
([msg]: [unknown]) =>
msg !== null &&
typeof msg === "object" &&
(msg as Record<string, unknown>).type === "start-thread",
),
);
expect(workflowWorkerSpawned).toBe(false);
const stopPromise = kernel.stop();
await vi.runAllTimersAsync();
await stopPromise;
});
});
describe("workflow events are logged", () => {
it("logs a 'started' event when workflow thread is triggered", async () => {
const logStore = makeLogStore();
const config = makeConfig({
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [{ kind: "workflow", workflow: "log-test-workflow", on: ["cpu-usage"] }],
workflows: { "log-test-workflow": { concurrency: 2, overflow: "drop" } },
});
const kernel = createKernel(config, "/tmp/nerve-test", {
workerScript: "fake-worker.js",
logStore,
});
kernel.bus.emit({ id: 1, senseId: "cpu-usage", payload: null, ts: Date.now() });
expect(logStore.upsertWorkflowRun).toHaveBeenCalledWith(
expect.objectContaining({ source: "workflow", type: "started" }),
expect.objectContaining({ workflow: "log-test-workflow", status: "started" }),
);
const stopPromise = kernel.stop();
await vi.runAllTimersAsync();
await stopPromise;
});
});
describe("reloadConfig handles workflow changes", () => {
it("new workflow reflexes are active after reloadConfig", async () => {
const logStore = makeLogStore();
const initialConfig = makeConfig({
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [],
workflows: null,
});
const kernel = createKernel(initialConfig, "/tmp/nerve-test", {
workerScript: "fake-worker.js",
logStore,
});
// Reload with a workflow reflex added
const newConfig: NerveConfig = {
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [{ kind: "workflow", workflow: "new-workflow", on: ["cpu-usage"] }],
workflows: { "new-workflow": { concurrency: 1, overflow: "drop" } },
};
kernel.reloadConfig(newConfig);
// Now emit a signal — should trigger the new workflow
kernel.bus.emit({ id: 2, senseId: "cpu-usage", payload: "reload-test", ts: Date.now() });
const startThreadCall = mockChildren
.flatMap((c) => (c.send as ReturnType<typeof vi.fn>).mock.calls as [unknown][])
.find(
([msg]) =>
msg !== null &&
typeof msg === "object" &&
(msg as Record<string, unknown>).type === "start-thread" &&
(msg as Record<string, unknown>).workflow === "new-workflow",
);
expect(startThreadCall).toBeDefined();
const stopPromise = kernel.stop();
await vi.runAllTimersAsync();
await stopPromise;
});
it("old workflow reflexes are removed after reloadConfig", async () => {
const logStore = makeLogStore();
const initialConfig = makeConfig({
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [{ kind: "workflow", workflow: "old-workflow", on: ["cpu-usage"] }],
workflows: { "old-workflow": { concurrency: 1, overflow: "drop" } },
});
const kernel = createKernel(initialConfig, "/tmp/nerve-test", {
workerScript: "fake-worker.js",
logStore,
});
// Reload with the workflow reflex removed
const newConfig: NerveConfig = {
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [],
workflows: null,
};
kernel.reloadConfig(newConfig);
// Clear send history
for (const c of mockChildren) {
(c.send as ReturnType<typeof vi.fn>).mockClear();
}
// Emit a signal — old-workflow should NOT be triggered
kernel.bus.emit({ id: 3, senseId: "cpu-usage", payload: "after-reload", ts: Date.now() });
const startThreadCall = mockChildren
.flatMap((c) => (c.send as ReturnType<typeof vi.fn>).mock.calls as [unknown][])
.find(
([msg]) =>
msg !== null &&
typeof msg === "object" &&
(msg as Record<string, unknown>).type === "start-thread",
);
expect(startThreadCall).toBeUndefined();
const stopPromise = kernel.stop();
await vi.runAllTimersAsync();
await stopPromise;
});
});
describe("graceful shutdown stops workflow workers", () => {
it("stop() resolves after workflow workers exit", async () => {
const logStore = makeLogStore();
const config = makeConfig({
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [{ kind: "workflow", workflow: "shutdown-test", on: ["cpu-usage"] }],
workflows: { "shutdown-test": { concurrency: 1, overflow: "drop" } },
});
const kernel = createKernel(config, "/tmp/nerve-test", {
workerScript: "fake-worker.js",
logStore,
});
// Trigger a workflow so a worker is spawned
kernel.bus.emit({ id: 1, senseId: "cpu-usage", payload: null, ts: Date.now() });
const stopPromise = kernel.stop();
await vi.runAllTimersAsync();
await expect(stopPromise).resolves.toBeUndefined();
});
it("workflowManager is exposed on kernel", () => {
const logStore = makeLogStore();
const config = makeConfig({
workflows: { "my-wf": { concurrency: 1, overflow: "drop" } },
});
const kernel = createKernel(config, "/tmp/nerve-test", {
workerScript: "fake-worker.js",
logStore,
});
expect(kernel.workflowManager).toBeDefined();
expect(typeof kernel.workflowManager.startWorkflow).toBe("function");
expect(typeof kernel.workflowManager.activeCount).toBe("function");
expect(typeof kernel.workflowManager.stop).toBe("function");
kernel.stop().catch(() => {});
});
it("getHealth includes activeWorkflows count", async () => {
const logStore = makeLogStore();
const config = makeConfig({
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [{ kind: "workflow", workflow: "health-wf", on: ["cpu-usage"] }],
workflows: { "health-wf": { concurrency: 2, overflow: "drop" } },
});
const kernel = createKernel(config, "/tmp/nerve-test", {
workerScript: "fake-worker.js",
logStore,
});
const health = kernel.getHealth();
expect(health).toHaveProperty("activeWorkflows");
expect(typeof health.activeWorkflows).toBe("number");
const stopPromise = kernel.stop();
await vi.runAllTimersAsync();
await stopPromise;
});
});
});
@@ -0,0 +1,198 @@
/**
* Phase 3 — LogStore crash recovery helpers tests.
*
* Tests for getThreadEvents() and getTriggerPayload().
*/
import { mkdtempSync, rmSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { afterEach, beforeEach, describe, expect, it } from "vitest";
import { createLogStore } from "../log-store.js";
import type { LogStore } from "../log-store.js";
describe("LogStore — crash recovery helpers (Phase 3)", () => {
let tmpDir: string;
let store: LogStore;
beforeEach(() => {
tmpDir = mkdtempSync(join(tmpdir(), "nerve-cr-log-test-"));
store = createLogStore(join(tmpDir, "data", "logs.db"));
});
afterEach(() => {
store.close();
rmSync(tmpDir, { recursive: true, force: true });
});
describe("getTriggerPayload", () => {
it("returns null for an unknown runId", () => {
expect(store.getTriggerPayload("no-such-run")).toBeNull();
});
it("returns the triggerPayload stored in the 'started' log entry", () => {
const payload = { task: "build", repo: "myrepo" };
store.upsertWorkflowRun(
{
source: "workflow",
type: "started",
refId: "run-1",
payload: JSON.stringify({ triggerPayload: payload }),
ts: 1000,
},
{ runId: "run-1", workflow: "my-wf", status: "started", ts: 1000 },
);
const result = store.getTriggerPayload("run-1");
expect(result).toMatchObject(payload);
});
it("returns null when started log entry has no payload", () => {
store.upsertWorkflowRun(
{
source: "workflow",
type: "started",
refId: "run-2",
payload: null,
ts: 1000,
},
{ runId: "run-2", workflow: "my-wf", status: "started", ts: 1000 },
);
expect(store.getTriggerPayload("run-2")).toBeNull();
});
it("returns the payload from the first 'started' entry (earliest)", () => {
const payloadA = { trigger: "first" };
const payloadB = { trigger: "second" };
// Insert two started entries for the same run
store.append({
source: "workflow",
type: "started",
refId: "run-3",
payload: JSON.stringify({ triggerPayload: payloadA }),
ts: 100,
});
store.append({
source: "workflow",
type: "started",
refId: "run-3",
payload: JSON.stringify({ triggerPayload: payloadB }),
ts: 200,
});
const result = store.getTriggerPayload("run-3");
// Should return the first (earliest) started entry
expect(result).toMatchObject(payloadA);
});
});
describe("getThreadEvents", () => {
it("returns empty array for an unknown runId", () => {
expect(store.getThreadEvents("no-such-run")).toHaveLength(0);
});
it("returns CommandEvents in insertion order", () => {
const events = [
{ type: "thread_start", triggerPayload: {} },
{ type: "scan_complete", items: ["a", "b"] },
{ type: "process_done", count: 2 },
];
for (const event of events) {
store.append({
source: "workflow",
type: "thread_command_event",
refId: "run-4",
payload: JSON.stringify(event),
ts: Date.now(),
});
}
const result = store.getThreadEvents("run-4");
expect(result).toHaveLength(3);
expect(result[0].type).toBe("thread_start");
expect(result[1].type).toBe("scan_complete");
expect(result[2].type).toBe("process_done");
});
it("skips entries with null payload", () => {
store.append({
source: "workflow",
type: "thread_command_event",
refId: "run-5",
payload: null,
ts: 1000,
});
store.append({
source: "workflow",
type: "thread_command_event",
refId: "run-5",
payload: JSON.stringify({ type: "valid_event" }),
ts: 1001,
});
const result = store.getThreadEvents("run-5");
expect(result).toHaveLength(1);
expect(result[0].type).toBe("valid_event");
});
it("only returns thread_command_event entries (not other workflow log types)", () => {
// Insert a mix of workflow log types
store.upsertWorkflowRun(
{
source: "workflow",
type: "started",
refId: "run-6",
payload: JSON.stringify({ triggerPayload: {} }),
ts: 1000,
},
{ runId: "run-6", workflow: "my-wf", status: "started", ts: 1000 },
);
store.append({
source: "workflow",
type: "thread_command_event",
refId: "run-6",
payload: JSON.stringify({ type: "step_one" }),
ts: 1001,
});
store.append({
source: "workflow",
type: "step_complete",
refId: "run-6",
payload: JSON.stringify({ message: "done step" }),
ts: 1002,
});
const result = store.getThreadEvents("run-6");
expect(result).toHaveLength(1);
expect(result[0].type).toBe("step_one");
});
it("does not return events from a different runId", () => {
store.append({
source: "workflow",
type: "thread_command_event",
refId: "run-7",
payload: JSON.stringify({ type: "event_for_7" }),
ts: 1000,
});
store.append({
source: "workflow",
type: "thread_command_event",
refId: "run-8",
payload: JSON.stringify({ type: "event_for_8" }),
ts: 1001,
});
const result7 = store.getThreadEvents("run-7");
expect(result7).toHaveLength(1);
expect(result7[0].type).toBe("event_for_7");
const result8 = store.getThreadEvents("run-8");
expect(result8).toHaveLength(1);
expect(result8[0].type).toBe("event_for_8");
});
});
});
@@ -0,0 +1,117 @@
import { mkdtempSync, rmSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
import type { NerveConfig, Signal } from "@uncaged/nerve-core";
import { createLogStore } from "../log-store.js";
import type { LogStore } from "../log-store.js";
import { createReflexScheduler } from "../reflex-scheduler.js";
import { createSignalBus } from "../signal-bus.js";
describe("LogStore + ReflexScheduler integration", () => {
let tmpDir: string;
let logStore: LogStore;
beforeEach(() => {
tmpDir = mkdtempSync(join(tmpdir(), "nerve-log-int-"));
logStore = createLogStore(join(tmpDir, "logs.db"));
});
afterEach(() => {
logStore.close();
rmSync(tmpDir, { recursive: true, force: true });
});
it("logs run_start when reflex triggers a compute", () => {
const config: NerveConfig = {
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: null, on: ["cpu-usage"] }],
workflows: null,
};
const bus = createSignalBus();
const triggered: string[] = [];
const scheduler = createReflexScheduler(config, bus, (name) => triggered.push(name), {
logStore,
});
const signal: Signal = { id: 1, senseId: "cpu-usage", payload: 42, ts: Date.now() };
bus.emit(signal);
const logs = logStore.query({ source: "reflex", type: "run_start" });
expect(logs).toHaveLength(1);
expect(logs[0].refId).toBe("cpu-usage");
expect(triggered).toHaveLength(1);
scheduler.stop();
});
it("interval reflex produces run_start logs", () => {
vi.useFakeTimers();
const config: NerveConfig = {
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: 1000, on: null }],
workflows: null,
};
const bus = createSignalBus();
const ref: { scheduler: ReturnType<typeof createReflexScheduler> | null } = { scheduler: null };
const scheduler = createReflexScheduler(
config,
bus,
(name) => {
ref.scheduler?.onComputeComplete(name);
},
{ logStore },
);
ref.scheduler = scheduler;
vi.advanceTimersByTime(3000);
const logs = logStore.query({ source: "reflex", type: "run_start" });
expect(logs.length).toBeGreaterThanOrEqual(3);
expect(logs.every((l) => l.refId === "cpu-usage")).toBe(true);
scheduler.stop();
vi.useRealTimers();
});
it("logs cannot trigger reflexes (architectural constraint)", () => {
const config: NerveConfig = {
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [{ kind: "sense", sense: "cpu-usage", interval: null, on: ["cpu-usage"] }],
workflows: null,
};
const bus = createSignalBus();
const triggered: string[] = [];
const scheduler = createReflexScheduler(
config,
bus,
(name) => {
triggered.push(name);
scheduler.onComputeComplete(name);
},
{ logStore },
);
logStore.append({
source: "reflex",
type: "run_complete",
refId: "cpu-usage",
payload: '{"v":99}',
ts: Date.now(),
});
// Writing to the log store should NOT trigger any reflex.
// Only bus.emit(signal) triggers reflexes.
expect(triggered).toHaveLength(0);
scheduler.stop();
});
});
@@ -0,0 +1,186 @@
/**
* Tests for workflow-related LogStore functionality (RFC-002 §6.2).
*/
import { mkdtempSync, rmSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { afterEach, beforeEach, describe, expect, it } from "vitest";
import { createLogStore } from "../log-store.js";
import type { LogStore, WorkflowRun } from "../log-store.js";
describe("LogStore — workflow_runs", () => {
let tmpDir: string;
let store: LogStore;
beforeEach(() => {
tmpDir = mkdtempSync(join(tmpdir(), "nerve-wf-log-test-"));
store = createLogStore(join(tmpDir, "data", "logs.db"));
});
afterEach(() => {
store.close();
rmSync(tmpDir, { recursive: true, force: true });
});
describe("upsertWorkflowRun", () => {
it("writes a log entry and creates a workflow_runs row atomically", () => {
const run: WorkflowRun = {
runId: "run-1",
workflow: "cleanup",
status: "started",
ts: 1000,
};
const entry = store.upsertWorkflowRun(
{ source: "workflow", type: "started", refId: "run-1", payload: null, ts: 1000 },
run,
);
expect(entry.id).toBeGreaterThan(0);
expect(entry.source).toBe("workflow");
expect(entry.type).toBe("started");
const stored = store.getWorkflowRun("run-1");
expect(stored).not.toBeNull();
expect(stored?.runId).toBe("run-1");
expect(stored?.workflow).toBe("cleanup");
expect(stored?.status).toBe("started");
expect(stored?.ts).toBe(1000);
});
it("updates existing workflow_runs row on upsert (status transition)", () => {
store.upsertWorkflowRun(
{ source: "workflow", type: "started", refId: "run-2", payload: null, ts: 1000 },
{ runId: "run-2", workflow: "cleanup", status: "started", ts: 1000 },
);
store.upsertWorkflowRun(
{ source: "workflow", type: "completed", refId: "run-2", payload: null, ts: 2000 },
{ runId: "run-2", workflow: "cleanup", status: "completed", ts: 2000 },
);
const stored = store.getWorkflowRun("run-2");
expect(stored?.status).toBe("completed");
expect(stored?.ts).toBe(2000);
// Both log entries should be present (event sourcing)
const logs = store.query({ refId: "run-2" });
expect(logs).toHaveLength(2);
});
it("the log entries act as source of truth for event history", () => {
for (const [type, status, ts] of [
["queued", "queued", 1000],
["started", "started", 1001],
["step_complete", "started", 1002],
["completed", "completed", 1005],
] as const) {
store.upsertWorkflowRun(
{ source: "workflow", type, refId: "run-3", payload: null, ts },
{ runId: "run-3", workflow: "cleanup", status, ts },
);
}
const logs = store.query({ source: "workflow", refId: "run-3" });
expect(logs).toHaveLength(4);
expect(logs[0].type).toBe("queued");
expect(logs[3].type).toBe("completed");
});
});
describe("getWorkflowRun", () => {
it("returns null for a non-existent runId", () => {
expect(store.getWorkflowRun("no-such-run")).toBeNull();
});
it("returns the latest state after multiple upserts", () => {
store.upsertWorkflowRun(
{ source: "workflow", type: "queued", refId: "run-4", payload: null, ts: 100 },
{ runId: "run-4", workflow: "code-review", status: "queued", ts: 100 },
);
store.upsertWorkflowRun(
{ source: "workflow", type: "started", refId: "run-4", payload: null, ts: 200 },
{ runId: "run-4", workflow: "code-review", status: "started", ts: 200 },
);
const run = store.getWorkflowRun("run-4");
expect(run?.status).toBe("started");
expect(run?.ts).toBe(200);
});
});
describe("getActiveWorkflowRuns", () => {
beforeEach(() => {
store.upsertWorkflowRun(
{ source: "workflow", type: "queued", refId: "r1", payload: null, ts: 100 },
{ runId: "r1", workflow: "cleanup", status: "queued", ts: 100 },
);
store.upsertWorkflowRun(
{ source: "workflow", type: "started", refId: "r2", payload: null, ts: 200 },
{ runId: "r2", workflow: "cleanup", status: "started", ts: 200 },
);
store.upsertWorkflowRun(
{ source: "workflow", type: "completed", refId: "r3", payload: null, ts: 300 },
{ runId: "r3", workflow: "cleanup", status: "completed", ts: 300 },
);
store.upsertWorkflowRun(
{ source: "workflow", type: "failed", refId: "r4", payload: null, ts: 400 },
{ runId: "r4", workflow: "deploy", status: "queued", ts: 400 },
);
});
it("returns queued and started runs by default", () => {
const active = store.getActiveWorkflowRuns();
expect(active).toHaveLength(3);
const ids = active.map((r) => r.runId);
expect(ids).toContain("r1");
expect(ids).toContain("r2");
expect(ids).toContain("r4");
});
it("excludes completed and failed runs", () => {
const active = store.getActiveWorkflowRuns();
const ids = active.map((r) => r.runId);
expect(ids).not.toContain("r3");
});
it("filters by workflow name", () => {
const cleanupRuns = store.getActiveWorkflowRuns("cleanup");
expect(cleanupRuns).toHaveLength(2);
const ids = cleanupRuns.map((r) => r.runId);
expect(ids).toContain("r1");
expect(ids).toContain("r2");
});
it("returns only matching workflow when name given", () => {
const deployRuns = store.getActiveWorkflowRuns("deploy");
expect(deployRuns).toHaveLength(1);
expect(deployRuns[0].runId).toBe("r4");
});
it("returns empty array when workflow name matches no active runs", () => {
expect(store.getActiveWorkflowRuns("nonexistent")).toHaveLength(0);
});
it("returns runs ordered by ts ascending", () => {
const active = store.getActiveWorkflowRuns();
expect(active[0].ts).toBeLessThan(active[1].ts);
});
});
describe("all statuses are storable", () => {
it.each(["queued", "started", "completed", "failed", "crashed", "dropped"] as const)(
"stores status=%s",
(status) => {
const runId = `run-${status}`;
store.upsertWorkflowRun(
{ source: "workflow", type: status, refId: runId, payload: null, ts: 1 },
{ runId, workflow: "test", status, ts: 1 },
);
expect(store.getWorkflowRun(runId)?.status).toBe(status);
},
);
});
});
@@ -0,0 +1,214 @@
import { mkdtempSync, rmSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { afterEach, beforeEach, describe, expect, it } from "vitest";
import { createLogStore } from "../log-store.js";
import type { LogStore } from "../log-store.js";
describe("LogStore", () => {
let tmpDir: string;
let store: LogStore;
beforeEach(() => {
tmpDir = mkdtempSync(join(tmpdir(), "nerve-log-test-"));
store = createLogStore(join(tmpDir, "data", "logs.db"));
});
afterEach(() => {
store.close();
rmSync(tmpDir, { recursive: true, force: true });
});
describe("append + query", () => {
it("appends an entry and returns it with an id", () => {
const entry = store.append({
source: "system",
type: "start",
refId: null,
payload: null,
ts: 1000,
});
expect(entry.id).toBe(1);
expect(entry.source).toBe("system");
expect(entry.type).toBe("start");
});
it("auto-increments ids", () => {
const e1 = store.append({
source: "system",
type: "start",
refId: null,
payload: null,
ts: 1000,
});
const e2 = store.append({
source: "system",
type: "stop",
refId: null,
payload: null,
ts: 2000,
});
expect(e2.id).toBe((e1.id ?? 0) + 1);
});
it("returns all entries when queried with no filter", () => {
store.append({ source: "system", type: "start", refId: null, payload: null, ts: 1000 });
store.append({ source: "reflex", type: "run_start", refId: "cpu", payload: null, ts: 2000 });
store.append({
source: "reflex",
type: "run_complete",
refId: "cpu",
payload: '{"v":42}',
ts: 3000,
});
const all = store.query();
expect(all).toHaveLength(3);
});
});
describe("query filters", () => {
beforeEach(() => {
store.append({ source: "system", type: "start", refId: null, payload: null, ts: 1000 });
store.append({ source: "reflex", type: "run_start", refId: "cpu", payload: null, ts: 2000 });
store.append({
source: "reflex",
type: "run_complete",
refId: "cpu",
payload: '{"v":42}',
ts: 3000,
});
store.append({
source: "system",
type: "error",
refId: "disk",
payload: '{"error":"fail"}',
ts: 4000,
});
store.append({ source: "system", type: "stop", refId: null, payload: null, ts: 5000 });
});
it("filters by source", () => {
const results = store.query({ source: "reflex" });
expect(results).toHaveLength(2);
expect(results.every((r) => r.source === "reflex")).toBe(true);
});
it("filters by type", () => {
const results = store.query({ type: "error" });
expect(results).toHaveLength(1);
expect(results[0].refId).toBe("disk");
});
it("filters by refId", () => {
const results = store.query({ refId: "cpu" });
expect(results).toHaveLength(2);
});
it("filters by since (inclusive)", () => {
const results = store.query({ since: 3000 });
expect(results).toHaveLength(3);
expect(results[0].ts).toBe(3000);
});
it("filters by until (inclusive)", () => {
const results = store.query({ until: 2000 });
expect(results).toHaveLength(2);
});
it("filters by since + until range", () => {
const results = store.query({ since: 2000, until: 4000 });
expect(results).toHaveLength(3);
});
it("applies limit", () => {
const results = store.query({ limit: 2 });
expect(results).toHaveLength(2);
expect(results[0].type).toBe("start");
expect(results[1].type).toBe("run_start");
});
it("combines multiple filters", () => {
const results = store.query({ source: "system", since: 4000 });
expect(results).toHaveLength(2);
expect(results[0].type).toBe("error");
expect(results[1].type).toBe("stop");
});
it("returns empty array when no matches", () => {
const results = store.query({ source: "workflow" });
expect(results).toHaveLength(0);
});
});
describe("query ordering", () => {
it("returns entries in insertion order (ascending id)", () => {
store.append({ source: "system", type: "start", refId: null, payload: null, ts: 5000 });
store.append({ source: "reflex", type: "run_start", refId: "a", payload: null, ts: 1000 });
const all = store.query();
expect(all[0].ts).toBe(5000);
expect(all[1].ts).toBe(1000);
});
});
describe("meta table", () => {
it("returns null for nonexistent key", () => {
expect(store.getMeta("missing")).toBeNull();
});
it("sets and gets a value", () => {
store.setMeta("archived_up_to", "2026-03-22");
expect(store.getMeta("archived_up_to")).toBe("2026-03-22");
});
it("upserts on duplicate key", () => {
store.setMeta("version", "1");
store.setMeta("version", "2");
expect(store.getMeta("version")).toBe("2");
});
it("supports multiple keys", () => {
store.setMeta("a", "1");
store.setMeta("b", "2");
expect(store.getMeta("a")).toBe("1");
expect(store.getMeta("b")).toBe("2");
});
});
describe("append-only semantics", () => {
it("ids are always increasing", () => {
const entries = Array.from({ length: 10 }, (_, i) =>
store.append({ source: "system", type: "test", refId: null, payload: null, ts: i }),
);
for (let i = 1; i < entries.length; i++) {
expect(entries[i].id).toBeGreaterThan(entries[i - 1].id ?? 0);
}
});
});
describe("payload JSON round-trip", () => {
it("preserves JSON payload", () => {
const payload = JSON.stringify({ cpu: 95, host: "node-1" });
store.append({ source: "reflex", type: "run_complete", refId: "cpu", payload, ts: 1000 });
const results = store.query({ refId: "cpu" });
expect(results).toHaveLength(1);
expect(JSON.parse(results[0].payload ?? "null")).toEqual({ cpu: 95, host: "node-1" });
});
});
describe("creates parent directories", () => {
it("creates nested directory structure for db path", () => {
const deepPath = join(tmpDir, "a", "b", "c", "test.db");
const deepStore = createLogStore(deepPath);
deepStore.append({ source: "system", type: "start", refId: null, payload: null, ts: 1000 });
expect(deepStore.query()).toHaveLength(1);
deepStore.close();
});
});
});
@@ -0,0 +1,361 @@
/**
* Phase 6 integration tests — hot reload, error isolation, grace period, health.
*/
import { dirname, join } from "node:path";
import { fileURLToPath } from "node:url";
import type { Signal } from "@uncaged/nerve-core";
import type { NerveConfig } from "@uncaged/nerve-core";
import { afterEach, describe, expect, it } from "vitest";
import { createKernel } from "../kernel.js";
import type { Kernel } from "../kernel.js";
const __filename = fileURLToPath(import.meta.url);
const __dir = dirname(__filename);
const MOCK_WORKER = join(__dir, "fixtures", "mock-worker.mjs");
const ERROR_WORKER = join(__dir, "fixtures", "error-worker.mjs");
function makeConfig(overrides: Partial<NerveConfig> = {}): NerveConfig {
return {
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [],
workflows: null,
...overrides,
};
}
async function pollUntil(
predicate: () => boolean,
timeoutMs: number,
intervalMs = 50,
): Promise<void> {
return new Promise((resolve, reject) => {
const timer = setTimeout(
() => reject(new Error(`pollUntil timed out after ${timeoutMs}ms`)),
timeoutMs,
);
const check = setInterval(() => {
if (predicate()) {
clearTimeout(timer);
clearInterval(check);
resolve();
}
}, intervalMs);
});
}
// ---------------------------------------------------------------------------
// Hot Reload — restartGroup
// ---------------------------------------------------------------------------
describe("phase6 — restartGroup", () => {
let kernel: Kernel | null = null;
afterEach(async () => {
if (kernel !== null) {
await kernel.stop();
kernel = null;
}
});
it("restartGroup stops old worker and spawns a new one", async () => {
const config = makeConfig();
kernel = createKernel(config, "/tmp/nerve-phase6-test", {
workerScript: MOCK_WORKER,
});
await kernel.ready;
const oldPid = kernel.getWorkerPid("system");
expect(oldPid).not.toBeNull();
await kernel.restartGroup("system");
// Wait for new worker to become ready
await pollUntil(() => {
const newPid = kernel?.getWorkerPid("system");
return newPid !== null && newPid !== oldPid;
}, 5000);
const newPid = kernel.getWorkerPid("system");
expect(newPid).not.toBeNull();
expect(newPid).not.toBe(oldPid);
// Verify new worker is functional
const received: Signal[] = [];
const unsub = kernel.bus.subscribe((s) => received.push(s));
kernel.triggerCompute("cpu-usage");
await pollUntil(() => received.length > 0, 3000);
expect(received[0]).toMatchObject({ senseId: "cpu-usage", payload: 42 });
unsub();
}, 15_000);
it("restartGroup on nonexistent group does nothing", async () => {
const config = makeConfig();
kernel = createKernel(config, "/tmp/nerve-phase6-test", {
workerScript: MOCK_WORKER,
});
await kernel.ready;
// Should not throw
await kernel.restartGroup("nonexistent");
}, 5_000);
});
// ---------------------------------------------------------------------------
// Hot Reload — reloadConfig
// ---------------------------------------------------------------------------
describe("phase6 — reloadConfig", () => {
let kernel: Kernel | null = null;
afterEach(async () => {
if (kernel !== null) {
await kernel.stop();
kernel = null;
}
});
it("adds new group when new sense group is introduced", async () => {
const config = makeConfig();
kernel = createKernel(config, "/tmp/nerve-phase6-test", {
workerScript: MOCK_WORKER,
});
await kernel.ready;
expect(kernel.groups.has("network")).toBe(false);
const newConfig: NerveConfig = {
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
"net-rx": { group: "network", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [],
workflows: null,
};
kernel.reloadConfig(newConfig);
expect(kernel.groups.has("network")).toBe(true);
// Wait for the new network worker to start
await pollUntil(() => kernel?.getWorkerPid("network") !== null, 3000);
expect(kernel.getWorkerPid("network")).not.toBeNull();
}, 10_000);
it("removes group when all its senses are removed", async () => {
const config: NerveConfig = {
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
"net-rx": { group: "network", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [],
workflows: null,
};
kernel = createKernel(config, "/tmp/nerve-phase6-test", {
workerScript: MOCK_WORKER,
});
await kernel.ready;
expect(kernel.groups.has("network")).toBe(true);
const newConfig: NerveConfig = {
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [],
workflows: null,
};
kernel.reloadConfig(newConfig);
expect(kernel.groups.has("network")).toBe(false);
}, 10_000);
});
// ---------------------------------------------------------------------------
// Error Isolation
// ---------------------------------------------------------------------------
describe("phase6 — error isolation", () => {
let kernel: Kernel | null = null;
afterEach(async () => {
if (kernel !== null) {
await kernel.stop();
kernel = null;
}
});
it("error from one sense does not crash the worker — other senses still work", async () => {
const config: NerveConfig = {
senses: {
"good-sense": { group: "mixed", throttle: null, timeout: null, gracePeriod: null },
"bad-sense": { group: "mixed", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [],
workflows: null,
};
kernel = createKernel(config, "/tmp/nerve-phase6-test", {
workerScript: MOCK_WORKER,
});
await kernel.ready;
// Both senses go through the same worker (mock-worker responds to all compute with signal)
const received: Signal[] = [];
const unsub = kernel.bus.subscribe((s) => received.push(s));
kernel.triggerCompute("good-sense");
await pollUntil(() => received.length > 0, 3000);
expect(received[0]).toMatchObject({ senseId: "good-sense" });
kernel.triggerCompute("bad-sense");
await pollUntil(() => received.length > 1, 3000);
expect(received[1]).toMatchObject({ senseId: "bad-sense" });
unsub();
}, 10_000);
it("error worker sends error messages, kernel still running", async () => {
const stderrMessages: string[] = [];
const stderrSpy = ((original) => {
return (chunk: string | Uint8Array) => {
stderrMessages.push(String(chunk));
return original.call(process.stderr, chunk);
};
})(process.stderr.write);
const origWrite = process.stderr.write;
process.stderr.write = stderrSpy as typeof process.stderr.write;
const config = makeConfig();
kernel = createKernel(config, "/tmp/nerve-phase6-test", {
workerScript: ERROR_WORKER,
});
await kernel.ready;
kernel.triggerCompute("cpu-usage");
// Wait for the error to be logged
await pollUntil(() => stderrMessages.some((m) => m.includes("simulated compute error")), 3000);
process.stderr.write = origWrite;
// Kernel should still be running (not crashed)
expect(kernel.getWorkerPid("system")).not.toBeNull();
}, 10_000);
});
// ---------------------------------------------------------------------------
// getHealth
// ---------------------------------------------------------------------------
describe("phase6 — getHealth", () => {
let kernel: Kernel | null = null;
afterEach(async () => {
if (kernel !== null) {
await kernel.stop();
kernel = null;
}
});
it("returns health snapshot with correct shape", async () => {
const config = makeConfig({
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
"disk-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
"net-rx": { group: "network", throttle: null, timeout: null, gracePeriod: null },
},
});
kernel = createKernel(config, "/tmp/nerve-phase6-test", {
workerScript: MOCK_WORKER,
});
await kernel.ready;
const health = kernel.getHealth();
expect(health.uptime).toBeGreaterThanOrEqual(0);
expect(health.activeSenses).toBe(3);
expect(health.activeGroups).toBe(2);
expect(health.memoryUsage).toBeDefined();
expect(typeof health.memoryUsage.heapUsed).toBe("number");
}, 10_000);
it("health reflects config changes after reloadConfig", async () => {
const config = makeConfig();
kernel = createKernel(config, "/tmp/nerve-phase6-test", {
workerScript: MOCK_WORKER,
});
await kernel.ready;
expect(kernel.getHealth().activeSenses).toBe(1);
const newConfig: NerveConfig = {
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
"net-rx": { group: "network", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [],
workflows: null,
};
kernel.reloadConfig(newConfig);
expect(kernel.getHealth().activeSenses).toBe(2);
expect(kernel.getHealth().activeGroups).toBe(2);
}, 10_000);
});
// ---------------------------------------------------------------------------
// Auto-respawn on crash (existing test extended)
// ---------------------------------------------------------------------------
describe("phase6 — auto-respawn on worker crash", () => {
let kernel: Kernel | null = null;
afterEach(async () => {
if (kernel !== null) {
await kernel.stop();
kernel = null;
}
});
it("kernel auto-respawns worker and new worker is functional", async () => {
const config = makeConfig();
kernel = createKernel(config, "/tmp/nerve-phase6-test", {
workerScript: MOCK_WORKER,
});
await kernel.ready;
const originalPid = kernel.getWorkerPid("system");
expect(originalPid).not.toBeNull();
// Kill worker to simulate crash
process.kill(originalPid as number, "SIGKILL");
// Wait for respawn
await pollUntil(() => {
const pid = kernel?.getWorkerPid("system");
return pid !== null && pid !== originalPid;
}, 5000);
const newPid = kernel.getWorkerPid("system");
expect(newPid).not.toBeNull();
expect(newPid).not.toBe(originalPid);
// Verify new worker responds
const received: Signal[] = [];
const unsub = kernel.bus.subscribe((s) => received.push(s));
kernel.triggerCompute("cpu-usage");
await pollUntil(() => received.length > 0, 5000);
expect(received[0]).toMatchObject({ senseId: "cpu-usage", payload: 42 });
unsub();
await kernel.stop();
kernel = null;
}, 15_000);
});
@@ -0,0 +1,364 @@
/**
* Tests for WorkflowManager — concurrency, overflow, queue, and lifecycle.
*/
import { EventEmitter } from "node:events";
import type { NerveConfig } from "@uncaged/nerve-core";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
// ---------------------------------------------------------------------------
// Mock child_process.fork before importing workflow-manager
// ---------------------------------------------------------------------------
type MockChild = EventEmitter & {
send: ReturnType<typeof vi.fn>;
kill: ReturnType<typeof vi.fn>;
connected: boolean;
exitCode: number | null;
pid: number;
};
const mockChildren: MockChild[] = [];
function makeMockChild(pid = 1): MockChild {
const child = new EventEmitter() as MockChild;
child.connected = true;
child.exitCode = null;
child.pid = pid;
child.send = vi.fn((msg: unknown) => {
if (
msg !== null &&
typeof msg === "object" &&
(msg as Record<string, unknown>).type === "shutdown"
) {
setImmediate(() => {
child.exitCode = 0;
child.connected = false;
child.emit("exit", 0, null);
});
}
});
child.kill = vi.fn((_signal?: string) => {
child.exitCode = 1;
child.connected = false;
child.emit("exit", null, _signal ?? "SIGKILL");
});
return child;
}
vi.mock("node:child_process", () => ({
fork: vi.fn((_script: string, _args: string[], _opts: unknown) => {
const child = makeMockChild(mockChildren.length + 1);
mockChildren.push(child);
return child;
}),
}));
// Import after mock is set up
const { createWorkflowManager } = await import("../workflow-manager.js");
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
function makeLogStore() {
return {
append: vi.fn(),
query: vi.fn(() => []),
getMeta: vi.fn(() => null),
setMeta: vi.fn(),
upsertWorkflowRun: vi.fn(),
appendWithWorkflowUpdate: vi.fn(),
getWorkflowRun: vi.fn(() => null),
getActiveWorkflowRuns: vi.fn(() => []),
getTriggerPayload: vi.fn(() => null),
getThreadEvents: vi.fn(() => []),
close: vi.fn(),
};
}
function makeConfig(overrides: Partial<NerveConfig["workflows"]> = {}): NerveConfig {
return {
senses: {},
reflexes: [],
workflows: overrides as NerveConfig["workflows"],
};
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
describe("WorkflowManager", () => {
beforeEach(() => {
mockChildren.length = 0;
vi.useFakeTimers({ shouldAdvanceTime: true });
});
afterEach(() => {
vi.useRealTimers();
vi.clearAllMocks();
});
describe("startWorkflow under concurrency limit dispatches thread", () => {
it("forks a worker and sends start-thread when active < concurrency", () => {
const logStore = makeLogStore();
const config = makeConfig({
"my-workflow": { concurrency: 2, overflow: "drop" },
});
const mgr = createWorkflowManager("/nerve-root", config, logStore);
mgr.startWorkflow("my-workflow", { event: "test" });
expect(mockChildren).toHaveLength(1);
expect(mockChildren[0].send).toHaveBeenCalledWith(
expect.objectContaining({ type: "start-thread", workflow: "my-workflow" }),
);
expect(mgr.activeCount("my-workflow")).toBe(1);
});
it("reuses the same worker for a second thread under the limit", () => {
const logStore = makeLogStore();
const config = makeConfig({
"my-workflow": { concurrency: 3, overflow: "drop" },
});
const mgr = createWorkflowManager("/nerve-root", config, logStore);
mgr.startWorkflow("my-workflow", { n: 1 });
mgr.startWorkflow("my-workflow", { n: 2 });
// Only one forked child — worker is reused
expect(mockChildren).toHaveLength(1);
expect(mockChildren[0].send).toHaveBeenCalledTimes(2);
expect(mgr.activeCount("my-workflow")).toBe(2);
});
it("logs a 'started' event for each dispatched thread", () => {
const logStore = makeLogStore();
const config = makeConfig({
"my-workflow": { concurrency: 2, overflow: "drop" },
});
const mgr = createWorkflowManager("/nerve-root", config, logStore);
mgr.startWorkflow("my-workflow", { x: 1 });
expect(logStore.upsertWorkflowRun).toHaveBeenCalledWith(
expect.objectContaining({ source: "workflow", type: "started" }),
expect.objectContaining({ workflow: "my-workflow", status: "started" }),
);
});
});
describe("startWorkflow at limit with drop overflow drops the request", () => {
it("does NOT send start-thread when at concurrency limit with overflow=drop", () => {
const logStore = makeLogStore();
const config = makeConfig({
"drop-wf": { concurrency: 1, overflow: "drop" },
});
const mgr = createWorkflowManager("/nerve-root", config, logStore);
mgr.startWorkflow("drop-wf", { first: true });
// now at limit — second call should be dropped
mgr.startWorkflow("drop-wf", { second: true });
expect(mgr.activeCount("drop-wf")).toBe(1);
expect(mgr.queueLength("drop-wf")).toBe(0);
// Only one start-thread sent
expect(mockChildren[0].send).toHaveBeenCalledTimes(1);
});
it("logs a 'dropped' event when overflow=drop", () => {
const logStore = makeLogStore();
const config = makeConfig({
"drop-wf": { concurrency: 1, overflow: "drop" },
});
const mgr = createWorkflowManager("/nerve-root", config, logStore);
mgr.startWorkflow("drop-wf", {});
mgr.startWorkflow("drop-wf", {});
const droppedCall = logStore.upsertWorkflowRun.mock.calls.find(
([entry]) => entry.type === "dropped",
);
expect(droppedCall).toBeDefined();
});
});
describe("startWorkflow at limit with queue overflow queues the request", () => {
it("queues the thread when at concurrency limit with overflow=queue", () => {
const logStore = makeLogStore();
const config = makeConfig({
"queue-wf": { concurrency: 1, overflow: "queue", maxQueue: 5 },
});
const mgr = createWorkflowManager("/nerve-root", config, logStore);
mgr.startWorkflow("queue-wf", { first: true });
mgr.startWorkflow("queue-wf", { second: true });
expect(mgr.activeCount("queue-wf")).toBe(1);
expect(mgr.queueLength("queue-wf")).toBe(1);
});
it("logs a 'queued' event for the waiting thread", () => {
const logStore = makeLogStore();
const config = makeConfig({
"queue-wf": { concurrency: 1, overflow: "queue", maxQueue: 5 },
});
const mgr = createWorkflowManager("/nerve-root", config, logStore);
mgr.startWorkflow("queue-wf", {});
mgr.startWorkflow("queue-wf", {});
const queuedCall = logStore.upsertWorkflowRun.mock.calls.find(
([entry]) => entry.type === "queued",
);
expect(queuedCall).toBeDefined();
expect(queuedCall?.[1]).toMatchObject({ workflow: "queue-wf", status: "queued" });
});
});
describe("queue overflow with maxQueue drops oldest when full", () => {
it("drops oldest queued item when queue reaches maxQueue", () => {
const logStore = makeLogStore();
const config = makeConfig({
"queue-wf": { concurrency: 1, overflow: "queue", maxQueue: 2 },
});
const mgr = createWorkflowManager("/nerve-root", config, logStore);
// Fill the concurrency slot
mgr.startWorkflow("queue-wf", { n: 0 });
// Fill the queue to maxQueue
mgr.startWorkflow("queue-wf", { n: 1 });
mgr.startWorkflow("queue-wf", { n: 2 });
// This one should push out { n: 1 }
mgr.startWorkflow("queue-wf", { n: 3 });
// Queue should still be at maxQueue (2)
expect(mgr.queueLength("queue-wf")).toBe(2);
// There should be a dropped event (for the oldest evicted item)
const droppedCalls = logStore.upsertWorkflowRun.mock.calls.filter(
([entry]) => entry.type === "dropped",
);
expect(droppedCalls).toHaveLength(1);
});
});
describe("completing a thread dequeues the next one", () => {
it("dispatches the next queued thread when the active thread sends completed", () => {
const logStore = makeLogStore();
const config = makeConfig({
"queue-wf": { concurrency: 1, overflow: "queue", maxQueue: 5 },
});
const mgr = createWorkflowManager("/nerve-root", config, logStore);
mgr.startWorkflow("queue-wf", { first: true });
mgr.startWorkflow("queue-wf", { second: true });
expect(mgr.activeCount("queue-wf")).toBe(1);
expect(mgr.queueLength("queue-wf")).toBe(1);
// Simulate the worker sending a "thread-event: completed"
const child = mockChildren[0];
const startCalls = (child.send as ReturnType<typeof vi.fn>).mock.calls;
const firstRunId = (startCalls[0][0] as { runId: string }).runId;
child.emit("message", {
type: "thread-event",
runId: firstRunId,
eventType: "completed",
payload: null,
});
// The queued thread should now be active
expect(mgr.activeCount("queue-wf")).toBe(1);
expect(mgr.queueLength("queue-wf")).toBe(0);
// A second start-thread message should have been sent
expect(child.send).toHaveBeenCalledTimes(2);
expect(child.send).toHaveBeenLastCalledWith(
expect.objectContaining({ type: "start-thread", workflow: "queue-wf" }),
);
});
it("dispatches next queued thread when active thread sends failed", () => {
const logStore = makeLogStore();
const config = makeConfig({
"queue-wf": { concurrency: 1, overflow: "queue", maxQueue: 5 },
});
const mgr = createWorkflowManager("/nerve-root", config, logStore);
mgr.startWorkflow("queue-wf", { first: true });
mgr.startWorkflow("queue-wf", { second: true });
const child = mockChildren[0];
const firstRunId = (child.send as ReturnType<typeof vi.fn>).mock.calls[0][0].runId as string;
child.emit("message", {
type: "thread-event",
runId: firstRunId,
eventType: "failed",
payload: { error: "boom" },
});
expect(mgr.activeCount("queue-wf")).toBe(1);
expect(mgr.queueLength("queue-wf")).toBe(0);
});
});
describe("stop() shuts down all workers", () => {
it("sends shutdown to every worker and resolves when they exit", async () => {
const logStore = makeLogStore();
const config = makeConfig({
"wf-a": { concurrency: 2, overflow: "drop" },
"wf-b": { concurrency: 1, overflow: "drop" },
});
const mgr = createWorkflowManager("/nerve-root", config, logStore);
mgr.startWorkflow("wf-a", {});
mgr.startWorkflow("wf-b", {});
// Two distinct workers should have been forked
expect(mockChildren).toHaveLength(2);
const stopPromise = mgr.stop();
// Let setImmediate callbacks run (the mock exits on shutdown)
await vi.runAllTimersAsync();
await stopPromise;
for (const child of mockChildren) {
expect(child.send).toHaveBeenCalledWith(expect.objectContaining({ type: "shutdown" }));
}
});
it("ignores startWorkflow calls after stop()", async () => {
const logStore = makeLogStore();
const config = makeConfig({
"wf-a": { concurrency: 2, overflow: "drop" },
});
const mgr = createWorkflowManager("/nerve-root", config, logStore);
const stopPromise = mgr.stop();
await vi.runAllTimersAsync();
await stopPromise;
mgr.startWorkflow("wf-a", {});
// No worker should have been spawned
expect(mockChildren).toHaveLength(0);
});
});
describe("unknown workflow name", () => {
it("does nothing for an unknown workflow name", () => {
const logStore = makeLogStore();
const config = makeConfig({});
const mgr = createWorkflowManager("/nerve-root", config, logStore);
mgr.startWorkflow("no-such-workflow", {});
expect(mockChildren).toHaveLength(0);
expect(logStore.upsertWorkflowRun).not.toHaveBeenCalled();
});
});
});
+130
View File
@@ -0,0 +1,130 @@
/**
* File Watcher — watches nerveRoot for file changes and signals the kernel.
*
* Uses Node.js fs.watch (no external dependencies).
*
* Watched events:
* - .ts file under senses/ modified → callback with { kind: "sense", senseName, filePath }
* - nerve.yaml modified → callback with { kind: "config", filePath }
*
* Debounces rapid changes (e.g. editor save flicker) with a configurable delay.
*
* Linux compatibility note: fs.watch with { recursive: true } relies on inotify, which may
* not reliably report the filename for events in deeply nested subdirectories. On Linux,
* the `filename` argument passed to the callback can be null for some inotify events, and
* recursive watching is only available in Node.js ≥22 on Linux. If null-filename events
* become a problem, consider using a dedicated watcher library (e.g. chokidar).
*/
import { watch } from "node:fs";
import type { FSWatcher } from "node:fs";
import { join, relative, sep } from "node:path";
export type SenseFileChange = {
kind: "sense";
senseName: string;
filePath: string;
};
export type ConfigFileChange = {
kind: "config";
filePath: string;
};
export type WorkflowFileChange = {
kind: "workflow";
workflowName: string;
filePath: string;
};
export type FileChange = SenseFileChange | ConfigFileChange | WorkflowFileChange;
export type FileChangeHandler = (change: FileChange) => void;
export type FileWatcher = {
close: () => void;
};
const DEFAULT_DEBOUNCE_MS = 300;
export function createFileWatcher(
nerveRoot: string,
handler: FileChangeHandler,
debounceMs: number = DEFAULT_DEBOUNCE_MS,
): FileWatcher {
const watchers: FSWatcher[] = [];
const debounceTimers = new Map<string, ReturnType<typeof setTimeout>>();
function debounced(key: string, fn: () => void): void {
const existing = debounceTimers.get(key);
if (existing !== undefined) clearTimeout(existing);
debounceTimers.set(
key,
setTimeout(() => {
debounceTimers.delete(key);
fn();
}, debounceMs),
);
}
function handleSenseChange(normalized: string, filename: string): void {
if (!(normalized.startsWith("senses/") && normalized.endsWith(".ts"))) return;
const rel = relative("senses", normalized);
const senseName = rel.split("/")[0];
if (senseName) {
debounced(`sense:${senseName}`, () => {
handler({ kind: "sense", senseName, filePath: join(nerveRoot, filename) });
});
}
}
function handleWorkflowChange(normalized: string, filename: string): void {
if (!(normalized.startsWith("workflows/") && normalized.endsWith(".ts"))) return;
const rel = relative("workflows", normalized);
const workflowName = rel.split("/")[0];
if (workflowName) {
debounced(`workflow:${workflowName}`, () => {
handler({ kind: "workflow", workflowName, filePath: join(nerveRoot, filename) });
});
}
}
function handleFsEvent(_eventType: string, filename: string | null): void {
if (filename === null) return;
const normalized = filename.split(sep).join("/");
if (normalized === "nerve.yaml") {
debounced("config", () => {
handler({ kind: "config", filePath: join(nerveRoot, "nerve.yaml") });
});
return;
}
handleSenseChange(normalized, filename);
handleWorkflowChange(normalized, filename);
}
try {
const w = watch(nerveRoot, { recursive: true }, (eventType, filename) => {
handleFsEvent(eventType, filename);
});
watchers.push(w);
} catch (e) {
const msg = e instanceof Error ? e.message : String(e);
process.stderr.write(`[file-watcher] Failed to watch "${nerveRoot}": ${msg}\n`);
}
function close(): void {
for (const timer of debounceTimers.values()) {
clearTimeout(timer);
}
debounceTimers.clear();
for (const w of watchers) {
w.close();
}
watchers.length = 0;
}
return { close };
}
+16 -1
View File
@@ -1,11 +1,17 @@
export type {
ComputeMessage,
ShutdownMessage,
HealthRequestMessage,
HealthResponseMessage,
ParentToWorkerMessage,
SignalMessage,
ErrorMessage,
ReadyMessage,
WorkerToParentMessage,
StartThreadMessage,
ResumeThreadMessage,
ThreadEventMessage,
WorkflowErrorMessage,
} from "./ipc.js";
export type { SignalBus, SignalHandler, Unsubscribe } from "./signal-bus.js";
@@ -21,4 +27,13 @@ export {
} from "./sense-runtime.js";
export { createKernel } from "./kernel.js";
export type { Kernel, KernelOptions } from "./kernel.js";
export type { Kernel, KernelOptions, KernelHealth } from "./kernel.js";
export { createFileWatcher } from "./file-watcher.js";
export type { FileWatcher, FileChange, FileChangeHandler } from "./file-watcher.js";
export { createLogStore } from "./log-store.js";
export type { LogStore, LogEntry, LogQuery, WorkflowRun, WorkflowRunStatus } from "./log-store.js";
export { createWorkflowManager } from "./workflow-manager.js";
export type { WorkflowManager } from "./workflow-manager.js";
+220 -26
View File
@@ -17,8 +17,41 @@ export type ShutdownMessage = {
type: "shutdown";
};
/** Parent → Worker: request health info from worker */
export type HealthRequestMessage = {
type: "health-request";
};
// ---------------------------------------------------------------------------
// Workflow IPC messages (RFC-002 §5.2)
// ---------------------------------------------------------------------------
/** Parent → Workflow Worker: start a new thread */
export type StartThreadMessage = {
type: "start-thread";
runId: string;
workflow: string;
/** The trigger payload from the Reflex that initiated this thread. */
triggerPayload: unknown;
};
/** Parent → Workflow Worker: resume an existing thread after crash recovery */
export type ResumeThreadMessage = {
type: "resume-thread";
runId: string;
/** Serialised CommandEvent history to rebuild ThreadState. */
events: Array<{ type: string; [key: string]: unknown }>;
/** Serialised trigger payload (the same value as in the original start-thread). */
triggerPayload: unknown;
};
/** Union of all messages the parent sends to a worker */
export type ParentToWorkerMessage = ComputeMessage | ShutdownMessage;
export type ParentToWorkerMessage =
| ComputeMessage
| ShutdownMessage
| HealthRequestMessage
| StartThreadMessage
| ResumeThreadMessage;
/** Worker → Parent: compute produced a signal */
export type SignalMessage = {
@@ -39,8 +72,76 @@ export type ReadyMessage = {
type: "ready";
};
/** Worker → Parent: health info response */
export type HealthResponseMessage = {
type: "health-response";
senses: string[];
inFlightCount: number;
};
// ---------------------------------------------------------------------------
// Workflow Worker → Parent messages (RFC-002 §5.2)
// ---------------------------------------------------------------------------
/** Valid lifecycle event types for a workflow thread. */
export type ThreadEventType = "queued" | "started" | "step_complete" | "completed" | "failed";
/**
* Workflow Worker → Parent: a thread lifecycle event.
*/
export type ThreadEventMessage = {
type: "thread-event";
runId: string;
eventType: ThreadEventType;
payload: unknown;
};
/** Workflow Worker → Parent: a thread encountered an unrecoverable error. */
export type WorkflowErrorMessage = {
type: "workflow-error";
runId: string;
error: string;
};
/** Workflow Worker → Parent: a thread CommandEvent produced by a role (for crash recovery). */
export type ThreadCommandEventMessage = {
type: "thread-command-event";
runId: string;
/** The CommandEvent returned by role.execute() — will be persisted for crash recovery. */
event: { type: string; [key: string]: unknown };
};
/** Union of all messages a worker sends to the parent */
export type WorkerToParentMessage = SignalMessage | ErrorMessage | ReadyMessage;
export type WorkerToParentMessage =
| SignalMessage
| ErrorMessage
| ReadyMessage
| HealthResponseMessage
| ThreadEventMessage
| WorkflowErrorMessage
| ThreadCommandEventMessage;
const PARENT_MSG_TYPES = new Set([
"compute",
"shutdown",
"health-request",
"start-thread",
"resume-thread",
]);
function validateStartThreadMsg(obj: Record<string, unknown>): string | null {
if (typeof obj.runId !== "string") return "'start-thread' message missing string 'runId'";
if (typeof obj.workflow !== "string") return "'start-thread' message missing string 'workflow'";
if (!("triggerPayload" in obj)) return "'start-thread' message missing 'triggerPayload'";
return null;
}
function validateResumeThreadMsg(obj: Record<string, unknown>): string | null {
if (typeof obj.runId !== "string") return "'resume-thread' message missing string 'runId'";
if (!Array.isArray(obj.events)) return "'resume-thread' message missing 'events' array";
if (!("triggerPayload" in obj)) return "'resume-thread' message missing 'triggerPayload'";
return null;
}
/** Validate and parse an unknown IPC message received from the parent process. */
export function parseParentMessage(raw: unknown): Result<ParentToWorkerMessage> {
@@ -51,13 +152,119 @@ export function parseParentMessage(raw: unknown): Result<ParentToWorkerMessage>
if (typeof obj.type !== "string") {
return err(new Error("IPC message missing string 'type' field"));
}
const type = obj.type;
if (type !== "compute" && type !== "shutdown") {
return err(new Error(`Unknown IPC message type: "${type}"`));
if (!PARENT_MSG_TYPES.has(obj.type)) {
return err(new Error(`Unknown IPC message type: "${obj.type}"`));
}
if (obj.type === "start-thread") {
const errMsg = validateStartThreadMsg(obj);
if (errMsg !== null) return err(new Error(errMsg));
}
if (obj.type === "resume-thread") {
const errMsg = validateResumeThreadMsg(obj);
if (errMsg !== null) return err(new Error(errMsg));
}
return ok(raw as ParentToWorkerMessage);
}
function parseSignalMsg(obj: Record<string, unknown>, raw: unknown): Result<WorkerToParentMessage> {
if (typeof obj.sense !== "string") {
return err(new Error("Worker 'signal' message missing string 'sense' field"));
}
if (!("payload" in obj)) {
return err(new Error("Worker 'signal' message missing 'payload' field"));
}
return ok(raw as SignalMessage);
}
function parseErrorMsg(obj: Record<string, unknown>, raw: unknown): Result<WorkerToParentMessage> {
if (typeof obj.sense !== "string") {
return err(new Error("Worker 'error' message missing string 'sense' field"));
}
if (typeof obj.error !== "string") {
return err(new Error("Worker 'error' message missing string 'error' field"));
}
return ok(raw as ErrorMessage);
}
function parseHealthResponseMsg(
obj: Record<string, unknown>,
raw: unknown,
): Result<WorkerToParentMessage> {
if (!Array.isArray(obj.senses)) {
return err(new Error("Worker 'health-response' message missing 'senses' array"));
}
if (typeof obj.inFlightCount !== "number") {
return err(new Error("Worker 'health-response' message missing 'inFlightCount' number"));
}
return ok(raw as HealthResponseMessage);
}
const THREAD_EVENT_TYPES = new Set<string>([
"queued",
"started",
"step_complete",
"completed",
"failed",
]);
function parseThreadEventMsg(
obj: Record<string, unknown>,
raw: unknown,
): Result<WorkerToParentMessage> {
if (typeof obj.runId !== "string") {
return err(new Error("Worker 'thread-event' message missing string 'runId' field"));
}
if (typeof obj.eventType !== "string" || !THREAD_EVENT_TYPES.has(obj.eventType)) {
return err(
new Error(`Worker 'thread-event' message has invalid 'eventType': "${obj.eventType}"`),
);
}
if (!("payload" in obj)) {
return err(new Error("Worker 'thread-event' message missing 'payload' field"));
}
return ok(raw as ThreadEventMessage);
}
function parseWorkflowErrorMsg(
obj: Record<string, unknown>,
raw: unknown,
): Result<WorkerToParentMessage> {
if (typeof obj.runId !== "string") {
return err(new Error("Worker 'workflow-error' message missing string 'runId' field"));
}
if (typeof obj.error !== "string") {
return err(new Error("Worker 'workflow-error' message missing string 'error' field"));
}
return ok(raw as WorkflowErrorMessage);
}
const WORKER_MSG_TYPES = new Set([
"signal",
"error",
"ready",
"health-response",
"thread-event",
"workflow-error",
"thread-command-event",
]);
function parseThreadCommandEventMsg(
obj: Record<string, unknown>,
raw: unknown,
): Result<WorkerToParentMessage> {
if (typeof obj.runId !== "string") {
return err(new Error("Worker 'thread-command-event' message missing string 'runId' field"));
}
if (obj.event === null || typeof obj.event !== "object") {
return err(new Error("Worker 'thread-command-event' message missing object 'event' field"));
}
const event = obj.event as Record<string, unknown>;
if (typeof event.type !== "string") {
return err(new Error("Worker 'thread-command-event' event missing string 'type' field"));
}
return ok(raw as ThreadCommandEventMessage);
}
/** Validate and parse an unknown IPC message received from a worker process. */
export function parseWorkerMessage(raw: unknown): Result<WorkerToParentMessage> {
if (raw === null || typeof raw !== "object") {
@@ -67,27 +274,14 @@ export function parseWorkerMessage(raw: unknown): Result<WorkerToParentMessage>
if (typeof obj.type !== "string") {
return err(new Error("Worker IPC message missing string 'type' field"));
}
const type = obj.type;
if (type !== "signal" && type !== "error" && type !== "ready") {
return err(new Error(`Unknown worker IPC message type: "${type}"`));
}
if (type === "signal") {
if (typeof obj.sense !== "string") {
return err(new Error("Worker 'signal' message missing string 'sense' field"));
}
if (!("payload" in obj)) {
return err(new Error("Worker 'signal' message missing 'payload' field"));
}
return ok(raw as SignalMessage);
}
if (type === "error") {
if (typeof obj.sense !== "string") {
return err(new Error("Worker 'error' message missing string 'sense' field"));
}
if (typeof obj.error !== "string") {
return err(new Error("Worker 'error' message missing string 'error' field"));
}
return ok(raw as ErrorMessage);
if (!WORKER_MSG_TYPES.has(obj.type)) {
return err(new Error(`Unknown worker IPC message type: "${obj.type}"`));
}
if (obj.type === "signal") return parseSignalMsg(obj, raw);
if (obj.type === "error") return parseErrorMsg(obj, raw);
if (obj.type === "health-response") return parseHealthResponseMsg(obj, raw);
if (obj.type === "thread-event") return parseThreadEventMsg(obj, raw);
if (obj.type === "workflow-error") return parseWorkflowErrorMsg(obj, raw);
if (obj.type === "thread-command-event") return parseThreadCommandEventMsg(obj, raw);
return ok({ type: "ready" });
}
+302 -12
View File
@@ -8,33 +8,62 @@
* - Route ErrorMessage from workers → stderr log
* - Drive compute triggers via ReflexScheduler
* - Graceful shutdown: stop scheduler, send shutdown to all workers
* - Hot reload: restartGroup, reloadConfig, file watcher integration
* - Health reporting: getHealth
*/
import { fork } from "node:child_process";
import type { ChildProcess } from "node:child_process";
import { readFileSync } from "node:fs";
import { dirname, join } from "node:path";
import { fileURLToPath } from "node:url";
import type { NerveConfig, Signal } from "@uncaged/nerve-core";
import { parseNerveConfig } from "@uncaged/nerve-core";
import { createFileWatcher } from "./file-watcher.js";
import type { FileWatcher } from "./file-watcher.js";
import type { ComputeMessage, ShutdownMessage } from "./ipc.js";
import { parseWorkerMessage } from "./ipc.js";
import { createLogStore } from "./log-store.js";
import type { LogStore } from "./log-store.js";
import { createReflexScheduler } from "./reflex-scheduler.js";
import type { ReflexScheduler } from "./reflex-scheduler.js";
import { createSignalBus } from "./signal-bus.js";
import type { SignalBus } from "./signal-bus.js";
import { createWorkflowManager } from "./workflow-manager.js";
import type { WorkflowManager } from "./workflow-manager.js";
export type KernelHealth = {
uptime: number;
activeSenses: number;
activeGroups: number;
pendingComputes: number;
activeWorkflows: number;
memoryUsage: NodeJS.MemoryUsage;
};
export type Kernel = {
stop: () => Promise<void>;
groups: Set<string>;
senseCount: number;
bus: SignalBus;
logStore: LogStore;
workflowManager: WorkflowManager;
/** Resolves when all workers have sent their initial "ready" message. */
ready: Promise<void>;
/** Returns the PID of the worker process for a given group, or null if not found. */
getWorkerPid: (group: string) => number | null;
/** Sends a compute message to the worker responsible for the given sense. */
triggerCompute: (senseName: string) => void;
/** Gracefully restart a group worker (wait for exit, then respawn). */
restartGroup: (group: string) => Promise<void>;
/** Reload config from a new NerveConfig, incrementally updating scheduler and workers.
* Note: any pending/throttled computes in the old scheduler are silently dropped on reload.
* In-flight state is not preserved across reloadConfig. */
reloadConfig: (newConfig: NerveConfig) => void;
/** Return daemon health info. */
getHealth: () => KernelHealth;
};
type WorkerEntry = {
@@ -66,7 +95,6 @@ function sendCompute(worker: ChildProcess, senseName: string): void {
}
function sendShutdown(worker: ChildProcess): void {
// worker.connected is false when the IPC channel has been closed (e.g. worker crashed)
if (worker.connected === false) return;
const msg: ShutdownMessage = { type: "shutdown" };
try {
@@ -84,27 +112,43 @@ function groupForSense(config: NerveConfig, senseName: string): string | null {
export type KernelOptions = {
workerScript: string;
enableFileWatcher?: boolean;
/** Override the LogStore instance (useful for testing). */
logStore?: LogStore;
};
function defaultKernelOptions(): KernelOptions {
return { workerScript: resolveWorkerScript() };
return { workerScript: resolveWorkerScript(), enableFileWatcher: false };
}
export function createKernel(
config: NerveConfig,
initialConfig: NerveConfig,
nerveRoot: string,
options: KernelOptions = defaultKernelOptions(),
): Kernel {
const bus: SignalBus = createSignalBus();
const workerScript = options.workerScript;
const startTime = Date.now();
const logStore: LogStore = options.logStore ?? createLogStore(join(nerveRoot, "data", "logs.db"));
logStore.append({
source: "system",
type: "start",
refId: null,
payload: null,
ts: startTime,
});
let config = initialConfig;
// Signal ID counter is instance-scoped (fix #2)
let _signalIdCounter = 0;
function nextSignalId(): number {
_signalIdCounter += 1;
return _signalIdCounter;
}
const workflowManager = createWorkflowManager(nerveRoot, config, logStore);
const groups = new Set<string>();
for (const senseConfig of Object.values(config.senses)) {
groups.add(senseConfig.group);
@@ -112,7 +156,6 @@ export function createKernel(
const workers = new Map<string, WorkerEntry>();
let stopped = false;
// eslint-disable-next-line prefer-const
let scheduler: ReflexScheduler = null as unknown as ReflexScheduler;
let readyResolve: (() => void) | undefined;
@@ -145,6 +188,13 @@ export function createKernel(
if (msg.type === "error") {
process.stderr.write(`[kernel] sense "${msg.sense}" error: ${msg.error}\n`);
logStore.append({
source: "system",
type: "error",
refId: msg.sense,
payload: JSON.stringify({ error: msg.error }),
ts: Date.now(),
});
scheduler.onComputeComplete(msg.sense);
return;
}
@@ -156,24 +206,44 @@ export function createKernel(
payload: msg.payload,
ts: Date.now(),
};
logStore.append({
source: "reflex",
type: "run_complete",
refId: msg.sense,
payload: JSON.stringify(msg.payload),
ts: signal.ts,
});
bus.emit(signal);
scheduler.onComputeComplete(msg.sense);
}
// health-response is handled externally by the caller; no action needed here
}
function startWorker(group: string): void {
function startWorker(group: string): Promise<void> {
const child = spawnWorker(nerveRoot, group, workerScript);
child.on("message", handleWorkerMessage);
let workerReadyResolve: (() => void) | undefined;
const workerReady = new Promise<void>((resolve) => {
workerReadyResolve = resolve;
});
child.on("message", (raw: unknown) => {
const result = parseWorkerMessage(raw);
if (result.ok && result.value.type === "ready") {
workerReadyResolve?.();
}
handleWorkerMessage(raw);
});
child.on("exit", (code) => {
process.stderr.write(
`[kernel] worker for group "${group}" exited with code ${code ?? "null"}\n`,
);
// Respawn on unexpected exit (fix #4)
// Resolve ready in case the worker exits before sending ready (prevents hangs)
workerReadyResolve?.();
if (!stopped && code !== 0) {
process.stderr.write(`[kernel] respawning worker for group "${group}" in 1s\n`);
// Clear any stuck in-flight state for all senses in this group
for (const senseName of sensesForGroup(group)) {
scheduler.onComputeComplete(senseName);
}
@@ -186,6 +256,7 @@ export function createKernel(
});
workers.set(group, { group, process: child });
return workerReady;
}
function triggerFn(senseName: string): void {
@@ -202,7 +273,12 @@ export function createKernel(
sendCompute(entry.process, senseName);
}
scheduler = createReflexScheduler(config, bus, triggerFn);
scheduler = createReflexScheduler(config, bus, triggerFn, {
logStore,
workflowTriggerFn: (workflowName, payload) => {
workflowManager.startWorkflow(workflowName, payload);
},
});
if (groups.size === 0) {
readyResolve?.();
@@ -212,7 +288,6 @@ export function createKernel(
startWorker(group);
}
// Wait for a worker process to exit, with a timeout + SIGKILL fallback (fix #5)
function waitForExit(child: ChildProcess, timeoutMs: number): Promise<void> {
return new Promise((resolve) => {
const timer = setTimeout(() => {
@@ -226,15 +301,217 @@ export function createKernel(
});
}
// --- restartGroup: gracefully stop worker, then respawn and await ready ---
async function restartGroup(group: string): Promise<void> {
const entry = workers.get(group);
if (entry === undefined) return;
for (const senseName of sensesForGroup(group)) {
scheduler.onComputeComplete(senseName);
}
sendShutdown(entry.process);
await waitForExit(entry.process, 5000);
if (!stopped) {
await startWorker(group);
}
}
function collectGroups(cfg: NerveConfig): Set<string> {
const result = new Set<string>();
for (const sc of Object.values(cfg.senses)) {
result.add(sc.group);
}
return result;
}
function sensesForGroupInConfig(cfg: NerveConfig, group: string): Set<string> {
const result = new Set<string>();
for (const [name, sc] of Object.entries(cfg.senses)) {
if (sc.group === group) result.add(name);
}
return result;
}
function removeStaleGroups(oldGroups: Set<string>, newGroups: Set<string>): void {
for (const g of oldGroups) {
if (newGroups.has(g)) continue;
const entry = workers.get(g);
if (entry !== undefined) {
sendShutdown(entry.process);
workers.delete(g);
}
groups.delete(g);
}
}
function addNewGroups(oldGroups: Set<string>, newGroups: Set<string>): void {
for (const g of newGroups) {
if (oldGroups.has(g)) continue;
groups.add(g);
if (!stopped) startWorker(g);
}
}
function reloadConfig(newConfig: NerveConfig): void {
const oldGroups = collectGroups(config);
const oldConfig = config;
const oldWorkflows = config.workflows ?? {};
config = newConfig;
// Note: pending/throttled computes in the old scheduler are silently dropped here.
// In-flight state is not preserved across reloadConfig.
scheduler.stop();
scheduler = createReflexScheduler(config, bus, triggerFn, {
logStore,
workflowTriggerFn: (workflowName, payload) => {
workflowManager.startWorkflow(workflowName, payload);
},
});
// Update workflow concurrency/overflow config incrementally — no restart needed
workflowManager.updateConfig(newConfig);
const newWorkflows = newConfig.workflows ?? {};
// Drain + remove workers for deleted workflows
for (const workflowName of Object.keys(oldWorkflows)) {
if (!(workflowName in newWorkflows)) {
process.stderr.write(
`[kernel] workflow "${workflowName}" removed from config, draining worker\n`,
);
workflowManager.drainAndRespawn(workflowName).catch((e) => {
const msg = e instanceof Error ? e.message : String(e);
process.stderr.write(
`[kernel] drainAndRespawn error for removed workflow "${workflowName}": ${msg}\n`,
);
});
}
}
const newGroups = collectGroups(newConfig);
removeStaleGroups(oldGroups, newGroups);
addNewGroups(oldGroups, newGroups);
// Restart existing groups that gained new senses — the running worker process
// was spawned with the old config and will report "Unknown sense" for any newly
// added sense until it is restarted.
for (const g of newGroups) {
if (!oldGroups.has(g)) continue; // already handled by addNewGroups
const oldSenses = sensesForGroupInConfig(oldConfig, g);
const newSenses = sensesForGroupInConfig(newConfig, g);
const gained = [...newSenses].some((s) => !oldSenses.has(s));
if (gained) {
restartGroup(g).catch((e) => {
const msg = e instanceof Error ? e.message : String(e);
process.stderr.write(`[kernel] reloadConfig restartGroup error for "${g}": ${msg}\n`);
});
}
}
}
function getHealth(): KernelHealth {
return {
uptime: Date.now() - startTime,
activeSenses: Object.keys(config.senses).length,
activeGroups: workers.size,
pendingComputes: 0,
activeWorkflows: workflowManager.totalActiveCount(),
memoryUsage: process.memoryUsage(),
};
}
function handleSenseFileChange(senseName: string): void {
const sc = config.senses[senseName];
if (sc === undefined) return;
process.stderr.write(
`[kernel] sense file changed: "${senseName}", restarting group "${sc.group}"\n`,
);
logStore.append({
source: "system",
type: "sense_reload",
refId: senseName,
payload: null,
ts: Date.now(),
});
restartGroup(sc.group).catch((e) => {
const msg = e instanceof Error ? e.message : String(e);
process.stderr.write(`[kernel] restartGroup error: ${msg}\n`);
});
}
function handleWorkflowFileChange(workflowName: string): void {
process.stderr.write(
`[kernel] workflow file changed: "${workflowName}", draining and respawning worker\n`,
);
logStore.append({
source: "system",
type: "workflow_reload",
refId: workflowName,
payload: null,
ts: Date.now(),
});
workflowManager.drainAndRespawn(workflowName).catch((e) => {
const msg = e instanceof Error ? e.message : String(e);
process.stderr.write(`[kernel] drainAndRespawn error for "${workflowName}": ${msg}\n`);
});
}
function handleConfigFileChange(): void {
process.stderr.write("[kernel] nerve.yaml changed, reloading config\n");
logStore.append({
source: "system",
type: "config_reload",
refId: null,
payload: null,
ts: Date.now(),
});
try {
const raw = readFileSync(join(nerveRoot, "nerve.yaml"), "utf8");
const parseResult = parseNerveConfig(raw);
if (!parseResult.ok) {
process.stderr.write(
`[kernel] config parse error, keeping current config: ${parseResult.error.message}\n`,
);
return;
}
reloadConfig(parseResult.value);
} catch (e) {
const msg = e instanceof Error ? e.message : String(e);
process.stderr.write(`[kernel] failed to read nerve.yaml, keeping current config: ${msg}\n`);
}
}
let fileWatcher: FileWatcher | null = null;
if (options.enableFileWatcher) {
fileWatcher = createFileWatcher(nerveRoot, (change) => {
if (change.kind === "sense") handleSenseFileChange(change.senseName);
if (change.kind === "config") handleConfigFileChange();
if (change.kind === "workflow") handleWorkflowFileChange(change.workflowName);
});
}
async function stop(): Promise<void> {
stopped = true;
if (fileWatcher !== null) {
fileWatcher.close();
fileWatcher = null;
}
scheduler.stop();
await workflowManager.stop();
const exitPromises: Promise<void>[] = [];
for (const entry of workers.values()) {
sendShutdown(entry.process);
exitPromises.push(waitForExit(entry.process, 5000));
}
await Promise.all(exitPromises);
logStore.append({
source: "system",
type: "stop",
refId: null,
payload: null,
ts: Date.now(),
});
logStore.close();
}
function getWorkerPid(group: string): number | null {
@@ -243,5 +520,18 @@ export function createKernel(
const senseCount = Object.keys(config.senses).length;
return { stop, groups, senseCount, bus, ready, getWorkerPid, triggerCompute: triggerFn };
return {
stop,
groups,
senseCount,
bus,
logStore,
workflowManager,
ready,
getWorkerPid,
triggerCompute: triggerFn,
restartGroup,
reloadConfig,
getHealth,
};
}
+351
View File
@@ -0,0 +1,351 @@
/**
* Log Store — append-only structured log storage backed by SQLite.
*
* Stores system, reflex, and workflow log entries in a single table.
* Logs are data assets for audit/analysis — they MUST NOT trigger reflexes.
*
* Also provides a `meta` key-value table for bookkeeping (e.g. archive watermarks).
*/
import { mkdirSync } from "node:fs";
import { dirname } from "node:path";
import Database from "better-sqlite3";
import type BetterSqlite3 from "better-sqlite3";
export type LogEntry = {
id?: number;
source: string;
type: string;
refId: string | null;
payload: string | null;
ts: number;
};
export type LogQuery = {
source?: string;
type?: string;
refId?: string;
since?: number;
until?: number;
limit?: number;
};
// ---------------------------------------------------------------------------
// Workflow runs materialized view (RFC-002 §6.2)
// ---------------------------------------------------------------------------
export type WorkflowRunStatus =
| "queued"
| "started"
| "completed"
| "failed"
| "crashed"
| "dropped"
| "interrupted";
const VALID_WORKFLOW_STATUSES = new Set<string>([
"queued",
"started",
"completed",
"failed",
"crashed",
"dropped",
"interrupted",
]);
function validateWorkflowRunStatus(status: string): WorkflowRunStatus {
if (!VALID_WORKFLOW_STATUSES.has(status)) {
throw new Error(`Invalid workflow run status from DB: "${status}"`);
}
return status as WorkflowRunStatus;
}
/** One row in the workflow_runs materialized table. */
export type WorkflowRun = {
runId: string;
workflow: string;
status: WorkflowRunStatus;
ts: number;
};
export type LogStore = {
append: (entry: Omit<LogEntry, "id">) => LogEntry;
query: (filter?: LogQuery) => LogEntry[];
getMeta: (key: string) => string | null;
setMeta: (key: string, value: string) => void;
/**
* Append a workflow log event and atomically upsert the workflow_runs
* materialized table — both in a single SQLite transaction (RFC-002 §6.2).
*/
upsertWorkflowRun: (entry: Omit<LogEntry, "id">, run: WorkflowRun) => LogEntry;
/**
* Alias for upsertWorkflowRun — append a log entry and update workflow_runs
* in one atomic transaction.
*/
appendWithWorkflowUpdate: (entry: Omit<LogEntry, "id">, run: WorkflowRun) => LogEntry;
/** Get the current materialized state of a specific workflow run. */
getWorkflowRun: (runId: string) => WorkflowRun | null;
/**
* Get all workflow runs with status 'queued' or 'started'.
* Optionally filter by workflow name.
*/
getActiveWorkflowRuns: (workflowName?: string) => WorkflowRun[];
/**
* Get the trigger payload for a workflow run (stored in the 'started' log entry).
* Returns null if not found.
*/
getTriggerPayload: (runId: string) => unknown;
/**
* Get all workflow CommandEvents for a specific run, ordered by id ASC.
* Used for crash recovery to rebuild ThreadState.
*/
getThreadEvents: (runId: string) => Array<{ type: string; [key: string]: unknown }>;
close: () => void;
};
const SCHEMA_SQL = `
CREATE TABLE IF NOT EXISTS logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
source TEXT NOT NULL,
type TEXT NOT NULL,
ref_id TEXT,
payload TEXT,
ts INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_logs_source_type ON logs(source, type);
CREATE INDEX IF NOT EXISTS idx_logs_ts ON logs(ts);
CREATE INDEX IF NOT EXISTS idx_logs_ref_id ON logs(ref_id);
CREATE TABLE IF NOT EXISTS meta (
key TEXT PRIMARY KEY,
value TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS workflow_runs (
run_id TEXT PRIMARY KEY,
workflow TEXT NOT NULL,
status TEXT NOT NULL,
ts INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_workflow_runs_status ON workflow_runs(status);
CREATE INDEX IF NOT EXISTS idx_workflow_runs_workflow ON workflow_runs(workflow);
`;
export function createLogStore(dbPath: string): LogStore {
mkdirSync(dirname(dbPath), { recursive: true });
const sqlite: BetterSqlite3.Database = new Database(dbPath);
sqlite.pragma("journal_mode = WAL");
sqlite.exec(SCHEMA_SQL);
const insertStmt = sqlite.prepare(
"INSERT INTO logs (source, type, ref_id, payload, ts) VALUES (@source, @type, @refId, @payload, @ts)",
);
const getMetaStmt = sqlite.prepare("SELECT value FROM meta WHERE key = ?");
const setMetaStmt = sqlite.prepare(
"INSERT INTO meta (key, value) VALUES (@key, @value) ON CONFLICT(key) DO UPDATE SET value = @value",
);
const upsertWorkflowRunStmt = sqlite.prepare(
"INSERT OR REPLACE INTO workflow_runs (run_id, workflow, status, ts) VALUES (@runId, @workflow, @status, @ts)",
);
const getWorkflowRunStmt = sqlite.prepare(
"SELECT run_id, workflow, status, ts FROM workflow_runs WHERE run_id = ?",
);
const getTriggerPayloadStmt = sqlite.prepare(
"SELECT payload FROM logs WHERE source = 'workflow' AND type = 'started' AND ref_id = ? ORDER BY id ASC LIMIT 1",
);
const getThreadEventsStmt = sqlite.prepare(
"SELECT payload FROM logs WHERE source = 'workflow' AND type = 'thread_command_event' AND ref_id = ? ORDER BY id ASC",
);
const getActiveWorkflowRunsStmt = sqlite.prepare(
"SELECT run_id, workflow, status, ts FROM workflow_runs WHERE status IN ('queued', 'started') ORDER BY ts ASC",
);
const getActiveWorkflowRunsByNameStmt = sqlite.prepare(
"SELECT run_id, workflow, status, ts FROM workflow_runs WHERE status IN ('queued', 'started') AND workflow = ? ORDER BY ts ASC",
);
const upsertWorkflowRunTx = sqlite.transaction(
(entry: Omit<LogEntry, "id">, run: WorkflowRun) => {
const info = insertStmt.run({
source: entry.source,
type: entry.type,
refId: entry.refId,
payload: entry.payload,
ts: entry.ts,
});
upsertWorkflowRunStmt.run({
runId: run.runId,
workflow: run.workflow,
status: run.status,
ts: run.ts,
});
return { ...entry, id: Number(info.lastInsertRowid) };
},
);
function append(entry: Omit<LogEntry, "id">): LogEntry {
const info = insertStmt.run({
source: entry.source,
type: entry.type,
refId: entry.refId,
payload: entry.payload,
ts: entry.ts,
});
return { ...entry, id: Number(info.lastInsertRowid) };
}
function query(filter: LogQuery = {}): LogEntry[] {
const conditions: string[] = [];
const params: Record<string, unknown> = {};
if (filter.source !== undefined) {
conditions.push("source = @source");
params.source = filter.source;
}
if (filter.type !== undefined) {
conditions.push("type = @type");
params.type = filter.type;
}
if (filter.refId !== undefined) {
conditions.push("ref_id = @refId");
params.refId = filter.refId;
}
if (filter.since !== undefined) {
conditions.push("ts >= @since");
params.since = filter.since;
}
if (filter.until !== undefined) {
conditions.push("ts <= @until");
params.until = filter.until;
}
const where = conditions.length > 0 ? `WHERE ${conditions.join(" AND ")}` : "";
const limit = filter.limit !== undefined ? `LIMIT ${filter.limit}` : "";
const sql = `SELECT id, source, type, ref_id, payload, ts FROM logs ${where} ORDER BY id ASC ${limit}`;
const rows = sqlite.prepare(sql).all(params) as Array<{
id: number;
source: string;
type: string;
ref_id: string | null;
payload: string | null;
ts: number;
}>;
return rows.map((r) => ({
id: r.id,
source: r.source,
type: r.type,
refId: r.ref_id,
payload: r.payload,
ts: r.ts,
}));
}
function getMeta(key: string): string | null {
const row = getMetaStmt.get(key) as { value: string } | undefined;
return row?.value ?? null;
}
function setMeta(key: string, value: string): void {
setMetaStmt.run({ key, value });
}
function upsertWorkflowRun(entry: Omit<LogEntry, "id">, run: WorkflowRun): LogEntry {
return upsertWorkflowRunTx(entry, run) as LogEntry;
}
function appendWithWorkflowUpdate(entry: Omit<LogEntry, "id">, run: WorkflowRun): LogEntry {
return upsertWorkflowRunTx(entry, run) as LogEntry;
}
function getWorkflowRun(runId: string): WorkflowRun | null {
const row = getWorkflowRunStmt.get(runId) as
| { run_id: string; workflow: string; status: string; ts: number }
| undefined;
if (row === undefined) return null;
return {
runId: row.run_id,
workflow: row.workflow,
status: validateWorkflowRunStatus(row.status),
ts: row.ts,
};
}
function getActiveWorkflowRuns(workflowName?: string): WorkflowRun[] {
const rows = (
workflowName !== undefined
? getActiveWorkflowRunsByNameStmt.all(workflowName)
: getActiveWorkflowRunsStmt.all()
) as Array<{ run_id: string; workflow: string; status: string; ts: number }>;
return rows.map((r) => ({
runId: r.run_id,
workflow: r.workflow,
status: validateWorkflowRunStatus(r.status),
ts: r.ts,
}));
}
function getTriggerPayload(runId: string): unknown {
const row = getTriggerPayloadStmt.get(runId) as { payload: string | null } | undefined;
if (row === undefined || row.payload === null) return null;
try {
const parsed = JSON.parse(row.payload) as unknown;
if (parsed !== null && typeof parsed === "object") {
const obj = parsed as Record<string, unknown>;
return obj.triggerPayload ?? null;
}
} catch {
// malformed
}
return null;
}
function getThreadEvents(runId: string): Array<{ type: string; [key: string]: unknown }> {
const rows = getThreadEventsStmt.all(runId) as Array<{ payload: string | null }>;
const result: Array<{ type: string; [key: string]: unknown }> = [];
for (const row of rows) {
if (row.payload === null) continue;
try {
const parsed = JSON.parse(row.payload) as unknown;
if (
parsed !== null &&
typeof parsed === "object" &&
typeof (parsed as Record<string, unknown>).type === "string"
) {
result.push(parsed as { type: string; [key: string]: unknown });
}
} catch {
// skip malformed payloads
}
}
return result;
}
function close(): void {
sqlite.close();
}
return {
append,
query,
getMeta,
setMeta,
upsertWorkflowRun,
appendWithWorkflowUpdate,
getWorkflowRun,
getActiveWorkflowRuns,
getTriggerPayload,
getThreadEvents,
close,
};
}
+34
View File
@@ -10,11 +10,15 @@
*/
import type { NerveConfig } from "@uncaged/nerve-core";
import type { LogStore } from "./log-store.js";
import type { SignalBus, Unsubscribe } from "./signal-bus.js";
/** Sends a compute message to the worker responsible for the given sense. */
export type TriggerFn = (senseName: string) => void;
/** Triggers a workflow run in response to a signal. */
export type WorkflowTriggerFn = (workflowName: string, payload: unknown) => void;
/** Per-sense mutable state tracked by the scheduler. */
type SenseState = {
lastComputeAt: number;
@@ -34,19 +38,27 @@ function makeSenseState(): SenseState {
return { lastComputeAt: 0, inFlight: false, pending: false, deferredTimer: null };
}
export type ReflexSchedulerOptions = {
logStore?: LogStore;
workflowTriggerFn?: WorkflowTriggerFn;
};
/**
* Create and start a reflex scheduler.
*
* @param config Full NerveConfig (reads senses for throttle/timeout, reflexes for schedule).
* @param bus SignalBus to subscribe for event-driven reflexes.
* @param triggerFn Called with the sense name when a compute should be dispatched.
* @param opts Optional: logStore for structured logging.
* @returns ReflexScheduler with stop() and onComputeComplete() methods.
*/
export function createReflexScheduler(
config: NerveConfig,
bus: SignalBus,
triggerFn: TriggerFn,
opts?: ReflexSchedulerOptions,
): ReflexScheduler {
const logStore = opts?.logStore;
const intervals: ReturnType<typeof setInterval>[] = [];
const unsubscribers: Unsubscribe[] = [];
const states = new Map<string, SenseState>();
@@ -65,6 +77,13 @@ export function createReflexScheduler(
state.inFlight = true;
state.pending = false;
state.lastComputeAt = Date.now();
logStore?.append({
source: "reflex",
type: "run_start",
refId: senseName,
payload: null,
ts: Date.now(),
});
triggerFn(senseName);
}
@@ -138,6 +157,21 @@ export function createReflexScheduler(
}
for (const reflex of config.reflexes) {
if (reflex.kind === "workflow") {
if (opts?.workflowTriggerFn !== undefined && reflex.on !== null && reflex.on.length > 0) {
const workflowTriggerFn = opts.workflowTriggerFn;
const workflowName = reflex.workflow;
const watchedSenses = new Set(reflex.on);
const unsub = bus.subscribe((signal) => {
if (watchedSenses.has(signal.senseId)) {
workflowTriggerFn(workflowName, signal.payload);
}
});
unsubscribers.push(unsub);
}
continue;
}
if (reflex.kind !== "sense") continue;
const senseReflex = reflex;
const senseName = senseReflex.sense;
+3 -2
View File
@@ -1,5 +1,5 @@
import { readFileSync, readdirSync } from "node:fs";
import { join } from "node:path";
import { mkdirSync, readFileSync, readdirSync } from "node:fs";
import { dirname, join } from "node:path";
import Database from "better-sqlite3";
import { drizzle } from "drizzle-orm/better-sqlite3";
@@ -128,6 +128,7 @@ export function openSenseDb(
let sqlite: Database.Database;
try {
mkdirSync(dirname(dbPath), { recursive: true });
sqlite = new Database(dbPath);
// WAL mode for better concurrent read performance
sqlite.pragma("journal_mode = WAL");
+119 -41
View File
@@ -79,18 +79,12 @@ async function initSense(
const dbResult = openSenseDb(dbPath, migrationsDir);
if (!dbResult.ok) {
process.stderr.write(
`[sense-worker] Failed to init DB for "${senseName}": ${dbResult.error.message}\n`,
);
process.exit(1);
throw new Error(`Failed to init DB for "${senseName}": ${dbResult.error.message}`);
}
const computeResult = await loadComputeFn(senseIndexPath);
if (!computeResult.ok) {
process.stderr.write(
`[sense-worker] Failed to load compute for "${senseName}": ${computeResult.error.message}\n`,
);
process.exit(1);
throw new Error(`Failed to load compute for "${senseName}": ${computeResult.error.message}`);
}
const { db } = dbResult.value;
@@ -129,12 +123,75 @@ function buildPeers(
return Object.fromEntries(entries);
}
// ---------------------------------------------------------------------------
// Grace period: hard kill after soft timeout
//
// Trade-off: grace period kills the entire worker process (process.exit), which
// terminates all senses in the group — not just the one that timed out. This is
// intentional: a hung compute may hold locks or corrupt shared state. Restarting
// the full worker ensures a clean slate, but other senses in the same group will
// lose any in-flight work until the kernel respawns the process.
// ---------------------------------------------------------------------------
const gracePeriodTimers = new Map<string, ReturnType<typeof setTimeout>>();
function scheduleGracePeriodKill(senseName: string, gracePeriodMs: number): void {
if (gracePeriodTimers.has(senseName)) return;
process.stderr.write(`[sense-worker] grace period for "${senseName}" (${gracePeriodMs}ms)\n`);
const timer = setTimeout(() => {
process.stderr.write(`[sense-worker] grace period expired for "${senseName}", hard kill\n`);
process.exit(1);
}, gracePeriodMs);
gracePeriodTimers.set(senseName, timer);
}
function clearGracePeriodTimer(senseName: string): void {
const timer = gracePeriodTimers.get(senseName);
if (timer === undefined) return;
clearTimeout(timer);
gracePeriodTimers.delete(senseName);
}
// ---------------------------------------------------------------------------
// Compute execution with error isolation
// ---------------------------------------------------------------------------
async function runCompute(
senseName: string,
runtime: SenseRuntime,
peers: PeerMap,
timeoutMs: number,
gracePeriodMs: number | null,
): Promise<void> {
try {
const result = await executeCompute(runtime, peers, timeoutMs);
if (!result.ok) {
sendError(senseName, result.error.message);
if (gracePeriodMs !== null && result.error.message.includes("timed out")) {
scheduleGracePeriodKill(senseName, gracePeriodMs);
}
return;
}
clearGracePeriodTimer(senseName);
if (result.value !== null) {
sendSignal(senseName, result.value);
}
} catch (e: unknown) {
const errMsg = e instanceof Error ? e.message : String(e);
sendError(senseName, errMsg);
}
}
// ---------------------------------------------------------------------------
// IPC message dispatch
// ---------------------------------------------------------------------------
function handleMessage(
raw: unknown,
runtimes: Map<string, SenseRuntime>,
peers: PeerMap,
group: string,
timeoutMs: number,
senseConfigs: Map<string, { timeout: number | null; gracePeriod: number | null }>,
inFlight: Map<string, Promise<void>>,
): void {
const parseResult = parseParentMessage(raw);
@@ -149,33 +206,36 @@ function handleMessage(
process.exit(0);
}
if (msg.type === "compute") {
const runtime = runtimes.get(msg.sense);
if (!runtime) {
sendError(msg.sense, `Unknown sense "${msg.sense}" in group "${group}"`);
return;
}
// Serialize computes for the same sense
const previous = inFlight.get(msg.sense) ?? Promise.resolve();
const next = previous.then(async () => {
const result = await executeCompute(runtime, peers, timeoutMs);
if (!result.ok) {
sendError(msg.sense, result.error.message);
return;
}
if (result.value !== null) {
sendSignal(msg.sense, result.value);
}
if (msg.type === "health-request") {
send({
type: "health-response",
senses: [...runtimes.keys()],
inFlightCount: inFlight.size,
});
return;
}
const tracked = next.catch((e: unknown) => {
if (msg.type !== "compute") return;
const runtime = runtimes.get(msg.sense);
if (!runtime) {
sendError(msg.sense, `Unknown sense "${msg.sense}" in group "${group}"`);
return;
}
// Look up timeout/gracePeriod per-sense at compute time (RFC §5.3: these are per-sense)
const sc = senseConfigs.get(msg.sense);
const timeoutMs = sc?.timeout ?? DEFAULT_TIMEOUT_MS;
const gracePeriodMs = sc?.gracePeriod ?? null;
const previous = inFlight.get(msg.sense) ?? Promise.resolve();
const next = previous
.then(() => runCompute(msg.sense, runtime, peers, timeoutMs, gracePeriodMs))
.catch((e: unknown) => {
const errMsg = e instanceof Error ? e.message : String(e);
sendError(msg.sense, errMsg);
});
inFlight.set(msg.sense, tracked);
}
inFlight.set(msg.sense, next);
}
// ---------------------------------------------------------------------------
@@ -198,29 +258,47 @@ async function bootstrap(nerveRoot: string, group: string): Promise<void> {
const runtimes = new Map<string, SenseRuntime>();
const ownDbs = new Map<string, DrizzleDB>();
const failedSenses: string[] = [];
for (const senseName of groupSenses) {
const { db, runtime } = await initSense(nerveRoot, senseName);
ownDbs.set(senseName, db);
runtimes.set(senseName, runtime);
try {
const { db, runtime } = await initSense(nerveRoot, senseName);
ownDbs.set(senseName, db);
runtimes.set(senseName, runtime);
} catch (e: unknown) {
const eMsg = e instanceof Error ? e.message : String(e);
process.stderr.write(
`[sense-worker] Failed to load sense "${senseName}", skipping: ${eMsg}\n`,
);
failedSenses.push(senseName);
}
}
// If ALL senses failed, exit with error so kernel respawns
if (runtimes.size === 0) {
process.stderr.write(`[sense-worker] All senses in group "${group}" failed to load, exiting\n`);
process.exit(1);
}
const groupSenseNames = new Set(groupSenses);
const peers = buildPeers(nerveRoot, Object.keys(config.senses), ownDbs, groupSenseNames);
// Read timeout from config (uses first group sense's config, or default)
const firstSenseConfig = config.senses[groupSenses[0]];
const timeoutMs =
typeof (firstSenseConfig as Record<string, unknown>).timeoutMs === "number"
? ((firstSenseConfig as Record<string, unknown>).timeoutMs as number)
: DEFAULT_TIMEOUT_MS;
// Build per-sense timeout/gracePeriod map (RFC §5.3: these are per-sense, not per-group)
const senseConfigs = new Map<string, { timeout: number | null; gracePeriod: number | null }>();
for (const senseName of groupSenses) {
const sc = config.senses[senseName];
senseConfigs.set(senseName, {
timeout: sc?.timeout ?? null,
gracePeriod: sc?.gracePeriod ?? null,
});
}
const inFlight = new Map<string, Promise<void>>();
sendReady();
process.on("message", (raw: unknown) => {
handleMessage(raw, runtimes, peers, group, timeoutMs, inFlight);
handleMessage(raw, runtimes, peers, group, senseConfigs, inFlight);
});
}
+556
View File
@@ -0,0 +1,556 @@
/**
* WorkflowManager — manages per-workflow worker processes, concurrency control,
* and thread lifecycle tracking.
*
* RFC-002 §7.1: one worker process per workflow name, reused while alive.
* Concurrency and overflow (drop/queue) are enforced here in the parent process.
*/
import { fork } from "node:child_process";
import type { ChildProcess } from "node:child_process";
import { dirname, join } from "node:path";
import { fileURLToPath } from "node:url";
import type { NerveConfig, WorkflowConfig } from "@uncaged/nerve-core";
import type {
ResumeThreadMessage,
ShutdownMessage,
StartThreadMessage,
ThreadEventMessage,
} from "./ipc.js";
import { parseWorkerMessage } from "./ipc.js";
import type { LogStore } from "./log-store.js";
import type { WorkflowRunStatus } from "./log-store.js";
export type WorkflowManager = {
/** Trigger a new workflow thread (called by Reflex scheduler). */
startWorkflow: (workflowName: string, payload: unknown) => void;
/** Number of currently active (running) threads for a workflow. */
activeCount: (workflowName: string) => number;
/** Number of pending queued threads waiting to run for a workflow. */
queueLength: (workflowName: string) => number;
/** Total active workflow threads across all workflows. */
totalActiveCount: () => number;
/** Update the config reference (e.g. after hot reload). Active workers are unaffected. */
updateConfig: (newConfig: NerveConfig) => void;
/**
* Drain active threads for a workflow, then respawn its worker process.
* Used for hot reload when the workflow .ts file changes.
* Waits up to `drainTimeoutMs` for threads to complete before force-killing.
*/
drainAndRespawn: (workflowName: string, drainTimeoutMs?: number) => Promise<void>;
/** Gracefully shut down all workflow workers. */
stop: () => Promise<void>;
};
type PendingThread = {
runId: string;
payload: unknown;
};
type WorkflowState = {
active: Set<string>;
queue: PendingThread[];
};
type WorkerEntry = {
workflowName: string;
process: ChildProcess;
stopping: boolean;
/** When set, the worker is draining before a hot-reload respawn. */
draining: boolean;
};
// Crash respawn backoff: track crash timestamps per workflow.
const MAX_CRASHES_IN_WINDOW = 5;
const CRASH_WINDOW_MS = 60_000;
/**
* Worker shutdown timeout — must stay in sync with SHUTDOWN_TIMEOUT_MS in workflow-worker.ts.
* The drain timeout passed to drainAndRespawn must be >= this value so the worker has
* enough time to finish in-flight threads before the parent force-kills it.
*/
const WORKER_SHUTDOWN_TIMEOUT_MS = 10_000;
const DEFAULT_MAX_QUEUE = 100;
function resolveWorkerScript(): string {
const __filename = fileURLToPath(import.meta.url);
const __dir = dirname(__filename);
return join(__dir, "workflow-worker.js");
}
function spawnWorkflowWorker(
nerveRoot: string,
workflowName: string,
workerScript: string,
): ChildProcess {
return fork(workerScript, ["--workflow", workflowName, "--root", nerveRoot], {
stdio: ["ignore", "inherit", "inherit", "ipc"],
});
}
function sendStartThread(worker: ChildProcess, msg: StartThreadMessage): void {
if (worker.connected === false) return;
try {
worker.send(msg);
} catch {
// IPC channel closed between connected check and send
}
}
function sendShutdown(worker: ChildProcess, entry: WorkerEntry): void {
entry.stopping = true;
if (worker.connected === false) return;
const msg: ShutdownMessage = { type: "shutdown" };
try {
worker.send(msg);
} catch {
// IPC channel closed between connected check and send
}
}
function sendResumeThread(worker: ChildProcess, msg: ResumeThreadMessage): void {
if (worker.connected === false) return;
try {
worker.send(msg);
} catch {
// IPC channel closed between connected check and send
}
}
function waitForExit(child: ChildProcess, timeoutMs: number): Promise<void> {
return new Promise((resolve) => {
const timer = setTimeout(() => {
child.kill("SIGKILL");
resolve();
}, timeoutMs);
child.once("exit", () => {
clearTimeout(timer);
resolve();
});
});
}
export function createWorkflowManager(
nerveRoot: string,
initialConfig: NerveConfig,
logStore: LogStore,
): WorkflowManager {
const workerScript = resolveWorkerScript();
const states = new Map<string, WorkflowState>();
const workers = new Map<string, WorkerEntry>();
const crashTimestamps = new Map<string, number[]>();
let stopped = false;
let config = initialConfig;
function getOrCreateState(workflowName: string): WorkflowState {
let state = states.get(workflowName);
if (state === undefined) {
state = { active: new Set<string>(), queue: [] };
states.set(workflowName, state);
}
return state;
}
function workflowConfig(workflowName: string): WorkflowConfig | null {
return config.workflows?.[workflowName] ?? null;
}
function toWorkflowRunStatus(eventType: string): WorkflowRunStatus | null {
const map: Record<string, WorkflowRunStatus> = {
started: "started",
queued: "queued",
completed: "completed",
failed: "failed",
crashed: "crashed",
dropped: "dropped",
interrupted: "interrupted",
};
return map[eventType] ?? null;
}
function logWorkflowEvent(
workflowName: string,
runId: string,
eventType: string,
payload?: unknown,
): void {
const ts = Date.now();
const serialised = payload !== undefined ? JSON.stringify(payload) : null;
const status = toWorkflowRunStatus(eventType);
if (status !== null) {
logStore.upsertWorkflowRun(
{ source: "workflow", type: eventType, refId: runId, payload: serialised, ts },
{ runId, workflow: workflowName, status, ts },
);
} else {
logStore.append({
source: "workflow",
type: eventType,
refId: runId,
payload: serialised,
ts,
});
}
}
function dispatchThread(workflowName: string, runId: string, payload: unknown): void {
const state = getOrCreateState(workflowName);
state.active.add(runId);
const worker = getOrSpawnWorker(workflowName);
const msg: StartThreadMessage = {
type: "start-thread",
runId,
workflow: workflowName,
triggerPayload: payload,
};
sendStartThread(worker.process, msg);
// Store triggerPayload in the log so it can be recovered after a crash
logWorkflowEvent(workflowName, runId, "started", { triggerPayload: payload });
}
function dequeueNext(workflowName: string): void {
const state = states.get(workflowName);
if (state === undefined || state.queue.length === 0) return;
const wfConfig = workflowConfig(workflowName);
const concurrency = wfConfig?.concurrency ?? 1;
if (state.active.size < concurrency) {
const next = state.queue.shift();
if (next !== undefined) {
dispatchThread(workflowName, next.runId, next.payload);
}
}
}
function handleThreadEvent(workflowName: string, msg: ThreadEventMessage): void {
const state = states.get(workflowName);
if (state === undefined) return;
if (msg.eventType === "completed" || msg.eventType === "failed") {
state.active.delete(msg.runId);
dequeueNext(workflowName);
}
if (msg.eventType === "completed" || msg.eventType === "failed") {
logWorkflowEvent(workflowName, msg.runId, msg.eventType, msg.payload);
}
}
function recoverQueuedRun(workflowName: string, runId: string, state: WorkflowState): void {
if (state.queue.some((q) => q.runId === runId)) return;
const triggerPayload = logStore.getTriggerPayload(runId);
state.queue.push({ runId, payload: triggerPayload });
process.stderr.write(
`[workflow-manager] crash-recovery: re-queued thread "${runId}" for "${workflowName}"\n`,
);
}
function recoverStartedRun(
workflowName: string,
runId: string,
state: WorkflowState,
worker: WorkerEntry,
): void {
if (state.active.has(runId)) return;
const events = logStore.getThreadEvents(runId);
const triggerPayload = logStore.getTriggerPayload(runId);
state.active.add(runId);
const msg: ResumeThreadMessage = {
type: "resume-thread",
runId,
events,
triggerPayload,
};
sendResumeThread(worker.process, msg);
process.stderr.write(
`[workflow-manager] crash-recovery: resuming thread "${runId}" for "${workflowName}" (${events.length} events)\n`,
);
}
function recoverThreadsForWorker(workflowName: string, worker: WorkerEntry): void {
const activeRuns = logStore.getActiveWorkflowRuns(workflowName);
const state = getOrCreateState(workflowName);
for (const run of activeRuns) {
if (run.status === "queued") {
recoverQueuedRun(workflowName, run.runId, state);
} else if (run.status === "started") {
recoverStartedRun(workflowName, run.runId, state, worker);
}
}
}
function recordCrashAndCheckLimit(workflowName: string): boolean {
const now = Date.now();
const timestamps = (crashTimestamps.get(workflowName) ?? []).filter(
(t) => now - t < CRASH_WINDOW_MS,
);
timestamps.push(now);
crashTimestamps.set(workflowName, timestamps);
return timestamps.length > MAX_CRASHES_IN_WINDOW;
}
function handleWorkerCrash(workflowName: string): void {
const state = states.get(workflowName);
if (state === undefined) return;
const crashedCount = state.active.size;
if (crashedCount > 0) {
process.stderr.write(
`[workflow-manager] worker for "${workflowName}" crashed with ${crashedCount} active thread(s)\n`,
);
for (const runId of state.active) {
logWorkflowEvent(workflowName, runId, "crashed");
}
}
state.active.clear();
workers.delete(workflowName);
if (stopped || workflowConfig(workflowName) === null) return;
if (recordCrashAndCheckLimit(workflowName)) {
const count = crashTimestamps.get(workflowName)?.length ?? 0;
process.stderr.write(
`[workflow-manager] worker for "${workflowName}" crashed ${count} times in ${CRASH_WINDOW_MS}ms — stopping respawn\n`,
);
return;
}
process.stderr.write(
`[workflow-manager] respawning worker for "${workflowName}" after crash\n`,
);
const newWorker = getOrSpawnWorker(workflowName);
setImmediate(() => {
recoverThreadsForWorker(workflowName, newWorker);
});
}
function handleWorkerMessage(workflowName: string, raw: unknown): void {
const result = parseWorkerMessage(raw);
if (!result.ok) {
process.stderr.write(
`[workflow-manager] invalid message from "${workflowName}" worker: ${result.error.message}\n`,
);
return;
}
const msg = result.value;
if (msg.type === "thread-event") {
handleThreadEvent(workflowName, msg);
return;
}
if (msg.type === "thread-command-event") {
logStore.append({
source: "workflow",
type: "thread_command_event",
refId: msg.runId,
payload: JSON.stringify(msg.event),
ts: Date.now(),
});
return;
}
if (msg.type === "workflow-error") {
process.stderr.write(
`[workflow-manager] workflow-error for runId "${msg.runId}" in "${workflowName}": ${msg.error}\n`,
);
const state = states.get(workflowName);
if (state !== undefined) {
state.active.delete(msg.runId);
dequeueNext(workflowName);
}
logWorkflowEvent(workflowName, msg.runId, "failed", { error: msg.error });
return;
}
if (msg.type === "error") {
process.stderr.write(
`[workflow-manager] error from "${workflowName}" worker: ${msg.error}\n`,
);
}
}
function markActiveRunsInterrupted(workflowName: string): void {
const state = states.get(workflowName);
if (state === undefined) return;
for (const runId of state.active) {
logWorkflowEvent(workflowName, runId, "interrupted");
}
state.active.clear();
}
function handleWorkerExit(workflowName: string, code: number | null): void {
const entry = workers.get(workflowName);
if (entry?.draining) {
workers.delete(workflowName);
markActiveRunsInterrupted(workflowName);
if (!stopped && workflowConfig(workflowName) !== null) {
process.stderr.write(
`[workflow-manager] worker for "${workflowName}" drained, respawning\n`,
);
getOrSpawnWorker(workflowName);
}
return;
}
if (entry?.stopping) {
workers.delete(workflowName);
const state = states.get(workflowName);
if (state !== undefined) {
state.active.clear();
}
return;
}
process.stderr.write(
`[workflow-manager] worker for "${workflowName}" exited with code ${code ?? "null"}\n`,
);
handleWorkerCrash(workflowName);
}
function getOrSpawnWorker(workflowName: string): WorkerEntry {
const existing = workers.get(workflowName);
if (existing !== undefined && existing.process.exitCode === null) {
return existing;
}
const child = spawnWorkflowWorker(nerveRoot, workflowName, workerScript);
child.on("message", (raw: unknown) => {
handleWorkerMessage(workflowName, raw);
});
child.on("exit", (code) => {
handleWorkerExit(workflowName, code);
});
const entry: WorkerEntry = { workflowName, process: child, stopping: false, draining: false };
workers.set(workflowName, entry);
return entry;
}
function startWorkflow(workflowName: string, payload: unknown): void {
if (stopped) return;
const wfConfig = workflowConfig(workflowName);
if (wfConfig === null) {
process.stderr.write(
`[workflow-manager] startWorkflow: unknown workflow "${workflowName}"\n`,
);
return;
}
const state = getOrCreateState(workflowName);
const runId = crypto.randomUUID();
if (state.active.size < wfConfig.concurrency) {
dispatchThread(workflowName, runId, payload);
return;
}
// At concurrency limit — apply overflow policy
if (wfConfig.overflow === "drop") {
logWorkflowEvent(workflowName, runId, "dropped");
process.stderr.write(
`[workflow-manager] dropped thread for "${workflowName}" (overflow=drop)\n`,
);
return;
}
// overflow === "queue"
const maxQueue = (wfConfig as { maxQueue?: number }).maxQueue ?? DEFAULT_MAX_QUEUE;
if (state.queue.length >= maxQueue) {
const oldest = state.queue.shift();
if (oldest !== undefined) {
process.stderr.write(
`[workflow-manager] queue overflow for "${workflowName}", dropping oldest runId "${oldest.runId}"\n`,
);
logWorkflowEvent(workflowName, oldest.runId, "dropped");
}
}
state.queue.push({ runId, payload });
logWorkflowEvent(workflowName, runId, "queued");
process.stderr.write(
`[workflow-manager] queued thread for "${workflowName}" runId "${runId}" (queue length: ${state.queue.length})\n`,
);
}
function activeCount(workflowName: string): number {
return states.get(workflowName)?.active.size ?? 0;
}
function queueLength(workflowName: string): number {
return states.get(workflowName)?.queue.length ?? 0;
}
function totalActiveCount(): number {
let total = 0;
for (const state of states.values()) {
total += state.active.size;
}
return total;
}
function updateConfig(newConfig: NerveConfig): void {
config = newConfig;
}
/**
* Default drain timeout must be at least WORKER_SHUTDOWN_TIMEOUT_MS so the worker
* has enough time to finish in-flight threads before the parent force-kills it.
*/
const DEFAULT_DRAIN_TIMEOUT_MS = Math.max(30_000, WORKER_SHUTDOWN_TIMEOUT_MS + 5_000);
async function drainAndRespawn(
workflowName: string,
drainTimeoutMs: number = DEFAULT_DRAIN_TIMEOUT_MS,
): Promise<void> {
const entry = workers.get(workflowName);
if (entry === undefined) {
// No active worker — nothing to drain
return;
}
entry.draining = true;
// Send shutdown without setting stopping=true (so the exit handler uses the draining branch)
if (entry.process.connected) {
const msg: ShutdownMessage = { type: "shutdown" };
try {
entry.process.send(msg);
} catch {
// IPC closed
}
}
await waitForExit(entry.process, drainTimeoutMs);
// The exit handler (draining branch) will respawn the worker automatically
}
async function stop(): Promise<void> {
stopped = true;
const exitPromises: Promise<void>[] = [];
for (const entry of workers.values()) {
sendShutdown(entry.process, entry);
exitPromises.push(waitForExit(entry.process, 5000));
}
await Promise.all(exitPromises);
workers.clear();
}
return {
startWorkflow,
activeCount,
queueLength,
totalActiveCount,
updateConfig,
drainAndRespawn,
stop,
};
}
+341
View File
@@ -0,0 +1,341 @@
/**
* Workflow Worker runtime bootstrap.
*
* Entry point for a forked workflow worker process.
* Receives the workflow name and nerve root via CLI args, dynamically imports
* the user's WorkflowDefinition, then signals ready and enters the IPC event loop.
*
* Layout assumptions (nerve user config at `<nerveRoot>/`):
* workflows/<name>/index.ts (or .js) ← user workflow definition
*/
import { existsSync } from "node:fs";
import { join, resolve } from "node:path";
import type {
CommandEvent,
ThreadState,
WorkflowContext,
WorkflowDefinition,
} from "@uncaged/nerve-core";
import type { ThreadCommandEventMessage, ThreadEventType, WorkerToParentMessage } from "./ipc.js";
import { parseParentMessage } from "./ipc.js";
// ---------------------------------------------------------------------------
// IPC helpers
// ---------------------------------------------------------------------------
function send(msg: WorkerToParentMessage): void {
if (process.send) {
process.send(msg);
}
}
function sendReady(): void {
send({ type: "ready" });
}
function sendThreadEvent(runId: string, eventType: ThreadEventType, payload: unknown): void {
send({ type: "thread-event", runId, eventType, payload });
}
function sendWorkflowError(runId: string, error: string): void {
send({ type: "workflow-error", runId, error });
}
function sendCommandEvent(runId: string, event: CommandEvent): void {
const msg: ThreadCommandEventMessage = {
type: "thread-command-event",
runId,
event: event as { type: string; [key: string]: unknown },
};
send(msg);
}
// ---------------------------------------------------------------------------
// Thread loop (RFC-002 §5.4)
// ---------------------------------------------------------------------------
/**
* Replay persisted events through moderate() to reconstruct ThreadState,
* then execute the next role and return the resulting CommandEvent.
* Returns null if the thread is already complete (moderate returned null).
*/
async function replayAndResume(
def: WorkflowDefinition,
runId: string,
ctx: WorkflowContext,
state: ThreadState,
resumeEvents: CommandEvent[],
): Promise<CommandEvent | null> {
let lastNext: ReturnType<typeof def.moderate> = null;
for (const ev of resumeEvents) {
state.events.push(ev);
lastNext = def.moderate(state, ev);
if (lastNext === null) {
sendThreadEvent(runId, "completed", null);
return null;
}
}
const next = lastNext;
if (next === null) {
sendThreadEvent(runId, "completed", null);
return null;
}
const role = def.roles[next.role];
if (!role) {
sendWorkflowError(runId, `Unknown role: ${next.role}`);
return null;
}
try {
const event = await role.execute(next.prompt, ctx);
sendCommandEvent(runId, event);
return event;
} catch (e: unknown) {
const errMsg = e instanceof Error ? e.message : String(e);
sendThreadEvent(runId, "failed", { error: errMsg });
return null;
}
}
async function runThread(
def: WorkflowDefinition,
workflowName: string,
runId: string,
triggerPayload: unknown,
/** Pre-existing event history for crash-recovery resume. Empty for a fresh thread. */
resumeEvents: CommandEvent[] = [],
): Promise<void> {
const state: ThreadState = { runId, events: [] };
const ctx: WorkflowContext = {
runId,
workflowName,
log: (msg) => sendThreadEvent(runId, "step_complete", { message: msg }),
};
const initialEvent: CommandEvent = {
type: "thread_start",
triggerPayload:
triggerPayload != null && typeof triggerPayload === "object" ? triggerPayload : {},
};
// On resume: replay persisted events, run the next un-executed role, then continue.
if (resumeEvents.length > 0) {
const nextEvent = await replayAndResume(def, runId, ctx, state, resumeEvents);
if (nextEvent === null) return;
await continueThread(def, runId, ctx, state, nextEvent);
return;
}
// Fresh thread — send the initial command event and enter the loop.
sendCommandEvent(runId, initialEvent);
await continueThread(def, runId, ctx, state, initialEvent);
}
async function continueThread(
def: WorkflowDefinition,
runId: string,
ctx: WorkflowContext,
state: ThreadState,
firstEvent: CommandEvent,
): Promise<void> {
let event = firstEvent;
const MAX_STEPS = 1000;
let step = 0;
while (step < MAX_STEPS) {
step++;
state.events.push(event);
const next = def.moderate(state, event);
if (next === null) {
sendThreadEvent(runId, "completed", null);
return;
}
const role = def.roles[next.role];
if (!role) {
sendWorkflowError(runId, `Unknown role: ${next.role}`);
return;
}
try {
event = await role.execute(next.prompt, ctx);
} catch (e: unknown) {
const errMsg = e instanceof Error ? e.message : String(e);
sendThreadEvent(runId, "failed", { error: errMsg });
return;
}
sendCommandEvent(runId, event);
}
if (step >= MAX_STEPS) {
sendWorkflowError(runId, `Thread exceeded maximum steps (${MAX_STEPS})`);
}
}
// ---------------------------------------------------------------------------
// Workflow definition loader
// ---------------------------------------------------------------------------
async function loadWorkflowDefinition(
nerveRoot: string,
workflowName: string,
): Promise<WorkflowDefinition> {
const candidates = [
resolve(join(nerveRoot, "workflows", workflowName, "index.ts")),
resolve(join(nerveRoot, "workflows", workflowName, "index.js")),
];
const indexPath = candidates.find((p) => existsSync(p));
if (!indexPath) {
throw new Error(
`Workflow definition not found for "${workflowName}". Tried:\n${candidates.map((p) => ` ${p}`).join("\n")}`,
);
}
const mod = await import(indexPath);
const def: unknown = mod.default ?? mod;
if (
def === null ||
typeof def !== "object" ||
typeof (def as WorkflowDefinition).moderate !== "function" ||
typeof (def as WorkflowDefinition).roles !== "object"
) {
throw new Error(
`Workflow "${workflowName}" must export a WorkflowDefinition with "roles" and "moderate".`,
);
}
return def as WorkflowDefinition;
}
// ---------------------------------------------------------------------------
// IPC message dispatch
// ---------------------------------------------------------------------------
function handleMessage(
raw: unknown,
def: WorkflowDefinition,
workflowName: string,
inFlight: Map<string, Promise<void>>,
shuttingDown: { value: boolean },
): void {
const parseResult = parseParentMessage(raw);
if (!parseResult.ok) {
process.stderr.write(`[workflow-worker] Invalid IPC message: ${parseResult.error.message}\n`);
return;
}
const msg = parseResult.value;
if (msg.type === "shutdown") {
shuttingDown.value = true;
const timeout = new Promise<void>((r) => setTimeout(r, 10_000));
Promise.race([Promise.all(inFlight.values()), timeout])
.then(() => process.exit(0))
.catch(() => process.exit(1));
return;
}
if (msg.type === "start-thread") {
if (shuttingDown.value) return;
const { runId, triggerPayload } = msg;
const previous = inFlight.get(runId) ?? Promise.resolve();
const next = previous
.then(() => runThread(def, workflowName, runId, triggerPayload))
.catch((e: unknown) => {
const errMsg = e instanceof Error ? e.message : String(e);
sendWorkflowError(runId, errMsg);
})
.finally(() => {
inFlight.delete(runId);
});
inFlight.set(runId, next);
return;
}
if (msg.type === "resume-thread") {
if (shuttingDown.value) return;
const { runId, events, triggerPayload } = msg;
const previous = inFlight.get(runId) ?? Promise.resolve();
const next = previous
.then(() => runThread(def, workflowName, runId, triggerPayload, events as CommandEvent[]))
.catch((e: unknown) => {
const errMsg = e instanceof Error ? e.message : String(e);
sendWorkflowError(runId, errMsg);
})
.finally(() => {
inFlight.delete(runId);
});
inFlight.set(runId, next);
return;
}
}
// ---------------------------------------------------------------------------
// Bootstrap
// ---------------------------------------------------------------------------
async function bootstrap(nerveRoot: string, workflowName: string): Promise<void> {
let def: WorkflowDefinition;
try {
def = await loadWorkflowDefinition(nerveRoot, workflowName);
} catch (e: unknown) {
const msg = e instanceof Error ? e.message : String(e);
process.stderr.write(`[workflow-worker] Failed to load workflow "${workflowName}": ${msg}\n`);
process.exit(1);
}
const inFlight = new Map<string, Promise<void>>();
const shuttingDown = { value: false };
sendReady();
process.on("message", (raw: unknown) => {
handleMessage(raw, def, workflowName, inFlight, shuttingDown);
});
}
// ---------------------------------------------------------------------------
// CLI entrypoint
// ---------------------------------------------------------------------------
function parseArgs(): { nerveRoot: string; workflow: string } | null {
const args = process.argv.slice(2);
let workflow: string | null = null;
let nerveRoot: string | null = null;
for (let i = 0; i < args.length; i++) {
if (args[i] === "--workflow" && i + 1 < args.length) {
workflow = args[i + 1];
i++;
} else if (args[i] === "--root" && i + 1 < args.length) {
nerveRoot = args[i + 1];
i++;
}
}
if (!workflow || !nerveRoot) return null;
return { nerveRoot, workflow };
}
const parsed = parseArgs();
if (!parsed) {
process.stderr.write("Usage: workflow-worker --workflow <name> --root <nerve-root-dir>\n");
process.exit(1);
}
bootstrap(parsed.nerveRoot, parsed.workflow).catch((e) => {
const msg = e instanceof Error ? e.message : String(e);
process.stderr.write(`[workflow-worker] Unhandled bootstrap error: ${msg}\n`);
process.exit(1);
});