Compare commits

...

2 Commits

Author SHA1 Message Date
xiaoju c8e42c838b fix(daemon): harden state persistence, ReadonlyArray triggers
1. writeState: atomic write via temp file + rename
2. readState: distinguish missing file vs corrupt JSON (warn on error)
3. executeCompute: write disk before updating memory state
4. SenseInfo.triggers: ReadonlyArray<string>
5. CLAUDE.md: added Sense State Persistence docs
6. .knowledge/: updated architecture, sense, worker-isolation,
   storage-layer, cli, coding-conventions for stateful sense

Fixes #313
2026-05-01 12:08:59 +00:00
xiaomo eb7de9954f Merge pull request 'refactor: Stateful Sense (RFC #308)' (#312) from refactor/308-stateful-sense into main 2026-05-01 10:20:46 +00:00
9 changed files with 87 additions and 77 deletions
+5 -17
View File
@@ -5,27 +5,19 @@ Observation engine for autonomous agents — sense the world, react to changes,
## Core Pipeline
```
External World → Sense → Signal → Workflow → Log
compute() returns
{ signal, workflow }
External World → Sense(state) → { newState, workflow? } → Workflow → Log
```
Causality is **one-directional**. Logs are the end of the chain — they cannot trigger further Senses (prevents feedback loops).
Causality is **one-directional**. Logs are the end of the chain — they cannot trigger Senses (prevents feedback loops).
## Two Extension Points
| Extension | Question | Nature |
|-----------|----------|--------|
| **Sense** | What to observe & when to react | `compute()` pure function + YAML config (interval / on) |
| **Sense** | What to observe & when to react | `compute(state)` stateful function + YAML config (interval / on) |
| **Workflow** | What to do | Roles + Moderator |
Senses own both the "what" (compute logic) and the "when" (config-driven scheduling). A Sense can trigger a Workflow directly by returning `{ signal, workflow: { name, prompt } }`.
## Two Event Types
- **Signal** — from Sense compute (non-null return). Pure fact, no intent. Drives the front half (perception).
- **Command Event** — inside Workflow Threads. Has causal chain, must be responded to. Drives the back half (action).
Senses own both the "what" (compute logic) and the "when" (config-driven scheduling). A Sense can trigger a Workflow by returning `{ state, workflow: { name, prompt, maxRounds, dryRun } }`.
## Process Isolation
@@ -38,10 +30,6 @@ Senses own both the "what" (compute logic) and the "when" (config-driven schedul
## Storage Systems
- **Log Store** — SQLite with WAL mode for audit trails and workflow state
- **Sense Databases** — Isolated SQLite per sense group for private data
- **Sense State** — JSON files per sense (`data/senses/<name>.json`), atomically written
- **Knowledge Store** — Vector search index for project context
- **Blob Store** — Content-addressable storage for large artifacts
## Signal Flow
Sense compute outputs are routed through signal routing logic that determines whether to emit a signal or trigger a workflow—never both simultaneously.
+8 -10
View File
@@ -18,21 +18,18 @@ nerve daemon # restart daemon (stop + start)
### Init Behavior
**Default `nerve init`**: Creates workspace at `~/.uncaged-nerve/`. If this directory already exists and is non-empty, **exits with error** requiring `--force` flag. No merge/overwrite logic — prevents accidental workspace destruction.
**Default `nerve init`**: Creates workspace at `~/.uncaged-nerve/`. If this directory already exists and is non-empty, **exits with error** requiring `--force` flag.
**Force mode `nerve init --force`**: Reinitializes workspace even if `~/.uncaged-nerve/` exists. **Preserves `data/` directory** (containing sense SQLite databases and logs) but overwrites all config files (`nerve.yaml`, `package.json`, etc.) and example senses.
**Force mode `nerve init --force`**: Reinitializes workspace even if `~/.uncaged-nerve/` exists. **Preserves `data/` directory** (containing sense state files and logs) but overwrites all config files.
**Git clone `nerve init --from <url>`**: Clones existing repository to `~/.uncaged-nerve/`. Requires empty target directory — fails if workspace already exists and is non-empty.
**Git clone `nerve init --from <url>`**: Clones existing repository to `~/.uncaged-nerve/`. Requires empty target directory.
## Sense Management
```bash
nerve create sense <name> # scaffold a new sense (compute.ts + schema.ts)
nerve create sense <name> # scaffold a new sense (compute + initialState)
nerve sense list # list configured senses
nerve sense trigger <name> # manually trigger a sense compute
nerve sense schema <name> # show sense Drizzle schema
nerve sense query <name> # inspect sense SQLite database
nerve sense query <name> --sql "SELECT * FROM samples LIMIT 5"
```
## Workflow Management
@@ -77,12 +74,13 @@ my-agent/
knowledge.yaml # knowledge index config (optional)
senses/
cpu-usage/
compute.ts # sense implementation
schema.ts # Drizzle schema
migrations/ # auto-generated
src/index.ts # compute(state) + initialState export
workflows/
cleanup/
src/index.ts # workflow definition
data/
senses/ # sense state JSON files (auto-generated)
logs.db # log store (auto-generated)
knowledge.db # generated by nerve knowledge sync
.knowledge/ # curated knowledge cards
```
+1 -1
View File
@@ -58,7 +58,7 @@ No compiler enforcement - relies on manual discipline and TypeScript's flow cont
**Primary exports** use descriptive, unambiguous names:
- Functions: `createXxx()`, `parseXxx()`, `xxxAgent()` (e.g., `createCursorAdapter`, `cursorAgent`)
- Types: Domain-specific prefixes (e.g., `CursorAgentOptions`, `SenseComputeFn`, `ThreadContext`)
- Constants: `UPPER_SNAKE_CASE` with context (e.g., `DEFAULT_SENSE_SIGNAL_RETENTION`, `CURSOR_ADAPTER_DEFAULT_MS`)
- Constants: `UPPER_SNAKE_CASE` with context (e.g., `DEFAULT_ENGINE_MAX_ROUNDS`, `CURSOR_ADAPTER_DEFAULT_MS`)
**Avoiding ambiguity**:
- Package-scoped naming: `@uncaged/nerve-adapter-cursor` exports `cursorAgent`, `createCursorAdapter`
+30 -24
View File
@@ -1,33 +1,41 @@
# Sense
A `compute()` function that samples or derives external data. The only first-class citizen in nerve.
A stateful `compute(state)` function that samples or derives external data. Returns new state and an optional workflow trigger.
## Contract
Each sense module (`src/index.ts`) must export:
```ts
export { snapshots as table } from "./schema.ts"; // drizzle table for runtime to insert into
type MyState = { lastRun: number | null };
export async function compute(): Promise<ComputeResult<T>> { ... } // pure, no args
export const initialState: MyState = { lastRun: null };
export async function compute(state: MyState): Promise<{
state: MyState;
workflow: WorkflowTrigger | null;
}> {
// ... observe external world, derive new state
return { state: { lastRun: Date.now() }, workflow: null };
}
```
**Function Signature & Input Schema:**
- `compute()` is **parameterless** — no direct inputs, environment variables available
- No database access within compute — runtime provides isolated execution context
- Must be pure function (no side effects, no external API calls)
**Function Signature:**
- `compute(state: S)` — receives previous state (or `initialState` on first run)
- Returns `{ state: S; workflow: WorkflowTrigger | null }`
- `workflow: null` → no workflow triggered
- `workflow: { name, maxRounds, prompt, dryRun }` → triggers a workflow
**Return Value Contract:**
- `ComputeResult<T>` = `null | { signal: T; workflow: WorkflowTrigger | null }`
- `null` → silent, no storage, no signal
- `{ signal: data, workflow: null }` → persist + emit signal
- `{ signal, workflow: WorkflowTrigger }` → persist + emit signal + trigger workflow
- Any other value → treated as `{ signal: value, workflow: null }`
**State Persistence:**
- State stored as JSON at `data/senses/<name>.json`
- Read on worker startup; if missing or corrupt, `initialState` is used
- Written atomically (temp file + rename) after each successful compute
- Memory state updated only after disk write succeeds
**Error Handling & Serialization:**
- Exceptions caught by worker, logged as errors (no signal emitted)
- Signal payload must be JSON-serializable (passed via IPC)
- Invalid workflow triggers silently dropped (signal still emitted)
**Error Handling:**
- Exceptions caught by worker, logged as errors (state unchanged)
- State payload must be JSON-serializable
- Invalid workflow triggers rejected by daemon (workflow not started, compute still succeeds)
**Timeout & Scheduling Semantics:**
- Timeout priority: explicit config → AbortSignal → DEFAULT_TIMEOUT_MS (30s)
@@ -46,16 +54,14 @@ senses:
timeout: 30s # max compute duration
grace_period: 5s # wait before first compute
interval: 30s # periodic trigger (optional)
on: [disk-pressure] # trigger on signals from other senses (optional)
on: [disk-pressure] # trigger when another sense completes (optional)
```
## Manual Trigger Context
**`nerve sense trigger <name>`** sends IPC message to running daemon. The compute context is initialized as follows:
**`nerve sense trigger <name>`** sends IPC message to running daemon. The compute runs in the sense's worker process with:
- **SQLite Database**: Opened in **read-write mode** at `data/senses/<name>.db`
- **Migrations**: All `*.sql` files in `senses/<name>/migrations/` applied in lexicographic order
- **Environment**: Inherits daemon process environment (no special secrets injection)
- **Arguments**: No runtime arguments or mock inputs supported — `compute()` is always pure function with no parameters
- **State**: Read from `data/senses/<name>.json` (or `initialState` if missing)
- **Environment**: Inherits daemon process environment
- **Isolation**: Runs in forked child process (worker) with full filesystem access within user permissions
- **Persistence**: Runtime automatically calls `db.insert(table).values(result.signal)` if compute returns non-null signal
- **Persistence**: State written to JSON file after successful compute
+12 -13
View File
@@ -18,13 +18,13 @@ Append-only audit trail implemented in SQLite with WAL mode.
- Configurable log archival to JSONL files
- Full-text search across log entries
### 2. Sense Databases
Each sense group gets its own SQLite database for private state.
### 2. Sense State Files
Each sense persists its state as a JSON file.
**Characteristics:**
- Isolated per sense group (e.g., `system-senses.db`)
- Managed by individual sense compute functions
- Drizzle ORM integration for schema management
- One JSON file per sense at `data/senses/<name>.json`
- Atomically written (temp file + rename) after each successful compute
- Read on worker startup; `initialState` used if missing or corrupt
- No cross-sense data sharing
### 3. Knowledge Store (`knowledge.db`)
@@ -74,11 +74,10 @@ function runInTransaction<T>(db: DatabaseSync, fn: () => T): T {
- `upsertWorkflowRun()` — atomically writes log entry + workflow state
- `archiveLogs()` — transactional export + delete + watermark update
#### Sense Database Isolation
- Each sense group has its own SQLite file (e.g., `system-senses.db`)
- No cross-sense transactions or coordination required
- Independent schema migrations per sense
- Private `_signals` table for signal history retention
#### Sense State Isolation
- Each sense has its own JSON state file
- No cross-sense coordination required
- State files are independent and self-contained
### Process-Level Isolation
@@ -107,14 +106,14 @@ workflows:
**No Cross-Database Consistency**:
- No distributed transactions across multiple SQLite files
- Log Store and Sense Databases can temporarily diverge during failures
- Signal emission and workflow triggering are separate, non-atomic operations
- State persistence and workflow triggering are separate, non-atomic operations
**Failure Recovery Mechanisms**:
- **Sense worker crash**: State rebuilt from sense SQLite database on respawn
- **Sense worker crash**: State rebuilt from JSON state file on respawn (or `initialState` if corrupt)
- **Workflow worker crash**: Thread state recovered from log store message history
- **Kernel crash**: All workers respawned, state recovered from persistent stores
- **Log Store corruption**: WAL recovery on database open
- **Sense DB corruption**: Migrations re-run, `_signals` table rebuilt if needed
- **Sense state corruption**: Falls back to `initialState` with stderr warning
**Rollback Scenarios**:
- **Log write failure**: Transaction rolled back, no state changes persisted
+5 -5
View File
@@ -24,7 +24,7 @@ Worker **entrypoints** (`sense-worker.ts`, `workflow-worker.ts`) import lightwei
- **One worker per sense group** (configured in `nerve.yaml`)
- Groups share a child process but have isolated execution contexts
- Crash in one sense doesn't affect other groups
- Each group has its own SQLite database
- Each group has its own JSON state files
### 2. Workflow Workers
- **One worker per workflow type** (spawned on-demand)
@@ -59,7 +59,7 @@ workflow trigger → check existing worker → reuse or spawn
### Kernel ↔ Sense Worker
- IPC via child process stdio
- JSON-formatted messages
- Worker reports signals back to kernel
- Worker reports compute results back to kernel
- Bidirectional: kernel can request immediate computes
### Kernel ↔ Workflow Worker
@@ -128,7 +128,7 @@ process.on("SIGTERM", () => {});
**Sense Workers**:
- IPC `shutdown` message → `process.exit(0)` (immediate)
- No graceful termination period for senses
- State rebuilt from SQLite on respawn (no handoff needed)
- State rebuilt from JSON state files on respawn (no handoff needed)
**Workflow Workers**:
- IPC `shutdown` → wait for in-flight threads to complete
@@ -138,14 +138,14 @@ process.on("SIGTERM", () => {});
**State Handoff Mechanism**:
- No explicit state transfer between old/new workers
- Sense workers: SQLite database contains full state
- Sense workers: JSON state files contain full state
- Workflow workers: Log store contains thread message history
- Kernel coordinates recovery via `recoverThreadsForWorker()`
## Failure Handling
### Worker Crashes
- **Sense workers**: Automatic respawn after 1s delay, state rebuilt from DB
- **Sense workers**: Automatic respawn after 1s delay, state rebuilt from JSON
- **Workflow workers**: Crash recovery from log store thread messages
- **Kernel protection**: Main process continues, marks affected runs as crashed
- **Crash limits**: Max 5 crashes per workflow in 60s window (prevents infinite respawn)
+13
View File
@@ -28,6 +28,19 @@ External World → Sense(state) → { newState, workflow? } → Workflow → Log
### Sense State Persistence
Each sense's state is persisted as a JSON file at `data/senses/<name>.json` (relative to the nerve root, typically `~/.uncaged-nerve/`).
| Event | Behavior |
|-------|----------|
| **Worker start** | Read `state.json`; if missing or corrupt, use `initialState` from the sense module |
| **Compute success** | Write new state atomically (write-temp + rename), then update in-memory state |
| **Compute failure** | State unchanged (both disk and memory) |
| **Daemon restart** | State restored from last successful write |
State files are written atomically (temp file + rename) to prevent corruption on crash.
## Language & Paradigm
### Functional-first
+1 -1
View File
@@ -8,7 +8,7 @@ export type SenseInfo = {
throttle: number | null;
timeout: number | null;
/** Declarative schedule (`interval` / `on`) for this sense (derived from nerve.yaml). */
triggers: string[];
triggers: ReadonlyArray<string>;
};
/**
+12 -6
View File
@@ -1,5 +1,5 @@
import { mkdirSync, readFileSync, writeFileSync } from "node:fs";
import { dirname } from "node:path";
import { existsSync, mkdirSync, readFileSync, renameSync, writeFileSync } from "node:fs";
import { dirname, join } from "node:path";
import type { Result, SenseComputeFn, WorkflowTrigger } from "@uncaged/nerve-core";
import { err, isPlainRecord, ok } from "@uncaged/nerve-core";
@@ -14,16 +14,22 @@ export type SenseRuntime = {
export function readState(statePath: string, initialState: unknown): unknown {
try {
if (!existsSync(statePath)) return initialState;
const raw = readFileSync(statePath, "utf8");
return JSON.parse(raw);
} catch {
} catch (e) {
const msg = e instanceof Error ? e.message : String(e);
process.stderr.write(`[sense-runtime] warning: failed to read state from "${statePath}": ${msg} — using initialState\n`);
return initialState;
}
}
export function writeState(statePath: string, state: unknown): void {
mkdirSync(dirname(statePath), { recursive: true });
writeFileSync(statePath, JSON.stringify(state, null, 2));
const dir = dirname(statePath);
mkdirSync(dir, { recursive: true });
const tmp = join(dir, `.${Date.now()}.tmp`);
writeFileSync(tmp, JSON.stringify(state, null, 2));
renameSync(tmp, statePath);
}
/**
@@ -86,8 +92,8 @@ export async function executeCompute(
? await Promise.race([computePromise, timeoutPromise])
: await computePromise;
runtime.state = result.state;
writeState(runtime.statePath, result.state);
runtime.state = result.state;
return ok(result);
} catch (e) {