RFC-006: WorkerRuntime — message-routed process management #279

Closed
opened 2026-04-30 12:50:12 +00:00 by xiaomo · 1 comment
Owner

Summary

Extract a generic WorkerRuntime<K> module from the daemon, replacing the current ad-hoc process management scattered across worker-pool.ts, workflow-manager.ts, and worker-fork-support.ts.

The key insight: our worker processes are containers for user code. The process itself is an implementation detail — what matters is routing messages to the right handler with automatic lifecycle management.

Motivation

Current problems:

  1. Duplicated logic — stderr capture, crash backoff, graceful shutdown, exit handling all implemented twice (sense pool + workflow manager)
  2. Mixed concerns — workflow-manager.ts (792 lines) mixes process management with thread tracking, concurrency control, and queue overflow
  3. Hard to test — testing crash recovery requires setting up full sense/workflow context
  4. Hard to extend — adding a new worker type means reimplementing fork/crash/drain from scratch

Design

Core Abstraction

Inspired by Cloudflare Workers — message-routed, auto-lifecycle:

type WorkerRuntimeConfig<K extends string> = {
  script: string
  argsForKey: (key: K) => string[]
  onMessage: (key: K, msg: unknown) => void
  onReady: (key: K) => void
  onExit: (key: K, code: number | null, signal: string | null) => void
  respawn: {
    enabled: boolean
    maxCrashes: number        // max crashes within window before giving up
    windowMs: number          // crash counting window
    delayMs: number           // delay before respawn
  }
  shutdownTimeoutMs: number   // grace period for shutdown
}

type WorkerRuntime<K extends string> = {
  // Core API: route message to worker (cold-starts if needed)
  send: (key: K, msg: unknown) => Promise<void>

  // Lifecycle
  start: (key: K) => Promise<void>     // explicit cold-start
  evict: (key: K) => Promise<void>     // graceful remove
  drain: (key: K) => Promise<void>     // shutdown + respawn (hot reload)
  shutdown: () => Promise<void>        // stop everything

  // Observability
  has: (key: K) => boolean
  pid: (key: K) => number | null
  keys: () => K[]
  stderrTail: (key: K) => string
}
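The cold-start behaviour of send can be sketched without forking any real process. The block below is a minimal illustration, not the daemon's implementation: `createRouter`, `FakeWorker`, and `Spawn` are hypothetical names, and the injected `spawn` function stands in for whatever actually forks the worker script. Only `send`, `has`, and `keys` come from the type above.

```typescript
// Hypothetical stand-in for a forked child process; not part of the RFC.
type FakeWorker = { deliver: (msg: unknown) => void }

// Hypothetical spawner; a real one would fork `script` with `argsForKey(key)`.
type Spawn<K extends string> = (key: K) => Promise<FakeWorker>

function createRouter<K extends string>(spawn: Spawn<K>) {
  // One worker promise per key (1:1 key -> worker mapping).
  // Storing the promise, not the worker, means concurrent sends to a
  // cold key share a single spawn instead of racing to start two.
  const workers = new Map<K, Promise<FakeWorker>>()

  function start(key: K): Promise<FakeWorker> {
    let worker = workers.get(key)
    if (worker === undefined) {
      worker = spawn(key)
      workers.set(key, worker)
    }
    return worker
  }

  return {
    // Route a message, cold-starting the worker if the key is unknown.
    send: async (key: K, msg: unknown): Promise<void> => {
      const worker = await start(key)
      worker.deliver(msg)
    },
    has: (key: K): boolean => workers.has(key),
    keys: (): K[] => Array.from(workers.keys()),
  }
}
```

Callers never ask whether a worker exists before sending; the first `send("system", msg)` starts it and later sends reuse it. Crash handling and eviction are left out of this sketch.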

Internal: ManagedWorker

Single-process lifecycle manager (not exposed to consumers):

import type { ChildProcess } from "node:child_process"

type WorkerState = "starting" | "ready" | "draining" | "stopped"

// Internal to WorkerRuntime
type ManagedWorker = {
  state: WorkerState
  pid: number | null
  process: ChildProcess | null
  stderrTail: string
  crashTimestamps: number[]
}
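One way the crashTimestamps field could drive the respawn decision is a sliding window over recent crashes. This is a sketch under stated assumptions: `recordCrash` and `RespawnPolicy` are hypothetical names, the policy fields mirror the `respawn` config above, and the exact boundary (here, up to `maxCrashes` crashes in the window are tolerated and the next one gives up) is an assumption the RFC does not spell out.

```typescript
// Mirrors the `respawn` block of WorkerRuntimeConfig.
type RespawnPolicy = {
  enabled: boolean
  maxCrashes: number
  windowMs: number
  delayMs: number
}

// Hypothetical helper: record a crash at `now` and decide whether to respawn.
// Returns the pruned timestamp list so the caller can store it back.
function recordCrash(
  crashTimestamps: number[],
  now: number,
  policy: RespawnPolicy,
): { crashTimestamps: number[]; respawn: boolean } {
  // Keep only crashes still inside the window, then add this one.
  const recent = crashTimestamps.filter((t) => now - t < policy.windowMs)
  recent.push(now)
  // Assumption: exceeding maxCrashes inside the window means "give up".
  const respawn = policy.enabled && recent.length <= policy.maxCrashes
  return { crashTimestamps: recent, respawn }
}
```

A caller that gets `respawn: true` would wait `delayMs` before forking again; that timer is omitted here to keep the function pure and easy to unit-test.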

State machine:

          start()
  stopped ───────→ starting ───ready msg──→ ready
     ↑                                       │
     │              crash                    │ drain() / evict()
     │            (auto-respawn)             │
     │                ↓                      ↓
     └──────────── stopped ←──────────── draining
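The diagram above can also be written as a transition table, which makes illegal transitions fail loudly. This is an illustrative sketch: the event names (`start`, `readyMsg`, `drain`, `evict`, `exit`) and the `nextState` helper are hypothetical, and treating an exit during `starting` as a move to `stopped` is an assumption, since the diagram only draws the crash arrow loosely.

```typescript
type WorkerState = "starting" | "ready" | "draining" | "stopped"

// Hypothetical event names; "exit" covers both crashes and clean exits.
type WorkerEvent = "start" | "readyMsg" | "drain" | "evict" | "exit"

// Allowed transitions, keyed by current state and then by event.
const transitions: Record<WorkerState, Partial<Record<WorkerEvent, WorkerState>>> = {
  stopped: { start: "starting" },
  starting: { readyMsg: "ready", exit: "stopped" }, // exit here is an assumption
  ready: { drain: "draining", evict: "draining", exit: "stopped" },
  draining: { exit: "stopped" },
}

// Returns the next state, or throws if the diagram has no such edge.
function nextState(state: WorkerState, event: WorkerEvent): WorkerState {
  const next = transitions[state][event]
  if (next === undefined) {
    throw new Error(`invalid transition: ${event} while ${state}`)
  }
  return next
}
```

Auto-respawn is not a state of its own in this sketch: after a crash lands in `stopped`, the runtime would issue `start` again if the respawn policy allows it.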

How It Replaces Current Code

| Current | New |
|---------|-----|
| `worker-pool.ts` (SenseWorkerPool) | `WorkerRuntime<GroupName>` + thin sense routing on top |
| `workflow-manager.ts` process parts | `WorkerRuntime<WorkflowName>` + thread/concurrency logic on top |
| `worker-fork-support.ts` | Absorbed into ManagedWorker internals |

Consumers Become Thinner

Sense layer: only handles compute scheduling + signal routing. Process lifecycle delegated to runtime.

Workflow layer: only handles thread tracking, concurrency control, queue/drop overflow. Process lifecycle delegated to runtime.
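To illustrate what a thinner workflow layer might look like once process lifecycle lives in the runtime, here is a small sketch of concurrency control with queue/drop overflow on top of a `send` function. Everything except the `send` signature is hypothetical (`createThreadGate`, `maxConcurrent`, `maxQueued`, `submit`, `finished`), and the real workflow-manager.ts may structure this quite differently.

```typescript
// Same shape as WorkerRuntime.send; the gate never touches processes itself.
type Send<K extends string> = (key: K, msg: unknown) => Promise<void>

function createThreadGate<K extends string>(
  send: Send<K>,
  maxConcurrent: number, // hypothetical limit on threads running at once
  maxQueued: number,     // hypothetical limit on waiting requests
) {
  let active = 0
  const queue: Array<{ key: K; msg: unknown }> = []

  function dispatch(key: K, msg: unknown): void {
    active++
    // If the runtime fails to deliver, free the slot again.
    send(key, msg).catch(finished)
  }

  // Call when a thread completes (for example from the onMessage callback).
  function finished(): void {
    active--
    const next = queue.shift()
    if (next !== undefined) dispatch(next.key, next.msg)
  }

  // Start now if a slot is free, otherwise queue, otherwise drop.
  function submit(key: K, msg: unknown): "started" | "queued" | "dropped" {
    if (active < maxConcurrent) {
      dispatch(key, msg)
      return "started"
    }
    if (queue.length < maxQueued) {
      queue.push({ key, msg })
      return "queued"
    }
    return "dropped"
  }

  return { submit, finished }
}
```

The gate only counts and queues; forking, crash recovery, and shutdown stay behind `send`, which is the separation the RFC is after.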

Message Flow

Kernel
  │
  ├── send("system", computeMsg)  ──→  WorkerRuntime ──fork if needed──→ sense-worker
  │                                         │
  │   onMessage("system", signalMsg) ←──────┘
  │
  ├── send("review", startThreadMsg) ─→ WorkerRuntime ──fork if needed──→ wf-worker
  │                                          │
  │   onMessage("review", threadEvent) ←─────┘

What Does NOT Change

  • Worker scripts (sense-worker.ts, workflow-worker.ts) — untouched
  • IPC protocol (ipc.ts) — untouched
  • Kernel orchestration logic — same, just calls WorkerRuntime instead of raw pool
  • Signal bus, scheduler, file watcher — untouched

Phase Plan

Phase 1: Extract WorkerRuntime

  • Create packages/daemon/src/worker-runtime.ts
  • Implement ManagedWorker + WorkerRuntime<K> with fork/crash/drain/shutdown
  • Absorb worker-fork-support.ts into it
  • Unit tests for: cold start, crash respawn, crash limit, drain+respawn, graceful shutdown
  • Does NOT touch sense/workflow yet

Phase 2: Migrate Sense Pool

  • Replace worker-pool.ts (SenseWorkerPool) with WorkerRuntime<string> + thin wrapper
  • Update kernel to use new API
  • Delete worker-pool.ts
  • All sense tests must pass

Phase 3: Migrate Workflow Manager

  • Extract process management from workflow-manager.ts into WorkerRuntime<string>
  • workflow-manager.ts shrinks to: thread tracking, concurrency, queue/overflow, drainWhenIdle coordination
  • All workflow tests must pass

Phase 4: Cleanup

  • Delete worker-fork-support.ts
  • Update docs / knowledge cards
  • Final verification

Phase Tracking

| Phase | Issue | Status |
|-------|-------|--------|
| Phase 1: WorkerRuntime | #280 | 🔲 |
| Phase 2: Sense Pool migration | #281 | 🔲 |
| Phase 3: Workflow Manager migration | #282 | 🔲 |
| Phase 4: Cleanup + docs | #283 | 🔲 |

Non-goals

  • Changing worker scripts (sense-worker.ts, workflow-worker.ts)
  • Changing the IPC message protocol
  • Adding worker_threads support (stay with fork for process isolation)
  • Dynamic routing / load balancing (1:1 key→worker mapping is sufficient)
Owner

All four phases of RFC-006 have been completed and merged (#280 Phase 1, #292 Phase 2, #295 Phase 3, Phase 4 cleanup). WorkerRuntime is now in use in worker-pool.ts and workflow-manager.ts.

— 小橘 🍊(NEKO Team)


Reference: uncaged/nerve#279