Compare commits

...

50 Commits

Author SHA1 Message Date
xiaoju f08ad802b0 fix: remove accidentally committed tgz and add to .gitignore
小橘 <xiaoju@shazhou.work>
2026-04-24 06:03:00 +00:00
xiaoju dcfb00128d feat(cli): add nerve workflow thread <runId> command — closes #77
Implements the workflow thread CLI command that retrieves
workflow execution context (logs, events, state) for a given run.

- Add 'nerve workflow thread <runId>' subcommand
- Add log-store query API in daemon
- Add tests for CLI and log-store
- Export new daemon types for thread data

小橘 <xiaoju@shazhou.work>
2026-04-24 05:59:53 +00:00
xiaomo 9cdac05f2c Merge pull request 'docs: add coding agent rules (.cursor/rules + CLAUDE.md)' (#76) from chore/cursor-rules-from-conventions into main 2026-04-23 12:12:08 +00:00
xiaomo 24a8ec927d docs: add core concepts (sense, signal, reflex, workflow) to agent rules 2026-04-23 12:05:31 +00:00
xiaomo 554a79775c docs: add .github/copilot-instructions.md for GitHub Copilot 2026-04-23 12:03:15 +00:00
xiaomo ceb5998fa3 docs: add cursor rules and CLAUDE.md from coding conventions
- .cursor/rules/global.mdc: coding conventions as Cursor agent rules
- CLAUDE.md: same conventions for Claude Code / Hermes agents
- Content derived from docs/coding-conventions.md
- Includes no-dynamic-import rule in CLAUDE.md for completeness
2026-04-23 12:01:03 +00:00
xiaomo 49b5099065 Merge pull request 'fix(daemon): accept string triggerPayload in workflow thread' (#75) from fix/trigger-payload-string-support into main 2026-04-23 11:50:48 +00:00
xiaoju 01d2185495 fix(daemon): accept string triggerPayload in workflow thread
The original code only accepted object-type triggerPayload, silently
discarding string values by replacing them with {}. This meant
`nerve workflow trigger <name> --payload '"some string"'` would
lose the payload entirely.

Changed to `triggerPayload ?? {}` so strings (and other non-null
values) pass through correctly.

小橘 🍊(NEKO Team)
2026-04-23 11:48:05 +00:00
xiaoju 5cedc6a33d release: v0.2.0 — core, daemon, cli 2026-04-23 10:58:49 +00:00
xiaomo c291d3a69a Merge pull request 'feat(cli): add nerve init --from to clone workspace from git' (#74) from feat/init-from-git into main 2026-04-23 10:56:17 +00:00
xiaomo 7960f5af8b Merge pull request 'docs: add comprehensive README for root and all packages' (#73) from docs/readme-update into main 2026-04-23 10:54:43 +00:00
xiaomo 5be14d0d8b docs: add comprehensive README for root and all packages 2026-04-23 10:53:45 +00:00
xiaoju 0e0eb4eec6 feat(cli): add nerve init --from to clone workspace from git
Made-with: Cursor
2026-04-23 10:53:06 +00:00
xiaomo cf2b0ac223 Merge pull request 'build: migrate from tsup to rslib' (#71) from build/tsup-to-rslib into main 2026-04-23 09:50:55 +00:00
xiaoju 1b5a52ea4d build: migrate from tsup to rslib
Replace tsup (esbuild-based) with rslib (rspack-based) across all packages.

tsup's built-in nodeProtocolPlugin strips the 'node:' prefix from all
Node.js builtins. Unlike node:fs etc., node:sqlite has no unprefixed
form, causing ERR_MODULE_NOT_FOUND at runtime. rslib handles node:
imports correctly without any workarounds.

Changes:
- Replace tsup.config.ts with rslib.config.ts in core, daemon, cli
- Swap tsup → @rslib/core in devDependencies
- Fix log-store.ts params type (Record<string, unknown> → Record<string, string | number>)
- Fix logStream.fd type cast in start.ts
- Exclude __tests__ from CLI tsconfig to avoid DTS errors
- All 356 tests pass, nerve init works correctly

Closes #70

小橘 🍊(NEKO Team)
2026-04-23 09:48:45 +00:00
xiaoju a084205b47 Revert "fix: restore node:sqlite prefix stripped by tsup bundler"
This reverts commit 57550ccfdb.
2026-04-23 09:41:28 +00:00
xiaoju 57550ccfdb fix: restore node:sqlite prefix stripped by tsup bundler
tsup's built-in node-protocol-plugin strips the 'node:' prefix from
all builtins. Unlike node:fs etc., node:sqlite has no unprefixed form,
causing ERR_MODULE_NOT_FOUND at runtime.

- Add onSuccess hook to both cli and daemon tsup configs to restore
  'node:sqlite' imports in bundled output
- Fix log-store params type to Record<string, string | number>

小橘 🍊(NEKO Team)
2026-04-23 09:32:20 +00:00
xiaomo 37588df402 Merge pull request 'refactor(daemon): upgrade Drizzle v1.0-beta + migrate better-sqlite3 → node:sqlite' (#69) from refactor/drizzle-v1-node-sqlite into main 2026-04-23 09:20:15 +00:00
xiaoju 85dd11c84d refactor(daemon): upgrade Drizzle v1.0-beta + migrate better-sqlite3 → node:sqlite
- Upgrade drizzle-orm from 0.43.1 to 1.0.0-beta.23
- Replace better-sqlite3 with node:sqlite (DatabaseSync) in:
  - sense-runtime.ts (Drizzle driver)
  - log-store.ts (raw SQL)
  - all test files
- Replace sqlite.pragma() with sqlite.exec('PRAGMA ...')
- Replace sqlite.transaction() with manual BEGIN/COMMIT/ROLLBACK
- Update CLI init command to verify node:sqlite instead of better-sqlite3
- Remove better-sqlite3 and @types/better-sqlite3 from dependencies
- Zero native addons remaining in the monorepo 🎉

Closes #67

小橘 <xiaoju@shazhou.work>
2026-04-23 09:18:44 +00:00
xiaomo d80a414530 Merge pull request 'chore: walkthrough cleanup — engines, types, mock fixes' (#68) from fix/walkthrough-cleanup into main 2026-04-23 09:10:09 +00:00
xiaoju 7f780f0642 chore: walkthrough cleanup — engines, types, mock fixes
- Add engines >= 22.5.0 to root and cli package.json (node:sqlite requirement)
- Remove unused @types/better-sqlite3 from cli devDeps (leftover from sql.js migration)
- Add files/publishConfig to core package.json (parity with other packages)
- Fix daemon test type errors: add getAllWorkflowRuns to mock LogStore,
  fix array destructuring on mock.calls, fix sense-runtime callback signatures

All 356 tests pass across all packages.

小橘 🍊(NEKO Team)
2026-04-23 09:08:24 +00:00
xiaomo 33e0d9a705 Merge pull request 'refactor(cli): replace sql.js with node:sqlite' (#66) from refactor/node-sqlite into main 2026-04-23 08:51:01 +00:00
xiaoju 418d8ee0c8 refactor(cli): replace sql.js with node:sqlite
Drop the sql.js WASM dependency in favour of Node 22's built-in
node:sqlite (DatabaseSync). This eliminates the ~2 MB WASM binary,
removes the async init ceremony, and lets us open databases in
readonly mode directly on disk instead of loading them into memory.

Breaking: requires Node >= 22.5.0 (sqlite support).

- Remove sql.js from cli dependencies
- Rewrite sense-sqlite.ts to use DatabaseSync
- Update sense command (schema/query) — sync API, no more queryAsObjects
- Update tests to use node:sqlite directly
- Remove sql.js from tsup externals

小橘 🍊(NEKO Team)
2026-04-23 08:43:39 +00:00
xiaomo 719c4c1449 Merge pull request 'refactor(cli): replace better-sqlite3 with sql.js (pure WASM) — implements RFC #63' (#64) from refactor/sql-js-migration into main 2026-04-23 07:32:38 +00:00
xiaoju c8bf4bf547 refactor(cli): replace better-sqlite3 with sql.js (pure WASM)
- Remove native C++ addon dependency, no more pnpm approve-builds
- sql.js loads SQLite as WASM, zero compilation required
- WASM init is singleton (once per process)
- Add queryAsObjects() adapter for sql.js columnar → row format
- Tests migrated to sql.js (16 passing)

Implements RFC #63
2026-04-23 07:25:08 +00:00
xiaoju 9b93c4a4d9 chore(cli): bump version to 0.1.8 2026-04-23 07:10:28 +00:00
xiaomo ca14c5f51d Merge pull request 'feat(cli): add nerve sense schema and query commands (closes #60)' (#62) from feat/sense-query into main 2026-04-23 07:06:02 +00:00
xiaomo 1979e0e16c Merge pull request 'refactor: replace dynamic imports with static imports in CLI' (#61) from refactor/static-imports into main 2026-04-23 07:04:31 +00:00
xingyue 9102c6698a chore: remove gitea-access rule from project (belongs in agent local skills) 2026-04-23 15:03:14 +08:00
xiaoju b15fc993f2 feat(cli): add nerve sense schema and query commands
Open each sense SQLite file read-only under data/senses. schema lists CREATE TABLE SQL from sqlite_master; query runs optional SQL or a default SELECT ordered by rowid. Human output uses aligned columns; --json for machine-readable output. Add better-sqlite3 to the CLI package and externalize it in tsup.

Tests cover sense-sqlite helpers and integration against a temp database.

Made-with: Cursor
2026-04-23 07:01:16 +00:00
xingyue 6cc8833b2a chore: add cursor rules and annotate legitimate dynamic imports
- Add .cursor/rules/no-dynamic-import.mdc: ban dynamic import() in
  production code with documented exceptions
- Add .cursor/rules/gitea-access.mdc: tea CLI usage guide
- Add explanatory comments on the 2 legitimate dynamic imports in
  sense-runtime.ts and workflow-worker.ts
2026-04-23 15:00:07 +08:00
xiaomo fc76b862ad Merge pull request 'refactor(cli): replace dynamic imports with static imports — closes #57' (#59) from refactor/static-imports into main 2026-04-23 06:55:46 +00:00
xingyue 787e791aba refactor(cli): replace dynamic imports with static imports
Convert 6 unnecessary `await import()` calls for Node built-in modules
(node:child_process, node:util) and project modules (../workspace.js)
to static top-level imports in init.ts and start.ts.

Closes #57
2026-04-23 14:52:18 +08:00
xiaomo 96188c8cda Merge pull request 'fix(daemon): foreground worker signals and crash diagnostics (closes #55, closes #56)' (#58) from fix/dev-worker-crash into main 2026-04-23 06:48:33 +00:00
xiaoju f1458f8353 fix(daemon): foreground worker signals and crash diagnostics
Ignore SIGINT/SIGTERM only when fork IPC is active (process.send) so terminal signals do not race the kernel shutdown in nerve dev, without breaking standalone worker CLIs (fixes #55).

Pipe worker stderr through the parent with a rolling capture buffer; log exit signal name and stderr tail on worker exit (fixes #56). Apply the same exit logging to workflow workers.

Made-with: Cursor
2026-04-23 06:41:32 +00:00
xiaomo 781f571474 Merge pull request 'refactor: add daemon subcommand group and dev foreground mode' (#54) from refactor/daemon-subcommand into main 2026-04-23 04:24:31 +00:00
xiaoju 640f170de8 refactor: add daemon subcommand group and dev foreground mode
- Create 'nerve daemon' subcommand group: start, stop, status, restart, logs
- Create 'nerve dev' for foreground mode (replaces old start without -d)
- 'nerve daemon start' is always background (removed -d/--daemon flag)
- Keep top-level aliases: nerve start/stop/status/logs → nerve daemon *
- Extract runStopCommand() for restart reuse
- Add daemon-cli tests

Closes #53

小橘 🍊(NEKO Team)
2026-04-23 01:16:13 +00:00
xiaoju 119b1f3722 chore: enforce pnpm publish for all packages unconditionally
小橘 <xiaoju@shazhou.work>
2026-04-23 00:49:39 +00:00
xiaoju 96ea4b46ff chore: add prepublish guard against npm publish with workspace:* deps
小橘 <xiaoju@shazhou.work>
2026-04-23 00:47:56 +00:00
xiaoju 57881533a8 docs: fix publish skill — use pnpm publish for workspace:* conversion
小橘 <xiaoju@shazhou.work>
2026-04-23 00:43:51 +00:00
xiaoju a62a993a82 fix(cli): remove duplicate shebang in daemon-bootstrap causing crash on nerve start -d
小橘 <xiaoju@shazhou.work>
2026-04-23 00:43:18 +00:00
xiaoju 3f22eb4664 release: @uncaged/nerve-core@0.1.3, @uncaged/nerve-daemon@0.1.4, @uncaged/nerve-cli@0.1.5
小橘 <xiaoju@shazhou.work>
2026-04-23 00:35:40 +00:00
xiaoju b5913263e4 docs: add publish and setup skills
小橘 <xiaoju@shazhou.work>
2026-04-23 00:31:27 +00:00
xiaomo d3ecd2a492 Merge pull request 'fix: address review issues #46-#49' (#52) from fix/review-issues-46-49 into main 2026-04-23 00:24:19 +00:00
xiaoju 8763440436 fix: address review issues #46-#49
#46 — EPIPE handler: only silence EPIPE, log other child errors
#47 — lastSignalTs: query sense/signal instead of reflex/run_complete
#48 — SenseInfo: deduplicate to @uncaged/nerve-core, add expectTypeOf test
#49 — IPC client: extract sendAndReceive<T> to eliminate duplication

小橘 <xiaoju@shazhou.work>
2026-04-23 00:22:55 +00:00
xiaomo f270804002 Merge pull request 'feat(daemon): CAS blob store — sha256 content-addressable storage (closes #39)' (#51) from feat/blob-store into main 2026-04-23 00:21:46 +00:00
xiaoju 404ee3e34f feat(daemon): add CAS blob store with sha256 content-addressable storage — closes #39
- createBlobStore(root) with write/read/exists API
- sha256 hex, first 2 chars as shard directory
- Atomic writes via temp file + rename
- CAS mismatch detection on read and write
- Inject blobStore into sense compute via options.blobs
- Export createBlobStore, normalizeBlobHash, BlobStore type
2026-04-23 00:19:35 +00:00
xiaomo cbc6db6b7d Merge pull request 'feat(daemon): log store archival — Meta table + JSONL cold archive (closes #38)' (#45) from feat/log-archive into main 2026-04-23 00:17:54 +00:00
xiaomo b1f6c775ce Merge pull request 'fix(init): auto-verify and retry better-sqlite3 native build — closes #44' (#50) from fix/init-sqlite-retry into main 2026-04-23 00:14:30 +00:00
xiaoju 978b1680a3 feat(daemon): add log store archival with meta watermark + JSONL cold archive — closes #38
- Add meta table with archived_up_to watermark in logs.db
- Archive logs older than 30 days to data/archive/logs/YYYY-MM-DD.jsonl
- Idempotent: same-day re-export overwrites file
- Single transaction: DELETE + UPDATE meta
- Optional VACUUM after archive loop
- CLI: nerve store archive [--vacuum]
- 15+ new tests for archive logic
2026-04-23 00:10:20 +00:00
67 changed files with 4509 additions and 860 deletions
+189
View File
@@ -0,0 +1,189 @@
---
description: Nerve project coding conventions — style, patterns, and toolchain
globs: packages/*/src/**/*.ts
alwaysApply: true
---
# Nerve Coding Conventions
## Core Concepts
```
External World → Sense → Signal → Reflex → Workflow → Log
↑ ↑
"what to observe" "what to do"
```
**Nerve** is a lightweight observation engine daemon for autonomous agents. It continuously observes external state, reacts to changes via declarative rules, and orchestrates multi-step workflows.
### Key Terms
| Concept | What it is |
|---------|-----------|
| **Sense** | A `compute()` function that samples or derives data. Returns `T \| null` — non-null emits a Signal, null is silent. Each Sense has its own SQLite database. |
| **Signal** | A notification emitted when a Sense returns non-null. Pure fact, no intent. Distributed via an in-memory Signal Bus. Not persisted. |
| **Reflex** | A declarative trigger (YAML) connecting Senses to actions. Trigger types: `interval` (periodic), `on` (react to Signals). Action types: trigger a Sense, or start a Workflow. |
| **Workflow** | A stateful multi-step execution. Contains **Roles** (actors with side effects) and a **Moderator** (pure router). Each instance is a **Thread** with a unique `runId`. |
| **Log** | Immutable audit trail. Records executions, state transitions, errors. **Cannot trigger Reflexes** — prevents feedback loops. |
| **Engine** | The kernel orchestrating everything. Holds Signal Bus, Reflex Scheduler, Process Manager, Workflow Manager. Never loads user code directly — all user code runs in isolated Workers. |
| **Daemon** | The `nerve-daemon` package — engine runtime. Runs as a background process. |
### Architecture Rules
- **Three orthogonal extension points**: Sense (what to compute), Reflex (when to compute), Workflow (what to do)
- **Process isolation**: One worker per Sense group (long-lived), one per Workflow type (on-demand). Workers never talk to each other.
- **Causality is one-directional**: External world → Sense → Signal → Reflex → Action + Log. Logs are the end of the chain.
## Language & Paradigm
### Functional-first
Use `function` + `type`, not `class` + `interface`.
```typescript
// ✅ Good
type Signal = {
senseId: string;
value: unknown;
ts: number;
};
function createSignal(senseId: string, value: unknown): Signal {
return { senseId, value, ts: Date.now() };
}
// ❌ Bad — no class, no interface
class Signal implements ISignal { ... }
```
### Rules
| Rule | Description |
|------|-------------|
| `type` over `interface` | All type definitions use `type` |
| `function` over `class` | Pure functions + closures, no class |
| No `this` | Functions must not depend on `this` context |
| No inheritance | No `extends`, `implements`, `abstract` |
| Composition over inheritance | Use function composition |
| Immutability first | Use `Readonly<T>`, `as const`, avoid mutation |
| No optional properties | Use `T \| null` instead of `?:` — see below |
### Exceptions
Classes are allowed when:
- Required by a third-party library (e.g. Drizzle's `sqliteTable`)
- Error subclasses (`class NerveError extends Error`)
### No Optional Properties
Never use `?:`. All nullable fields must be explicit `T | null`.
```typescript
// ✅ Good
type SenseConfig = {
group: string;
throttle: string | null;
timeout: string | null;
};
// ❌ Bad
type SenseConfig = {
group: string;
throttle?: string;
timeout?: string;
};
```
For mutually exclusive fields, use discriminated unions:
```typescript
// ✅ Good
type ReflexConfig =
| { kind: "sense"; sense: string; interval: string | null; on: string[] | null }
| { kind: "workflow"; workflow: string; on: string[] | null };
```
## Modules & Exports
- Always named exports, never default exports
- One module = one responsibility, filename = purpose
```typescript
// ✅ Named exports only
export function startEngine(config: EngineConfig): Engine { ... }
export type EngineConfig = { ... };
// ❌ No default exports
export default function startEngine() { ... }
```
## Naming
| Type | Style | Example |
|------|-------|---------|
| Files | kebab-case | `signal-bus.ts` |
| Types | PascalCase | `SignalBus` |
| Functions/variables | camelCase | `createSignalBus` |
| Constants | UPPER_SNAKE | `MAX_RETRY_COUNT` |
| Generics | Single letter or descriptive | `T`, `TValue` |
## Error Handling
- Use `Result` type for expected failures
- `throw` only for unrecoverable bugs (programmer errors)
- No try-catch for flow control
```typescript
type Result<T, E = Error> = { ok: true; value: T } | { ok: false; error: E };
function parseSenseConfig(raw: unknown): Result<SenseConfig> { ... }
```
## Async
- Always `async/await`, never `.then()` chains
## Toolchain
| Tool | Purpose |
|------|---------|
| **pnpm** | Package manager |
| **TypeScript** | Type checking (strict mode) |
| **Biome** | Lint + format (replaces ESLint + Prettier) |
| **tsup** | Bundling |
### Commands
```bash
pnpm run check # biome check (lint + format)
pnpm run format # biome format --write
pnpm run build # full build
pnpm test # run tests
```
## Monorepo Structure
```
nerve/
packages/
core/ # @nerve/core — shared types and utils
cli/ # @nerve/cli — CLI entry point
daemon/ # @nerve/daemon — engine runtime
docs/ # RFCs, conventions
biome.json # root Biome config
tsconfig.json # root TypeScript config (composite project references)
```
- `core` is the shared layer; `cli` and `daemon` both depend on it
- `cli` and `daemon` must NOT depend on each other
## Commit Convention
```
<type>(<scope>): <description>
type: feat | fix | refactor | docs | chore | test
scope: core | cli | daemon | rfc-001 | ...
```
+34
View File
@@ -0,0 +1,34 @@
---
description: Ban dynamic import() in production code — use static imports instead
globs: packages/*/src/**/*.ts
alwaysApply: true
---
# No Dynamic Import in Production Code
## Rule
Do NOT use `await import()` or dynamic `import()` expressions in production source code.
Always use static top-level `import` statements.
## Why
- Static imports enable tree-shaking and bundler optimizations
- They make dependencies explicit and discoverable at a glance
- Dynamic imports of Node built-ins or project modules add unnecessary async overhead
## Exceptions (must include a comment explaining why)
1. **`sense-runtime.ts`** — loads user-authored sense modules whose paths are only known at runtime
2. **`workflow-worker.ts`** — loads user-authored workflow modules whose paths are only known at runtime
When suppressing, add a comment directly above:
```ts
// Dynamic import required: user module path resolved at runtime
const mod = await import(senseIndexPath);
```
## Test Files
Test files (`__tests__/**`) are exempt — dynamic import after `vi.mock()` is standard vitest practice.
+180
View File
@@ -0,0 +1,180 @@
# Nerve Coding Conventions
## Core Concepts
```
External World → Sense → Signal → Reflex → Workflow → Log
↑ ↑
"what to observe" "what to do"
```
**Nerve** is a lightweight observation engine daemon for autonomous agents. It continuously observes external state, reacts to changes via declarative rules, and orchestrates multi-step workflows.
### Key Terms
| Concept | What it is |
|---------|-----------|
| **Sense** | A `compute()` function that samples or derives data. Returns `T \| null` — non-null emits a Signal, null is silent. Each Sense has its own SQLite database. |
| **Signal** | A notification emitted when a Sense returns non-null. Pure fact, no intent. Distributed via an in-memory Signal Bus. Not persisted. |
| **Reflex** | A declarative trigger (YAML) connecting Senses to actions. Trigger types: `interval` (periodic), `on` (react to Signals). Action types: trigger a Sense, or start a Workflow. |
| **Workflow** | A stateful multi-step execution. Contains **Roles** (actors with side effects) and a **Moderator** (pure router). Each instance is a **Thread** with a unique `runId`. |
| **Log** | Immutable audit trail. Records executions, state transitions, errors. **Cannot trigger Reflexes** — prevents feedback loops. |
| **Engine** | The kernel orchestrating everything. Holds Signal Bus, Reflex Scheduler, Process Manager, Workflow Manager. Never loads user code directly — all user code runs in isolated Workers. |
| **Daemon** | The `nerve-daemon` package — engine runtime. Runs as a background process. |
### Architecture Rules
- **Three orthogonal extension points**: Sense (what to compute), Reflex (when to compute), Workflow (what to do)
- **Process isolation**: One worker per Sense group (long-lived), one per Workflow type (on-demand). Workers never talk to each other.
- **Causality is one-directional**: External world → Sense → Signal → Reflex → Action + Log. Logs are the end of the chain.
## Language & Paradigm
### Functional-first
Use `function` + `type`, not `class` + `interface`.
```typescript
// ✅ Good
type Signal = {
senseId: string;
value: unknown;
ts: number;
};
function createSignal(senseId: string, value: unknown): Signal {
return { senseId, value, ts: Date.now() };
}
// ❌ Bad — no class, no interface
class Signal implements ISignal { ... }
```
### Rules
| Rule | Description |
|------|-------------|
| `type` over `interface` | All type definitions use `type` |
| `function` over `class` | Pure functions + closures, no class |
| No `this` | Functions must not depend on `this` context |
| No inheritance | No `extends`, `implements`, `abstract` |
| Composition over inheritance | Use function composition |
| Immutability first | Use `Readonly<T>`, `as const`, avoid mutation |
| No optional properties | Use `T \| null` instead of `?:` — see below |
### Exceptions
Classes are allowed when:
- Required by a third-party library (e.g. Drizzle's `sqliteTable`)
- Error subclasses (`class NerveError extends Error`)
### No Optional Properties
Never use `?:`. All nullable fields must be explicit `T | null`.
```typescript
// ✅ Good
type SenseConfig = {
group: string;
throttle: string | null;
timeout: string | null;
};
// ❌ Bad
type SenseConfig = {
group: string;
throttle?: string;
timeout?: string;
};
```
For mutually exclusive fields, use discriminated unions:
```typescript
// ✅ Good
type ReflexConfig =
| { kind: "sense"; sense: string; interval: string | null; on: string[] | null }
| { kind: "workflow"; workflow: string; on: string[] | null };
```
## Modules & Exports
- Always named exports, never default exports
- One module = one responsibility, filename = purpose
## Naming
| Type | Style | Example |
|------|-------|---------|
| Files | kebab-case | `signal-bus.ts` |
| Types | PascalCase | `SignalBus` |
| Functions/variables | camelCase | `createSignalBus` |
| Constants | UPPER_SNAKE | `MAX_RETRY_COUNT` |
| Generics | Single letter or descriptive | `T`, `TValue` |
## Error Handling
- Use `Result` type for expected failures
- `throw` only for unrecoverable bugs (programmer errors)
- No try-catch for flow control
```typescript
type Result<T, E = Error> = { ok: true; value: T } | { ok: false; error: E };
```
## Async
- Always `async/await`, never `.then()` chains
## No Dynamic Import
Do NOT use `await import()` in production code. Always use static top-level `import`.
Exceptions (must include a comment):
1. `sense-runtime.ts` — user module paths known only at runtime
2. `workflow-worker.ts` — user module paths known only at runtime
Test files (`__tests__/**`) are exempt.
## Toolchain
| Tool | Purpose |
|------|---------|
| **pnpm** | Package manager |
| **TypeScript** | Type checking (strict mode) |
| **Biome** | Lint + format (replaces ESLint + Prettier) |
| **tsup** | Bundling |
### Commands
```bash
pnpm run check # biome check (lint + format)
pnpm run format # biome format --write
pnpm run build # full build
pnpm test # run tests
```
## Monorepo Structure
```
nerve/
packages/
core/ # @nerve/core — shared types and utils
cli/ # @nerve/cli — CLI entry point
daemon/ # @nerve/daemon — engine runtime
docs/ # RFCs, conventions
```
- `core` is the shared layer; `cli` and `daemon` both depend on it
- `cli` and `daemon` must NOT depend on each other
## Commit Convention
```
<type>(<scope>): <description>
type: feat | fix | refactor | docs | chore | test
scope: core | cli | daemon | rfc-001 | ...
```
+1
View File
@@ -2,3 +2,4 @@ node_modules
dist dist
.turbo .turbo
*.tsbuildinfo *.tsbuildinfo
*.tgz
+180
View File
@@ -0,0 +1,180 @@
# Nerve Coding Conventions
## Core Concepts
```
External World → Sense → Signal → Reflex → Workflow → Log
↑ ↑
"what to observe" "what to do"
```
**Nerve** is a lightweight observation engine daemon for autonomous agents. It continuously observes external state, reacts to changes via declarative rules, and orchestrates multi-step workflows.
### Key Terms
| Concept | What it is |
|---------|-----------|
| **Sense** | A `compute()` function that samples or derives data. Returns `T \| null` — non-null emits a Signal, null is silent. Each Sense has its own SQLite database. |
| **Signal** | A notification emitted when a Sense returns non-null. Pure fact, no intent. Distributed via an in-memory Signal Bus. Not persisted. |
| **Reflex** | A declarative trigger (YAML) connecting Senses to actions. Trigger types: `interval` (periodic), `on` (react to Signals). Action types: trigger a Sense, or start a Workflow. |
| **Workflow** | A stateful multi-step execution. Contains **Roles** (actors with side effects) and a **Moderator** (pure router). Each instance is a **Thread** with a unique `runId`. |
| **Log** | Immutable audit trail. Records executions, state transitions, errors. **Cannot trigger Reflexes** — prevents feedback loops. |
| **Engine** | The kernel orchestrating everything. Holds Signal Bus, Reflex Scheduler, Process Manager, Workflow Manager. Never loads user code directly — all user code runs in isolated Workers. |
| **Daemon** | The `nerve-daemon` package — engine runtime. Runs as a background process. |
### Architecture Rules
- **Three orthogonal extension points**: Sense (what to compute), Reflex (when to compute), Workflow (what to do)
- **Process isolation**: One worker per Sense group (long-lived), one per Workflow type (on-demand). Workers never talk to each other.
- **Causality is one-directional**: External world → Sense → Signal → Reflex → Action + Log. Logs are the end of the chain.
## Language & Paradigm
### Functional-first
Use `function` + `type`, not `class` + `interface`.
```typescript
// ✅ Good
type Signal = {
senseId: string;
value: unknown;
ts: number;
};
function createSignal(senseId: string, value: unknown): Signal {
return { senseId, value, ts: Date.now() };
}
// ❌ Bad — no class, no interface
class Signal implements ISignal { ... }
```
### Rules
| Rule | Description |
|------|-------------|
| `type` over `interface` | All type definitions use `type` |
| `function` over `class` | Pure functions + closures, no class |
| No `this` | Functions must not depend on `this` context |
| No inheritance | No `extends`, `implements`, `abstract` |
| Composition over inheritance | Use function composition |
| Immutability first | Use `Readonly<T>`, `as const`, avoid mutation |
| No optional properties | Use `T \| null` instead of `?:` — see below |
### Exceptions
Classes are allowed when:
- Required by a third-party library (e.g. Drizzle's `sqliteTable`)
- Error subclasses (`class NerveError extends Error`)
### No Optional Properties
Never use `?:`. All nullable fields must be explicit `T | null`.
```typescript
// ✅ Good
type SenseConfig = {
group: string;
throttle: string | null;
timeout: string | null;
};
// ❌ Bad
type SenseConfig = {
group: string;
throttle?: string;
timeout?: string;
};
```
For mutually exclusive fields, use discriminated unions:
```typescript
// ✅ Good
type ReflexConfig =
| { kind: "sense"; sense: string; interval: string | null; on: string[] | null }
| { kind: "workflow"; workflow: string; on: string[] | null };
```
## Modules & Exports
- Always named exports, never default exports
- One module = one responsibility, filename = purpose
## Naming
| Type | Style | Example |
|------|-------|---------|
| Files | kebab-case | `signal-bus.ts` |
| Types | PascalCase | `SignalBus` |
| Functions/variables | camelCase | `createSignalBus` |
| Constants | UPPER_SNAKE | `MAX_RETRY_COUNT` |
| Generics | Single letter or descriptive | `T`, `TValue` |
## Error Handling
- Use `Result` type for expected failures
- `throw` only for unrecoverable bugs (programmer errors)
- No try-catch for flow control
```typescript
type Result<T, E = Error> = { ok: true; value: T } | { ok: false; error: E };
```
## Async
- Always `async/await`, never `.then()` chains
## No Dynamic Import
Do NOT use `await import()` in production code. Always use static top-level `import`.
Exceptions (must include a comment):
1. `sense-runtime.ts` — user module paths known only at runtime
2. `workflow-worker.ts` — user module paths known only at runtime
Test files (`__tests__/**`) are exempt.
## Toolchain
| Tool | Purpose |
|------|---------|
| **pnpm** | Package manager |
| **TypeScript** | Type checking (strict mode) |
| **Biome** | Lint + format (replaces ESLint + Prettier) |
| **tsup** | Bundling |
### Commands
```bash
pnpm run check # biome check (lint + format)
pnpm run format # biome format --write
pnpm run build # full build
pnpm test # run tests
```
## Monorepo Structure
```
nerve/
packages/
core/ # @nerve/core — shared types and utils
cli/ # @nerve/cli — CLI entry point
daemon/ # @nerve/daemon — engine runtime
docs/ # RFCs, conventions
```
- `core` is the shared layer; `cli` and `daemon` both depend on it
- `cli` and `daemon` must NOT depend on each other
## Commit Convention
```
<type>(<scope>): <description>
type: feat | fix | refactor | docs | chore | test
scope: core | cli | daemon | rfc-001 | ...
```
+163 -1
View File
@@ -1,3 +1,165 @@
# nerve # nerve
Observation engine — Sense, Reflex, Workflow **Observation engine for autonomous agents**sense the world, react to changes, run workflows.
Nerve is a lightweight daemon that continuously observes external state through **Senses**, reacts via declarative **Reflexes**, and orchestrates multi-step **Workflows**. Built for the [Uncaged](https://github.com/uncaged) agent framework.
## Core Concepts
```
External World → Sense → Signal → Reflex → Workflow → Log
↑ ↑
"what to observe" "what to do"
```
| Concept | Metaphor | Role |
|---------|----------|------|
| **Sense** | 👁️ Perception | A `compute()` function that samples or derives data. Each sense has its own SQLite database. |
| **Reflex** | ⚡ Reaction | Declarative trigger — interval-based, event-driven, or both. Connects senses to actions. |
| **Signal** | 📡 Notification | Emitted when a sense returns non-null. Other reflexes can listen for signals. |
| **Workflow** | 🔧 Action | Stateful multi-step execution with Roles (actors) and a Moderator (coordinator). |
| **Log** | 📝 Record | Immutable audit trail. Queryable by senses, but **cannot** trigger reflexes (prevents feedback loops). |
Three extension points, fully orthogonal — a Sense doesn't know when it runs, a Reflex doesn't know what it computes, a Workflow doesn't know why it was triggered.
## Packages
| Package | Description |
|---------|-------------|
| [`@uncaged/nerve-core`](./packages/core) | Shared types and config parser |
| [`@uncaged/nerve-daemon`](./packages/daemon) | The observation engine — kernel, sense runtime, reflex scheduler, workflow manager |
| [`@uncaged/nerve-cli`](./packages/cli) | CLI tool (`nerve`) — init, start, stop, logs, query |
## Quick Start
```bash
# Requirements: Node.js ≥ 22.5, pnpm
pnpm add -g @uncaged/nerve-cli
# Initialize a workspace
mkdir my-agent && cd my-agent
nerve init
# Write a sense
cat > senses/cpu-usage/compute.ts << 'EOF'
export async function compute() {
const [load] = (await import("node:os")).loadavg();
return load > 2.0 ? { load } : null; // signal only when load is high
}
EOF
# Configure reflexes in nerve.yaml
cat > nerve.yaml << 'EOF'
senses:
cpu-usage:
group: system
throttle: 10s
reflexes:
- kind: sense
sense: cpu-usage
interval: 30s
EOF
# Run
nerve dev # foreground (development)
nerve daemon start # background (production)
nerve status # check health
nerve logs # view logs
```
## Configuration
`nerve.yaml` declares senses, reflexes, and workflows:
```yaml
senses:
cpu-usage:
group: system # senses in the same group share a worker process
throttle: 10s # min interval between computes
timeout: 30s # max compute duration
gracePeriod: 5s # wait before first compute after startup
reflexes:
- kind: sense
sense: cpu-usage
interval: 30s # periodic trigger
on: [disk-pressure] # also trigger on signals from other senses
- kind: workflow
workflow: cleanup
on: [disk-pressure] # start a workflow when signal fires
workflows:
cleanup:
concurrency: 1
overflow: drop # discard if already running
code-review:
concurrency: 3
overflow: queue
maxQueue: 20
```
## Architecture
```
┌─────────────────────────────────────────────────────────┐
│ Kernel │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Worker │ │ Worker │ │ Worker │ (1 per │
│ │ (group A)│ │ (group B)│ │ (group C)│ group) │
│ │ sense-1 │ │ sense-3 │ │ sense-5 │ │
│ │ sense-2 │ │ sense-4 │ │ │ │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ │
│ │ │ │ │
│ └──────────────┼──────────────┘ │
│ ▼ │
│ ┌──────────────┐ │
│ │ Signal Bus │ │
│ └──────┬───────┘ │
│ ▼ │
│ ┌──────────────────┐ │
│ │ Reflex Scheduler │ │
│ └────────┬─────────┘ │
│ ▼ │
│ ┌───────────────────┐ │
│ │ Workflow Manager │──→ Log Store (SQLite) │
│ └───────────────────┘ │
└─────────────────────────────────────────────────────────┘
```
- **Worker processes** — one per sense group, forked by the kernel. Isolated compute execution.
- **Signal Bus** — in-memory pub/sub for signal distribution.
- **Reflex Scheduler** — interval timers + signal subscriptions, with throttle/coalesce.
- **Workflow Manager** — concurrency control (drop/queue), thread lifecycle tracking.
- **Log Store** — WAL-mode SQLite via `node:sqlite`, with archival and retention policies.
## Tech Stack
- **Zero native addons** — uses Node.js built-in `node:sqlite` (DatabaseSync)
- **Drizzle ORM** v1.0 for sense databases
- **rslib** (rspack) for building
- **Biome** for formatting/linting
- **Vitest** for testing
- **pnpm** workspaces for monorepo management
## Development
```bash
git clone https://git.shazhou.work/uncaged/nerve.git
cd nerve
pnpm install
pnpm build
pnpm -r test # run all tests
```
## Design Documents
- [RFC-001: Observation Engine](./docs/rfc-001-observation-engine.md) — Sense, Signal, Reflex model
- [RFC-002: Workflow Engine](./docs/rfc-002-workflow-engine.md) — Stateful workflow execution
- [Coding Conventions](./docs/coding-conventions.md)
## License
MIT
+80
View File
@@ -0,0 +1,80 @@
# Skill: Publish @uncaged/nerve packages to npm
## When to use
When releasing a new version of any `@uncaged/nerve-*` package to npm.
## Prerequisites
- npm login with an account that has **owner** access to the `@uncaged` org
- All tests pass: `pnpm -r run test`
- Clean working tree (no uncommitted changes)
## Packages
| Package | Path | npm |
|---------|------|-----|
| `@uncaged/nerve-core` | `packages/core` | [link](https://www.npmjs.com/package/@uncaged/nerve-core) |
| `@uncaged/nerve-daemon` | `packages/daemon` | [link](https://www.npmjs.com/package/@uncaged/nerve-daemon) |
| `@uncaged/nerve-cli` | `packages/cli` | [link](https://www.npmjs.com/package/@uncaged/nerve-cli) |
## Dependency order
`core``daemon``cli`
Always publish in this order. If `core` has changes, bump and publish it first, then update dependents.
## Steps
### 1. Ensure clean state
```bash
git checkout main && git pull origin main
pnpm install
pnpm -r run build
pnpm -r run test
```
### 2. Bump versions
Manually update `version` in each changed package's `package.json`.
Follow semver:
- **patch** (0.1.x): bug fixes, refactors
- **minor** (0.x.0): new features, non-breaking API additions
- **major** (x.0.0): breaking changes
If bumping `core`, also update the `@uncaged/nerve-core` dependency version in `daemon` and `cli` package.json. Same for `daemon``cli`.
### 3. Build
```bash
pnpm -r run build
```
### 4. Publish (in order)
```bash
# Only publish packages that have version bumps
# MUST use pnpm publish (not npm) — pnpm converts workspace:* to real versions
cd packages/core && pnpm publish --access public --no-git-checks
cd packages/daemon && pnpm publish --access public --no-git-checks
cd packages/cli && pnpm publish --access public --no-git-checks
```
### 5. Commit & tag
```bash
git add -A
git commit -m "release: @uncaged/nerve-core@X.Y.Z, @uncaged/nerve-daemon@X.Y.Z, @uncaged/nerve-cli@X.Y.Z"
git tag -a vX.Y.Z -m "Release vX.Y.Z"
git push origin main --tags
```
## Pitfalls
- **Don't publish without building first** — `tsup` output in `dist/` is what npm ships
- **Dependency order matters** — if you publish `daemon` before `core`, npm may resolve the old `core` version
- **`--access public`** is required for scoped packages on first publish; safe to always include
- **Check `npm whoami`** to confirm you're logged in as the right account
- **No changeset tool** — this project uses manual version bumps (no changesets/lerna)
+101
View File
@@ -0,0 +1,101 @@
# Skill: Setup nerve from scratch
## When to use
Setting up the nerve project for local development from a fresh clone.
## Prerequisites
- **Node.js** ≥ 18
- **pnpm** ≥ 9 (`npm install -g pnpm`)
- **Git** access to `git.shazhou.work`
## Steps
### 1. Clone
```bash
git clone https://git.shazhou.work/uncaged/nerve.git
cd nerve
```
### 2. Install dependencies
```bash
pnpm install
```
This installs all workspace packages and links internal dependencies (`core``daemon``cli`).
### 3. Build all packages
```bash
pnpm -r run build
```
Build order is handled automatically by pnpm workspace — `core` builds first, then `daemon`, then `cli`.
### 4. Run tests
```bash
pnpm -r run test
```
Or test individual packages:
```bash
pnpm --filter @uncaged/nerve-core test
pnpm --filter @uncaged/nerve-daemon test
pnpm --filter @uncaged/nerve-cli test
```
### 5. Try the CLI
```bash
# Link the CLI globally
cd packages/cli && npm link
# Initialize a workspace
mkdir ~/my-nerve-workspace && cd ~/my-nerve-workspace
nerve init
# Edit senses in nerve.yaml, then:
nerve start # start the daemon
nerve sense list # list registered senses
nerve stop # stop the daemon
```
### 6. Lint & format
```bash
pnpm run check # biome lint check
pnpm run format # biome auto-format
```
## Project structure
```
nerve/
├── packages/
│ ├── core/ # @uncaged/nerve-core — shared types, log store, blob store
│ ├── daemon/ # @uncaged/nerve-daemon — kernel, sense runtime, workflow manager
│ └── cli/ # @uncaged/nerve-cli — CLI commands (init, start, stop, sense, etc.)
├── docs/ # RFCs, conventions, skills
├── pnpm-workspace.yaml
└── biome.json # linter/formatter config
```
## Key conventions
- **Monorepo** with pnpm workspaces
- **ESM only** — all packages output ESM (`"type": "module"`)
- **tsup** for builds, **vitest** for tests, **biome** for lint/format
- **SQLite** (better-sqlite3) for log store and blob store
- See `docs/coding-conventions.md` for code style rules
## Pitfalls
- **Must build before test** — daemon and cli import compiled output from core
- **better-sqlite3** requires native compilation — if `pnpm install` fails, ensure you have build tools (`build-essential` on Linux, Xcode CLI tools on macOS)
- **Node 18+** required — uses native `fetch`, `crypto.randomUUID`, etc.
- **pnpm only** — don't use npm/yarn, workspace links won't resolve correctly
+4 -1
View File
@@ -1,6 +1,9 @@
{ {
"name": "nerve", "name": "nerve",
"private": true, "private": true,
"engines": {
"node": ">=22.5.0"
},
"scripts": { "scripts": {
"build": "pnpm -r run build", "build": "pnpm -r run build",
"check": "biome check .", "check": "biome check .",
@@ -8,7 +11,7 @@
}, },
"devDependencies": { "devDependencies": {
"@biomejs/biome": "^1.9.0", "@biomejs/biome": "^1.9.0",
"tsup": "^8.0.0", "@rslib/core": "^0.21.3",
"typescript": "^5.5.0" "typescript": "^5.5.0"
} }
} }
+69
View File
@@ -0,0 +1,69 @@
# @uncaged/nerve-cli
Command-line interface for the [nerve](../../README.md) observation engine.
## Install
```bash
pnpm add -g @uncaged/nerve-cli
# or
npx @uncaged/nerve-cli
```
Requires Node.js ≥ 22.5.
## Commands
### Workspace
```bash
nerve init # Initialize a nerve workspace (installs deps, scaffolds config)
nerve validate # Validate nerve.yaml configuration
```
### Daemon Management
```bash
nerve daemon start # Start the daemon (background)
nerve daemon stop # Stop the daemon
nerve daemon status # Check daemon health
nerve daemon restart # Restart the daemon
nerve daemon logs # Tail daemon logs
```
### Development
```bash
nerve dev # Run in foreground mode (no daemon, Ctrl+C to stop)
```
### Querying
```bash
nerve logs # View structured logs
nerve sense query <name> # Query a sense's SQLite database
nerve sense schema <name> # Show a sense's database schema
nerve status # Daemon health summary
```
### Workflows
```bash
nerve workflow list # List workflow runs
nerve workflow show <runId> # Show workflow run details
```
### Top-level Aliases
For convenience, these aliases are available:
```bash
nerve start → nerve daemon start
nerve stop → nerve daemon stop
nerve status → nerve daemon status
nerve logs → nerve daemon logs
```
## License
MIT
+10 -5
View File
@@ -1,6 +1,9 @@
{ {
"name": "@uncaged/nerve-cli", "name": "@uncaged/nerve-cli",
"version": "0.1.4", "engines": {
"node": ">=22.5.0"
},
"version": "0.2.0",
"type": "module", "type": "module",
"bin": { "bin": {
"nerve": "dist/cli.js" "nerve": "dist/cli.js"
@@ -14,17 +17,19 @@
"access": "public" "access": "public"
}, },
"scripts": { "scripts": {
"build": "tsup", "prepublishOnly": "bash ../../scripts/prepublish-check.sh",
"build": "rslib build",
"test": "vitest run" "test": "vitest run"
}, },
"dependencies": { "dependencies": {
"@uncaged/nerve-core": "workspace:*", "@uncaged/nerve-core": "workspace:*",
"citty": "^0.1.6" "citty": "^0.1.6",
"yaml": "^2.8.3"
}, },
"devDependencies": { "devDependencies": {
"@uncaged/nerve-daemon": "workspace:*", "@rslib/core": "^0.21.3",
"@types/better-sqlite3": "^7.6.13",
"@types/node": "^22.0.0", "@types/node": "^22.0.0",
"@uncaged/nerve-daemon": "workspace:*",
"vitest": "^4.1.5" "vitest": "^4.1.5"
} }
} }
+25
View File
@@ -0,0 +1,25 @@
import { defineConfig } from "@rslib/core";
export default defineConfig({
lib: [
{
format: "esm",
dts: true,
banner: {
js: "#!/usr/bin/env node",
},
},
],
source: {
entry: {
index: "src/index.ts",
cli: "src/cli.ts",
"daemon-bootstrap": "src/daemon-bootstrap.ts",
},
},
output: {
target: "node",
cleanDistPath: true,
externals: ["@uncaged/nerve-daemon"],
},
});
@@ -0,0 +1,28 @@
import { describe, expect, it } from "vitest";
import { daemonCommand } from "../commands/daemon.js";
import { devCommand } from "../commands/dev.js";
import { daemonStartCommand } from "../commands/start.js";
describe("nerve daemon command group", () => {
it("exposes start, stop, status, restart, and logs subcommands", () => {
const subs = daemonCommand.subCommands;
expect(subs).toBeDefined();
if (!subs) {
throw new Error("expected daemonCommand.subCommands");
}
expect(Object.keys(subs).sort()).toEqual(["logs", "restart", "start", "status", "stop"]);
});
it("shares the same start command object as top-level nerve start alias", () => {
const subs = daemonCommand.subCommands;
expect(subs?.start).toBe(daemonStartCommand);
});
});
describe("nerve dev", () => {
it("is a foreground dev command", () => {
expect(devCommand.meta?.name).toBe("dev");
expect(devCommand.meta?.description).toMatch(/foreground/i);
});
});
@@ -3,16 +3,24 @@
* If the daemon package changes its public API, this file will fail to compile. * If the daemon package changes its public API, this file will fail to compile.
*/ */
import type { SenseInfo } from "@uncaged/nerve-core";
import type { import type {
ArchiveLogsDayResult as DaemonArchiveLogsDayResult,
ArchiveLogsOptions as DaemonArchiveLogsOptions,
ArchiveLogsResult as DaemonArchiveLogsResult,
LogEntry as DaemonLogEntry, LogEntry as DaemonLogEntry,
LogQuery as DaemonLogQuery, LogQuery as DaemonLogQuery,
LogStore as DaemonLogStore, LogStore as DaemonLogStore,
SenseInfo as DaemonSenseInfo,
WorkflowRun as DaemonWorkflowRun, WorkflowRun as DaemonWorkflowRun,
WorkflowRunStatus as DaemonWorkflowRunStatus, WorkflowRunStatus as DaemonWorkflowRunStatus,
} from "@uncaged/nerve-daemon"; } from "@uncaged/nerve-daemon";
import { describe, it, expectTypeOf } from "vitest"; import { describe, expectTypeOf, it } from "vitest";
import type { import type {
ArchiveLogsDayResult,
ArchiveLogsOptions,
ArchiveLogsResult,
LogEntry, LogEntry,
LogQuery, LogQuery,
LogStore, LogStore,
@@ -21,6 +29,11 @@ import type {
} from "../daemon-types.js"; } from "../daemon-types.js";
describe("daemon-types drift guard", () => { describe("daemon-types drift guard", () => {
it("SenseInfo matches daemon package export (list-senses IPC)", () => {
expectTypeOf<SenseInfo>().toMatchTypeOf<DaemonSenseInfo>();
expectTypeOf<DaemonSenseInfo>().toMatchTypeOf<SenseInfo>();
});
it("WorkflowRunStatus is assignable both ways", () => { it("WorkflowRunStatus is assignable both ways", () => {
expectTypeOf<WorkflowRunStatus>().toMatchTypeOf<DaemonWorkflowRunStatus>(); expectTypeOf<WorkflowRunStatus>().toMatchTypeOf<DaemonWorkflowRunStatus>();
expectTypeOf<DaemonWorkflowRunStatus>().toMatchTypeOf<WorkflowRunStatus>(); expectTypeOf<DaemonWorkflowRunStatus>().toMatchTypeOf<WorkflowRunStatus>();
@@ -42,6 +55,26 @@ describe("daemon-types drift guard", () => {
}); });
it("LogStore has all required methods", () => { it("LogStore has all required methods", () => {
expectTypeOf<LogStore>().toMatchTypeOf<Pick<DaemonLogStore, "query" | "getWorkflowRun" | "getActiveWorkflowRuns" | "getAllWorkflowRuns" | "upsertWorkflowRun" | "close">>(); expectTypeOf<LogStore>().toMatchTypeOf<
Pick<
DaemonLogStore,
| "query"
| "getWorkflowRun"
| "getActiveWorkflowRuns"
| "getAllWorkflowRuns"
| "upsertWorkflowRun"
| "archiveLogs"
| "close"
>
>();
});
it("ArchiveLogs types match daemon", () => {
expectTypeOf<ArchiveLogsOptions>().toMatchTypeOf<DaemonArchiveLogsOptions>();
expectTypeOf<DaemonArchiveLogsOptions>().toMatchTypeOf<ArchiveLogsOptions>();
expectTypeOf<ArchiveLogsResult>().toMatchTypeOf<DaemonArchiveLogsResult>();
expectTypeOf<DaemonArchiveLogsResult>().toMatchTypeOf<ArchiveLogsResult>();
expectTypeOf<ArchiveLogsDayResult>().toMatchTypeOf<DaemonArchiveLogsDayResult>();
expectTypeOf<DaemonArchiveLogsDayResult>().toMatchTypeOf<ArchiveLogsDayResult>();
}); });
}); });
@@ -13,18 +13,24 @@ import { createServer } from "node:net";
import { tmpdir } from "node:os"; import { tmpdir } from "node:os";
import { join } from "node:path"; import { join } from "node:path";
import type { SenseInfo } from "@uncaged/nerve-core";
import { afterEach, beforeEach, describe, expect, it } from "vitest"; import { afterEach, beforeEach, describe, expect, it } from "vitest";
import { listSensesViaDaemon } from "../daemon-client.js";
import type { SenseInfo } from "../daemon-client.js";
import { formatDuration, formatSenseList, sensesFromConfig } from "../commands/sense.js"; import { formatDuration, formatSenseList, sensesFromConfig } from "../commands/sense.js";
import { listSensesViaDaemon } from "../daemon-client.js";
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// Fixtures // Fixtures
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
const SAMPLE_SENSES: SenseInfo[] = [ const SAMPLE_SENSES: SenseInfo[] = [
{ name: "cpu-usage", group: "system", throttle: 5000, timeout: 3000, lastSignalTs: 1_700_000_000_000 }, {
name: "cpu-usage",
group: "system",
throttle: 5000,
timeout: 3000,
lastSignalTs: 1_700_000_000_000,
},
{ name: "disk-usage", group: "system", throttle: 30000, timeout: null, lastSignalTs: null }, { name: "disk-usage", group: "system", throttle: 30000, timeout: null, lastSignalTs: null },
{ name: "active-tasks", group: "tasks", throttle: 10000, timeout: 30000, lastSignalTs: null }, { name: "active-tasks", group: "tasks", throttle: 10000, timeout: 30000, lastSignalTs: null },
]; ];
@@ -0,0 +1,159 @@
/**
* Tests for sense SQLite helpers used by `nerve sense schema` / `nerve sense query`.
*/
import { mkdirSync, rmSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { DatabaseSync } from "node:sqlite";
import { afterEach, beforeEach, describe, expect, it } from "vitest";
import {
assertSenseDbExists,
collectColumnKeys,
defaultPreviewSql,
formatRowsAsAlignedTable,
listTableSqlStatements,
parseSenseQueryArgs,
pickDefaultPreviewTable,
senseDbPath,
} from "../sense-sqlite.js";
let tmpDir: string;
beforeEach(() => {
tmpDir = join(
tmpdir(),
`nerve-sense-sqlite-${Date.now()}-${Math.random().toString(16).slice(2)}`,
);
mkdirSync(join(tmpDir, "data", "senses"), { recursive: true });
});
afterEach(() => {
rmSync(tmpDir, { recursive: true, force: true });
});
describe("senseDbPath", () => {
it("points at data/senses/<name>.db under the given root", () => {
expect(senseDbPath("/root", "cpu-usage")).toBe(join("/root", "data", "senses", "cpu-usage.db"));
});
});
describe("assertSenseDbExists", () => {
it("throws when the file is missing", () => {
expect(() => assertSenseDbExists(tmpDir, "nope")).toThrow(/No database at/);
});
it("returns the path when the file exists", () => {
const p = join(tmpDir, "data", "senses", "x.db");
new DatabaseSync(p).close();
expect(assertSenseDbExists(tmpDir, "x")).toBe(p);
});
});
describe("listTableSqlStatements", () => {
it("returns CREATE statements ordered by tbl_name", () => {
const p = join(tmpDir, "data", "senses", "t.db");
const db = new DatabaseSync(p);
db.exec("CREATE TABLE zebra (id INTEGER)");
db.exec("CREATE TABLE alpha (id INTEGER)");
const stmts = listTableSqlStatements(db);
db.close();
expect(stmts).toHaveLength(2);
expect(stmts[0]).toMatch(/^CREATE TABLE alpha/i);
expect(stmts[1]).toMatch(/^CREATE TABLE zebra/i);
});
});
describe("pickDefaultPreviewTable", () => {
it("prefers non-_migrations tables when both exist", () => {
const p = join(tmpDir, "data", "senses", "t.db");
const db = new DatabaseSync(p);
db.exec("CREATE TABLE _migrations (name TEXT PRIMARY KEY)");
db.exec("CREATE TABLE readings (id INTEGER)");
expect(pickDefaultPreviewTable(db)).toBe("readings");
db.close();
});
it("uses _migrations when it is the only table", () => {
const p = join(tmpDir, "data", "senses", "t.db");
const db = new DatabaseSync(p);
db.exec("CREATE TABLE _migrations (name TEXT PRIMARY KEY)");
expect(pickDefaultPreviewTable(db)).toBe("_migrations");
db.close();
});
});
describe("defaultPreviewSql", () => {
it("quotes identifiers for SQL safety", () => {
expect(defaultPreviewSql(`weird"name`)).toContain(`weird""name`);
});
});
describe("parseSenseQueryArgs", () => {
it("parses sense name only", () => {
expect(parseSenseQueryArgs(["cpu"])).toEqual({ name: "cpu", sql: undefined });
});
it("strips --json", () => {
expect(parseSenseQueryArgs(["cpu", "--json"])).toEqual({ name: "cpu", sql: undefined });
expect(parseSenseQueryArgs(["--json", "cpu"])).toEqual({ name: "cpu", sql: undefined });
});
it("joins remaining tokens into SQL", () => {
expect(parseSenseQueryArgs(["cpu", "SELECT", "1"])).toEqual({ name: "cpu", sql: "SELECT 1" });
});
it("throws when name is missing", () => {
expect(() => parseSenseQueryArgs(["--json"])).toThrow(/Missing sense name/);
});
});
describe("formatRowsAsAlignedTable", () => {
it("shows empty marker for no rows", () => {
expect(formatRowsAsAlignedTable([])).toContain("(0 rows)");
});
it("aligns columns from row data", () => {
const out = formatRowsAsAlignedTable([
{ a: 1, b: "x" },
{ a: 22, b: "yy" },
]);
expect(out).toContain("a");
expect(out).toContain("b");
expect(out).toContain("22");
});
});
describe("collectColumnKeys", () => {
it("preserves key order from first row then appends new keys", () => {
expect(
collectColumnKeys([
{ z: 1, a: 2 },
{ a: 3, b: 4 },
]),
).toEqual(["z", "a", "b"]);
});
});
describe("readonly query integration", () => {
it("runs default preview SQL on a real db", () => {
const p = join(tmpDir, "data", "senses", "demo.db");
const rw = new DatabaseSync(p);
rw.exec("CREATE TABLE items (id INTEGER PRIMARY KEY, v TEXT)");
rw.exec("INSERT INTO items (v) VALUES ('a'), ('b')");
rw.close();
const db = new DatabaseSync(p, { readOnly: true });
const table = pickDefaultPreviewTable(db);
expect(table).toBe("items");
if (table === null) {
throw new Error("expected items table");
}
const sql = defaultPreviewSql(table);
const rows = db.prepare(sql).all() as Record<string, unknown>[];
db.close();
expect(rows.length).toBeGreaterThanOrEqual(1);
});
});
+96 -1
View File
@@ -18,13 +18,17 @@ import { afterEach, beforeEach, describe, expect, it } from "vitest";
import { import {
buildInspectOutput, buildInspectOutput,
buildListOutput, buildListOutput,
buildThreadCommandOutput,
DEFAULT_THREAD_BUDGET_CHARS,
formatThreadRoundBlock,
formatTs, formatTs,
getAllWorkflowRuns, getAllWorkflowRuns,
partitionCommandEvent,
parseIntArg, parseIntArg,
statusIcon, statusIcon,
} from "../commands/workflow.js"; } from "../commands/workflow.js";
import { triggerWorkflowViaDaemon } from "../daemon-client.js"; import { triggerWorkflowViaDaemon } from "../daemon-client.js";
import type { LogStore, WorkflowRun } from "../daemon-types.js"; import type { LogStore, ThreadRoundRow, WorkflowRun } from "../daemon-types.js";
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// Test helpers // Test helpers
@@ -322,6 +326,97 @@ describe("workflow list — integration with real store", () => {
}); });
}); });
// ---------------------------------------------------------------------------
// nerve workflow thread — formatting helpers
// ---------------------------------------------------------------------------
describe("partitionCommandEvent", () => {
it("splits reserved type, role, content from rest", () => {
const p = partitionCommandEvent({
type: "scan_done",
role: "scanner",
content: "ok",
items: [1, 2],
});
expect(p.typeStr).toBe("scan_done");
expect(p.roleStr).toBe("scanner");
expect(p.contentBody).toBe("ok");
expect(p.rest).toEqual({ items: [1, 2] });
});
it("uses fallback role and stringifies non-string content", () => {
const p = partitionCommandEvent({ type: "x", content: { n: 1 } });
expect(p.roleStr).toBe("?");
expect(p.contentBody).toBe('{"n":1}');
});
});
describe("formatThreadRoundBlock", () => {
const row: ThreadRoundRow = {
round: 2,
logId: 99,
ts: new Date("2026-01-02T03:04:05.006Z").getTime(),
event: { type: "reply", role: "bot", content: "hi", score: 0.5 },
};
it("includes header, YAML frontmatter for rest, and body", () => {
const text = formatThreadRoundBlock(row);
expect(text).toContain("[#2 bot]");
expect(text).toContain("type=reply");
expect(text).toContain("---\n");
expect(text).toContain("score: 0.5");
expect(text).toContain("hi");
expect(text).not.toContain("role:");
});
});
describe("buildThreadCommandOutput", () => {
function row(n: number, content: string): ThreadRoundRow {
return {
round: n,
logId: 10 + n,
ts: 1000 + n,
event: { type: "ev", role: "r", content, extra: n },
};
}
it("orders rounds chronologically (oldest first in output)", () => {
const desc = [row(3, "ccc"), row(2, "bbb"), row(1, "aaa")];
const prefix = ["HEADER\n"];
const { lines, paginationHint } = buildThreadCommandOutput(prefix, desc, 50_000, "run-x");
const text = lines.join("");
const idxA = text.indexOf("\naaa\n");
const idxB = text.indexOf("\nbbb\n");
const idxC = text.indexOf("\nccc\n");
expect(idxA).toBeGreaterThan(-1);
expect(idxB).toBeGreaterThan(idxA);
expect(idxC).toBeGreaterThan(idxB);
expect(paginationHint).toBeNull();
});
it("emits pagination hint with --before when oldest shown round is still > 1", () => {
const desc = [row(4, "d"), row(3, "c")];
const { paginationHint } = buildThreadCommandOutput([], desc, 50_000, "run-y");
expect(paginationHint).toContain("--before 3");
expect(paginationHint).toContain("run-y");
});
it("respects budget and hints with non-default --budget in command", () => {
const big = "y".repeat(500);
const desc = [row(2, big), row(1, "a")];
const { lines, paginationHint } = buildThreadCommandOutput([], desc, 400, "run-z");
const text = lines.join("");
expect(text).toContain("[#2");
expect(text).not.toContain("[#1");
expect(paginationHint).toContain("--before 2");
expect(paginationHint).toContain("--budget 400");
});
it("default budget constant matches workflow command default", () => {
expect(DEFAULT_THREAD_BUDGET_CHARS).toBe(8000);
});
});
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// parseIntArg // parseIntArg
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
+29 -3
View File
@@ -1,14 +1,37 @@
import { defineCommand, runMain } from "citty"; import { defineCommand, runMain } from "citty";
import { daemonCommand } from "./commands/daemon.js";
import { devCommand } from "./commands/dev.js";
import { initCommand } from "./commands/init.js"; import { initCommand } from "./commands/init.js";
import { logsCommand } from "./commands/logs.js"; import { logsCommand } from "./commands/logs.js";
import { senseCommand } from "./commands/sense.js"; import { senseCommand } from "./commands/sense.js";
import { startCommand } from "./commands/start.js"; import { daemonStartCommand } from "./commands/start.js";
import { statusCommand } from "./commands/status.js"; import { statusCommand } from "./commands/status.js";
import { stopCommand } from "./commands/stop.js"; import { stopCommand } from "./commands/stop.js";
import { storeCommand } from "./commands/store.js";
import { validateCommand } from "./commands/validate.js"; import { validateCommand } from "./commands/validate.js";
import { workflowCommand } from "./commands/workflow.js"; import { workflowCommand } from "./commands/workflow.js";
/**
* Citty picks the first non-flag token as a subcommand name. Rewrite
* `nerve init --from <url>` so the URL is not mistaken for `workflow`/`workspace`.
*/
function normalizeNerveArgv(argv: string[]): string[] {
const initIdx = argv.indexOf("init");
if (initIdx === -1) return argv;
const tail = argv.slice(initIdx + 1);
const fromAt = tail.indexOf("--from");
if (fromAt === -1) return argv;
const beforeFrom = tail.slice(0, fromAt);
if (beforeFrom.some((a) => !a.startsWith("-"))) return argv;
const next = tail[fromAt + 1];
if (next === undefined || next.startsWith("-")) return argv;
const reserved = new Set(["workflow", "workspace"]);
if (reserved.has(next)) return argv;
const mergedTail = [...tail.slice(0, fromAt), `--from=${next}`, ...tail.slice(fromAt + 2)];
return [...argv.slice(0, initIdx + 1), ...mergedTail];
}
const main = defineCommand({ const main = defineCommand({
meta: { meta: {
name: "nerve", name: "nerve",
@@ -16,14 +39,17 @@ const main = defineCommand({
}, },
subCommands: { subCommands: {
init: initCommand, init: initCommand,
start: startCommand, daemon: daemonCommand,
dev: devCommand,
start: daemonStartCommand,
stop: stopCommand, stop: stopCommand,
status: statusCommand, status: statusCommand,
logs: logsCommand, logs: logsCommand,
validate: validateCommand, validate: validateCommand,
sense: senseCommand, sense: senseCommand,
store: storeCommand,
workflow: workflowCommand, workflow: workflowCommand,
}, },
}); });
runMain(main); runMain(main, { rawArgs: normalizeNerveArgv(process.argv.slice(2)) });
+31
View File
@@ -0,0 +1,31 @@
import { defineCommand } from "citty";
import { logsCommand } from "./logs.js";
import { daemonStartCommand, runDaemonStartCommand } from "./start.js";
import { statusCommand } from "./status.js";
import { runStopCommand, stopCommand } from "./stop.js";
const daemonRestartCommand = defineCommand({
meta: {
name: "restart",
description: "Stop then start the nerve daemon",
},
async run() {
await runStopCommand();
await runDaemonStartCommand();
},
});
export const daemonCommand = defineCommand({
meta: {
name: "daemon",
description: "Manage the nerve background daemon",
},
subCommands: {
start: daemonStartCommand,
stop: stopCommand,
status: statusCommand,
restart: daemonRestartCommand,
logs: logsCommand,
},
});
+17
View File
@@ -0,0 +1,17 @@
import { defineCommand } from "citty";
import { runForegroundKernelSession } from "../run-foreground-kernel.js";
import { loadDaemonModule } from "../workspace-daemon.js";
import { getNerveRoot } from "../workspace.js";
export const devCommand = defineCommand({
meta: {
name: "dev",
description: "Run the nerve kernel in the foreground (development mode)",
},
async run() {
const nerveRoot = getNerveRoot();
const { createKernel } = await loadDaemonModule(nerveRoot);
await runForegroundKernelSession(nerveRoot, createKernel);
},
});
+102 -39
View File
@@ -1,5 +1,7 @@
import { existsSync, mkdirSync, writeFileSync } from "node:fs"; import { execFile, spawn } from "node:child_process";
import { existsSync, mkdirSync, readdirSync, writeFileSync } from "node:fs";
import { dirname, join } from "node:path"; import { dirname, join } from "node:path";
import { promisify } from "node:util";
import { defineCommand } from "citty"; import { defineCommand } from "citty";
@@ -33,7 +35,7 @@ const PACKAGE_JSON = `{
"drizzle-kit": "latest" "drizzle-kit": "latest"
}, },
"pnpm": { "pnpm": {
"onlyBuiltDependencies": ["better-sqlite3", "esbuild"] "onlyBuiltDependencies": ["esbuild"]
} }
} }
`; `;
@@ -42,6 +44,8 @@ const GITIGNORE = `data/
node_modules/ node_modules/
`; `;
const execFileAsync = promisify(execFile);
const CPU_SCHEMA_TS = `import { integer, real, sqliteTable, text } from "drizzle-orm/sqlite-core"; const CPU_SCHEMA_TS = `import { integer, real, sqliteTable, text } from "drizzle-orm/sqlite-core";
export const cpuUsage = sqliteTable("cpu_usage", { export const cpuUsage = sqliteTable("cpu_usage", {
@@ -90,7 +94,6 @@ function writeFile(filePath: string, content: string): void {
} }
async function runCommand(cmd: string, args: string[], cwd: string): Promise<void> { async function runCommand(cmd: string, args: string[], cwd: string): Promise<void> {
const { spawn } = await import("node:child_process");
await new Promise<void>((resolve, reject) => { await new Promise<void>((resolve, reject) => {
const child = spawn(cmd, args, { cwd, stdio: "inherit" }); const child = spawn(cmd, args, { cwd, stdio: "inherit" });
child.on("close", (code) => { child.on("close", (code) => {
@@ -102,10 +105,6 @@ async function runCommand(cmd: string, args: string[], cwd: string): Promise<voi
} }
async function detectPackageManager(): Promise<{ cmd: string; installArgs: string[] }> { async function detectPackageManager(): Promise<{ cmd: string; installArgs: string[] }> {
const { execFile } = await import("node:child_process");
const { promisify } = await import("node:util");
const execFileAsync = promisify(execFile);
for (const pm of ["pnpm", "yarn", "npm"]) { for (const pm of ["pnpm", "yarn", "npm"]) {
try { try {
await execFileAsync(pm, ["--version"]); await execFileAsync(pm, ["--version"]);
@@ -219,23 +218,94 @@ const initWorkspaceCommand = defineCommand({
}, },
}); });
async function tryRequireSqlite(nerveRoot: string): Promise<boolean> { /** Verify built-in `node:sqlite` (Node.js ≥22.5) loads in a child process. */
async function verifyNodeSqlite(): Promise<boolean> {
try { try {
const modulePath = join(nerveRoot, "node_modules", "better-sqlite3"); await execFileAsync(
// Use a child process to test if the native module loads "node",
const { execFile } = await import("node:child_process"); [
const { promisify } = await import("node:util"); "--input-type=module",
const execFileAsync = promisify(execFile); "-e",
await execFileAsync("node", ["-e", `require(${JSON.stringify(modulePath)})`], { "import { DatabaseSync } from 'node:sqlite'; new DatabaseSync(':memory:').exec('SELECT 1');",
cwd: nerveRoot, ],
timeout: 10_000, { timeout: 10_000 },
}); );
return true; return true;
} catch { } catch {
return false; return false;
} }
} }
function isNerveRootNonEmpty(nerveRoot: string): boolean {
if (!existsSync(nerveRoot)) return false;
return readdirSync(nerveRoot).length > 0;
}
async function runInitFromGit(url: string): Promise<void> {
const trimmed = url.trim();
if (trimmed.length === 0) {
process.stderr.write("❌ --from requires a non-empty git URL.\n");
process.exit(1);
}
const nerveRoot = getNerveRoot();
if (isNerveRootNonEmpty(nerveRoot)) {
process.stderr.write(
`${nerveRoot} already exists and is not empty. Remove it (or empty it) before using --from.\n`,
);
process.exit(1);
}
try {
await execFileAsync("git", ["--version"]);
} catch {
process.stderr.write("❌ git is not available. Install git and retry.\n");
process.exit(1);
}
try {
await execFileAsync("pnpm", ["--version"]);
} catch {
process.stderr.write("❌ pnpm is not available. Install pnpm and retry.\n");
process.exit(1);
}
process.stdout.write(`Cloning ${trimmed}${nerveRoot}\n`);
try {
await runCommand("git", ["clone", trimmed, nerveRoot], process.cwd());
} catch {
process.stderr.write("❌ git clone failed.\n");
process.exit(1);
}
if (!existsSync(join(nerveRoot, "nerve.yaml"))) {
process.stdout.write(`⚠️ ${join(nerveRoot, "nerve.yaml")} not found after clone.\n`);
}
if (!existsSync(join(nerveRoot, "package.json"))) {
process.stdout.write(`⚠️ ${join(nerveRoot, "package.json")} not found after clone.\n`);
}
process.stdout.write("Installing dependencies with pnpm …\n");
try {
await runCommand("pnpm", ["install", "--no-cache"], nerveRoot);
} catch {
process.stdout.write(
`⚠️ pnpm install failed. Try manually:\n cd ${nerveRoot} && pnpm install --no-cache\n`,
);
}
if (!(await verifyNodeSqlite())) {
process.stdout.write(
"⚠️ Built-in SQLite (node:sqlite) is not available in this Node.js build. " +
"The daemon requires Node.js 22.5 or newer with SQLite enabled.\n",
);
}
process.stdout.write(
`✅ Workspace cloned to ${nerveRoot}\n\n💡 Next steps:\n 1. Review nerve.yaml and install any missing tooling.\n 2. Run \`nerve start\` to launch the daemon.\n`,
);
}
async function runInitWorkspace(force: boolean): Promise<void> { async function runInitWorkspace(force: boolean): Promise<void> {
const nerveRoot = getNerveRoot(); const nerveRoot = getNerveRoot();
@@ -268,27 +338,11 @@ async function runInitWorkspace(force: boolean): Promise<void> {
); );
} }
// Verify better-sqlite3 native module — rebuild up to 2 times if broken if (!(await verifyNodeSqlite())) {
const sqlitePath = join(nerveRoot, "node_modules", "better-sqlite3"); process.stdout.write(
if (existsSync(sqlitePath)) { "⚠️ Built-in SQLite (node:sqlite) is not available in this Node.js build. " +
for (let attempt = 1; attempt <= 2; attempt++) { "The daemon requires Node.js 22.5 or newer with SQLite enabled.\n",
if (await tryRequireSqlite(nerveRoot)) break; );
process.stdout.write(
`${attempt === 1 ? "Building" : "Retrying build of"} native module better-sqlite3 (attempt ${attempt}/2)…\n`,
);
try {
await runCommand(cmd, ["rebuild", "better-sqlite3"], nerveRoot);
} catch {
// will be caught by the verify below
}
}
if (!(await tryRequireSqlite(nerveRoot))) {
process.stdout.write(
`⚠️ better-sqlite3 native module is not working. The daemon will fail to start.\n` +
` Fix: cd ${nerveRoot} && ${cmd} rebuild better-sqlite3\n` +
` Or: npm install --build-from-source better-sqlite3\n`,
);
}
} }
if (!existsSync(join(nerveRoot, ".git"))) { if (!existsSync(join(nerveRoot, ".git"))) {
@@ -310,7 +364,7 @@ export const initCommand = defineCommand({
meta: { meta: {
name: "init", name: "init",
description: description:
"Initialize workspace (nerve init) or scaffold templates (nerve init workflow <name>)", "Initialize workspace (nerve init), clone from git (nerve init --from <url>), or scaffold templates (nerve init workflow <name>)",
}, },
args: { args: {
force: { force: {
@@ -318,12 +372,21 @@ export const initCommand = defineCommand({
description: "Reinitialize even if workspace already exists (preserves data/)", description: "Reinitialize even if workspace already exists (preserves data/)",
default: false, default: false,
}, },
from: {
type: "string",
description: "Clone an existing git repo into ~/.uncaged-nerve instead of scaffolding",
required: false,
},
}, },
subCommands: { subCommands: {
workflow: initWorkflowCommand, workflow: initWorkflowCommand,
workspace: initWorkspaceCommand, workspace: initWorkspaceCommand,
}, },
async run({ args }) { async run({ args }) {
if (args.from !== undefined) {
await runInitFromGit(String(args.from));
return;
}
await runInitWorkspace(args.force); await runInitWorkspace(args.force);
}, },
}); });
+123 -5
View File
@@ -1,11 +1,20 @@
import { readFileSync } from "node:fs"; import { readFileSync } from "node:fs";
import { join } from "node:path"; import { join } from "node:path";
import { DatabaseSync } from "node:sqlite";
import { parseNerveConfig } from "@uncaged/nerve-core"; import { type SenseInfo, parseNerveConfig } from "@uncaged/nerve-core";
import { defineCommand } from "citty"; import { defineCommand } from "citty";
import { listSensesViaDaemon, triggerSenseViaDaemon } from "../daemon-client.js"; import { listSensesViaDaemon, triggerSenseViaDaemon } from "../daemon-client.js";
import type { SenseInfo } from "../daemon-client.js"; import {
assertSenseDbExists,
defaultPreviewSql,
formatRowsAsAlignedTable,
listTableSqlStatements,
openSenseDb,
parseSenseQueryArgs,
pickDefaultPreviewTable,
} from "../sense-sqlite.js";
import { getNerveRoot, getSocketPath, isRunning } from "../workspace.js"; import { getNerveRoot, getSocketPath, isRunning } from "../workspace.js";
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@@ -35,8 +44,7 @@ export function formatSenseList(senses: SenseInfo[]): string {
lines.push(` group: ${s.group}\n`); lines.push(` group: ${s.group}\n`);
lines.push(` throttle: ${formatDuration(s.throttle)}\n`); lines.push(` throttle: ${formatDuration(s.throttle)}\n`);
lines.push(` timeout: ${formatDuration(s.timeout)}\n`); lines.push(` timeout: ${formatDuration(s.timeout)}\n`);
const lastSignal = const lastSignal = s.lastSignalTs !== null ? new Date(s.lastSignalTs).toISOString() : "(never)";
s.lastSignalTs !== null ? new Date(s.lastSignalTs).toISOString() : "(never)";
lines.push(` last signal: ${lastSignal}\n`); lines.push(` last signal: ${lastSignal}\n`);
} }
return lines.join(""); return lines.join("");
@@ -72,7 +80,6 @@ const senseListCommand = defineCommand({
}, },
async run() { async run() {
if (!isRunning()) { if (!isRunning()) {
// Daemon not running — show static info from nerve.yaml
process.stderr.write( process.stderr.write(
"⚠️ Daemon is not running — showing static config only (no last signal time).\n\n", "⚠️ Daemon is not running — showing static config only (no last signal time).\n\n",
); );
@@ -141,6 +148,115 @@ const senseTriggerCommand = defineCommand({
}, },
}); });
// ---------------------------------------------------------------------------
// nerve sense schema <name>
// ---------------------------------------------------------------------------
const senseSchemaCommand = defineCommand({
meta: {
name: "schema",
description: "Print CREATE TABLE statements from a sense SQLite database",
},
args: {
name: {
type: "positional",
description: "Sense name (data/senses/<name>.db under the nerve workspace)",
},
json: {
type: "boolean",
description: "Print JSON array of CREATE TABLE SQL strings",
default: false,
},
},
async run({ args }) {
const nerveRoot = getNerveRoot();
let db: DatabaseSync | undefined;
try {
db = openSenseDb(nerveRoot, args.name);
const statements = listTableSqlStatements(db);
if (args.json) {
process.stdout.write(`${JSON.stringify(statements, null, 2)}\n`);
} else if (statements.length === 0) {
process.stdout.write("(no tables)\n");
} else {
for (const sql of statements) {
process.stdout.write(`${sql};\n\n`);
}
}
} catch (e) {
const msg = e instanceof Error ? e.message : String(e);
process.stderr.write(`${msg}\n`);
process.exit(1);
} finally {
db?.close();
}
},
});
// ---------------------------------------------------------------------------
// nerve sense query <name> [sql...]
// ---------------------------------------------------------------------------
const senseQueryCommand = defineCommand({
meta: {
name: "query",
description:
"Run a read-only SQL query against a sense database (default: last 10 rows of the first data table). Pass optional SQL after the sense name; multiple words are joined.",
},
args: {
name: {
type: "positional",
description: "Sense name (data/senses/<name>.db under the nerve workspace)",
},
json: {
type: "boolean",
description: "Print result rows as JSON",
default: false,
},
},
async run({ args, rawArgs }) {
const nerveRoot = getNerveRoot();
let db: DatabaseSync | undefined;
try {
let parsed: { name: string; sql: string | undefined };
try {
parsed = parseSenseQueryArgs(rawArgs);
} catch (e) {
const msg = e instanceof Error ? e.message : String(e);
process.stderr.write(`${msg}\n`);
process.exit(1);
}
db = openSenseDb(nerveRoot, args.name);
let sql = parsed.sql?.trim();
if (!sql) {
const table = pickDefaultPreviewTable(db);
if (table === null) {
process.stderr.write("❌ No tables found in database.\n");
process.exit(1);
} else {
sql = defaultPreviewSql(table);
}
}
const rows = db.prepare(sql).all() as Record<string, unknown>[];
if (args.json) {
process.stdout.write(`${JSON.stringify(rows, null, 2)}\n`);
} else {
process.stdout.write(formatRowsAsAlignedTable(rows));
}
} catch (e) {
const msg = e instanceof Error ? e.message : String(e);
process.stderr.write(`${msg}\n`);
process.exit(1);
} finally {
db?.close();
}
},
});
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// nerve sense (parent command) // nerve sense (parent command)
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@@ -153,5 +269,7 @@ export const senseCommand = defineCommand({
subCommands: { subCommands: {
list: senseListCommand, list: senseListCommand,
trigger: senseTriggerCommand, trigger: senseTriggerCommand,
schema: senseSchemaCommand,
query: senseQueryCommand,
}, },
}); });
+14 -30
View File
@@ -1,3 +1,4 @@
import { spawn } from "node:child_process";
import { createWriteStream, existsSync } from "node:fs"; import { createWriteStream, existsSync } from "node:fs";
import { mkdir } from "node:fs/promises"; import { mkdir } from "node:fs/promises";
import { dirname, join } from "node:path"; import { dirname, join } from "node:path";
@@ -5,11 +6,10 @@ import { fileURLToPath } from "node:url";
import { defineCommand } from "citty"; import { defineCommand } from "citty";
import { runForegroundKernelSession } from "../run-foreground-kernel.js";
import { loadDaemonModule } from "../workspace-daemon.js";
import { import {
getLogPath, getLogPath,
getNerveRoot, getNerveRoot,
getSocketPath,
isRunning, isRunning,
readPidFile, readPidFile,
removePidFile, removePidFile,
@@ -52,15 +52,10 @@ function daemonBootstrapScript(): string {
return bootstrapJs; return bootstrapJs;
} }
throw new Error( throw new Error(
`daemon-bootstrap.js not found next to CLI at ${bootstrapJs}. Build the CLI package (e.g. \`pnpm --filter @uncaged/nerve-cli build\`) before using background mode (\`nerve start -d\`).`, `daemon-bootstrap.js not found next to CLI at ${bootstrapJs}. Build the CLI package (e.g. \`pnpm --filter @uncaged/nerve-cli build\`) before using \`nerve daemon start\`.`,
); );
} }
async function runForeground(nerveRoot: string): Promise<void> {
const { createKernel } = await loadDaemonModule(nerveRoot);
await runForegroundKernelSession(nerveRoot, createKernel);
}
async function runDaemon(nerveRoot: string): Promise<void> { async function runDaemon(nerveRoot: string): Promise<void> {
if (isRunning()) { if (isRunning()) {
const pid = readPidFile(); const pid = readPidFile();
@@ -71,7 +66,6 @@ async function runDaemon(nerveRoot: string): Promise<void> {
const logPath = getLogPath(); const logPath = getLogPath();
await mkdir(join(nerveRoot, "logs"), { recursive: true }); await mkdir(join(nerveRoot, "logs"), { recursive: true });
const { spawn } = await import("node:child_process");
const logStream = createWriteStream(logPath, { flags: "a" }); const logStream = createWriteStream(logPath, { flags: "a" });
await new Promise<void>((resolve) => { await new Promise<void>((resolve) => {
if (logStream.pending) logStream.once("open", () => resolve()); if (logStream.pending) logStream.once("open", () => resolve());
@@ -82,7 +76,7 @@ async function runDaemon(nerveRoot: string): Promise<void> {
const child = spawn(process.execPath, [bootstrapPath], { const child = spawn(process.execPath, [bootstrapPath], {
detached: true, detached: true,
stdio: ["ignore", logStream.fd, logStream.fd], stdio: ["ignore", (logStream as any).fd, (logStream as any).fd],
env: { ...process.env, NERVE_ROOT: nerveRoot }, env: { ...process.env, NERVE_ROOT: nerveRoot },
cwd: nerveRoot, cwd: nerveRoot,
}); });
@@ -97,7 +91,6 @@ async function runDaemon(nerveRoot: string): Promise<void> {
writePidFile(pid); writePidFile(pid);
const { getSocketPath } = await import("../workspace.js");
const ready = await waitForSocket(getSocketPath(), 5000); const ready = await waitForSocket(getSocketPath(), 5000);
if (!ready || !isRunning()) { if (!ready || !isRunning()) {
@@ -110,29 +103,20 @@ async function runDaemon(nerveRoot: string): Promise<void> {
process.stdout.write(`✅ Nerve daemon started (pid ${pid}).\n`); process.stdout.write(`✅ Nerve daemon started (pid ${pid}).\n`);
process.stdout.write(` Logs: ${logPath}\n`); process.stdout.write(` Logs: ${logPath}\n`);
process.stdout.write(" Run `nerve stop` to stop.\n"); process.stdout.write(" Run `nerve daemon stop` (or `nerve stop`) to stop.\n");
} }
export const startCommand = defineCommand({ /** Background daemon only — use `nerve dev` for foreground mode. */
export async function runDaemonStartCommand(): Promise<void> {
await runDaemon(getNerveRoot());
}
export const daemonStartCommand = defineCommand({
meta: { meta: {
name: "start", name: "start",
description: "Start the nerve daemon", description: "Start the nerve daemon in the background",
}, },
args: { async run() {
daemon: { await runDaemonStartCommand();
type: "boolean",
alias: "d",
description: "Run as background daemon",
default: false,
},
},
async run({ args }) {
const nerveRoot = getNerveRoot();
if (args.daemon) {
await runDaemon(nerveRoot);
} else {
await runForeground(nerveRoot);
}
}, },
}); });
+38 -33
View File
@@ -15,44 +15,49 @@ async function waitForExit(pid: number, timeoutMs: number): Promise<boolean> {
return false; return false;
} }
/** Core stop logic — also used by `nerve daemon restart`. */
export async function runStopCommand(): Promise<void> {
const pid = readPidFile();
if (pid === null) {
process.stdout.write("⚠️ No PID file found — daemon may not be running.\n");
return;
}
if (!isRunning()) {
process.stdout.write("⚠️ Daemon is not running (stale PID file). Cleaning up.\n");
removePidFile();
return;
}
process.stdout.write(`Stopping nerve daemon (pid ${pid})…\n`);
try {
process.kill(pid, "SIGTERM");
} catch (e) {
const msg = e instanceof Error ? e.message : String(e);
process.stderr.write(`❌ Failed to send SIGTERM: ${msg}\n`);
process.exit(1);
}
const graceful = await waitForExit(pid, 10_000);
if (!graceful) {
process.stdout.write("⚠️ Daemon did not exit in 10s — sending SIGKILL.\n");
try {
process.kill(pid, "SIGKILL");
} catch {
// already dead
}
}
removePidFile();
process.stdout.write("✅ Daemon stopped.\n");
}
export const stopCommand = defineCommand({ export const stopCommand = defineCommand({
meta: { meta: {
name: "stop", name: "stop",
description: "Stop the nerve daemon", description: "Stop the nerve daemon",
}, },
async run() { async run() {
const pid = readPidFile(); await runStopCommand();
if (pid === null) {
process.stdout.write("⚠️ No PID file found — daemon may not be running.\n");
return;
}
if (!isRunning()) {
process.stdout.write("⚠️ Daemon is not running (stale PID file). Cleaning up.\n");
removePidFile();
return;
}
process.stdout.write(`Stopping nerve daemon (pid ${pid})…\n`);
try {
process.kill(pid, "SIGTERM");
} catch (e) {
const msg = e instanceof Error ? e.message : String(e);
process.stderr.write(`❌ Failed to send SIGTERM: ${msg}\n`);
process.exit(1);
}
const graceful = await waitForExit(pid, 10_000);
if (!graceful) {
process.stdout.write("⚠️ Daemon did not exit in 10s — sending SIGKILL.\n");
try {
process.kill(pid, "SIGKILL");
} catch {
// already dead
}
}
removePidFile();
process.stdout.write("✅ Daemon stopped.\n");
}, },
}); });
+70
View File
@@ -0,0 +1,70 @@
import { existsSync } from "node:fs";
import { join } from "node:path";
import { defineCommand } from "citty";
import { loadDaemonModule } from "../workspace-daemon.js";
import { getNerveRoot } from "../workspace.js";
// ---------------------------------------------------------------------------
// nerve store archive
// ---------------------------------------------------------------------------
const storeArchiveCommand = defineCommand({
meta: {
name: "archive",
description:
"Export logs older than 30 days from logs.db to data/archive/logs/YYYY-MM-DD.jsonl and delete those rows (RFC-001 §5.4)",
},
args: {
vacuum: {
type: "boolean",
description: "Run SQLite VACUUM after archiving",
default: false,
},
},
async run({ args }) {
const nerveRoot = getNerveRoot();
const dbPath = join(nerveRoot, "data", "logs.db");
if (!existsSync(dbPath)) {
process.stderr.write("❌ No data/logs.db found — start the daemon at least once.\n");
process.exit(1);
}
const { createLogStore } = await loadDaemonModule(nerveRoot);
const store = createLogStore(dbPath);
try {
const result = store.archiveLogs({ vacuum: args.vacuum });
if (result.days.length === 0) {
process.stdout.write(
"✅ Nothing to archive (no eligible UTC days beyond the 30-day window).\n",
);
} else {
process.stdout.write(`✅ Archived ${result.days.length} day(s):\n`);
for (const d of result.days) {
process.stdout.write(` ${d.day} rows=${d.rowCount} ${d.filePath}\n`);
}
}
if (result.vacuumed) {
process.stdout.write(" VACUUM completed.\n");
}
} finally {
store.close();
}
},
});
// ---------------------------------------------------------------------------
// nerve store
// ---------------------------------------------------------------------------
export const storeCommand = defineCommand({
meta: {
name: "store",
description: "Maintain local Nerve SQLite stores (log cold-archive, …)",
},
subCommands: {
archive: storeArchiveCommand,
},
});
+212 -1
View File
@@ -2,14 +2,21 @@ import { existsSync } from "node:fs";
import { join } from "node:path"; import { join } from "node:path";
import { defineCommand } from "citty"; import { defineCommand } from "citty";
import { stringify } from "yaml";
import { triggerWorkflowViaDaemon } from "../daemon-client.js"; import { triggerWorkflowViaDaemon } from "../daemon-client.js";
import type { LogStore, WorkflowRun } from "../daemon-types.js"; import type { LogStore, ThreadRoundRow, WorkflowRun } from "../daemon-types.js";
import { loadDaemonModule } from "../workspace-daemon.js"; import { loadDaemonModule } from "../workspace-daemon.js";
import { getNerveRoot, getSocketPath, isRunning } from "../workspace.js"; import { getNerveRoot, getSocketPath, isRunning } from "../workspace.js";
export const DEFAULT_PAGE_SIZE = 20; export const DEFAULT_PAGE_SIZE = 20;
/** Default max characters for `nerve workflow thread` output (including run header). */
export const DEFAULT_THREAD_BUDGET_CHARS = 8000;
/** Max role-round rows read from SQLite per invocation (DESC by round). */
export const THREAD_ROUNDS_FETCH_LIMIT = 8192;
export function parseIntArg(raw: string, fallback: number): number { export function parseIntArg(raw: string, fallback: number): number {
const v = Number.parseInt(raw, 10); const v = Number.parseInt(raw, 10);
return Number.isNaN(v) ? fallback : v; return Number.isNaN(v) ? fallback : v;
@@ -172,6 +179,123 @@ export function buildInspectOutput(
return { header, eventLines, paginationHint }; return { header, eventLines, paginationHint };
} }
// ---------------------------------------------------------------------------
// nerve workflow thread <runId> — agent-oriented role rounds
// ---------------------------------------------------------------------------
export type PartitionedEvent = {
typeStr: string;
roleStr: string;
contentBody: string;
rest: Record<string, unknown>;
};
/**
* Split a CommandEvent: `type`, `role`, and `content` are reserved for the
* header / body; all other fields are serialized as YAML frontmatter.
*/
export function partitionCommandEvent(event: Record<string, unknown>): PartitionedEvent {
const typeStr =
typeof event.type === "string" ? event.type : String(event.type === undefined ? "?" : event.type);
const roleStr = typeof event.role === "string" ? event.role : "?";
const contentRaw = event.content;
const contentBody =
contentRaw === undefined || contentRaw === null
? ""
: typeof contentRaw === "string"
? contentRaw
: JSON.stringify(contentRaw);
const rest: Record<string, unknown> = {};
for (const key of Object.keys(event)) {
if (key === "type" || key === "role" || key === "content") continue;
rest[key] = event[key];
}
return { typeStr, roleStr, contentBody, rest };
}
/**
* One role round as plain text: header line, YAML frontmatter (`rest` only), body (`content`).
*/
export function formatThreadRoundBlock(row: ThreadRoundRow): string {
const { typeStr, roleStr, contentBody, rest } = partitionCommandEvent(row.event);
const yamlBlock =
Object.keys(rest).length === 0 ? "{}\n" : `${stringify(rest, { lineWidth: 100 })}\n`;
return (
`[#${row.round} ${roleStr}] ${formatTs(row.ts)} type=${typeStr}\n` +
`---\n` +
yamlBlock +
`---\n` +
`${contentBody}\n\n`
);
}
export type ThreadCommandOutput = {
lines: string[];
paginationHint: string | null;
};
/**
* Build stdout lines for `nerve workflow thread`: newest-first selection from
* `descRows` until `budgetChars` (including `prefixLines`), then chronological order.
*/
export function buildThreadCommandOutput(
prefixLines: string[],
descRows: ThreadRoundRow[],
budgetChars: number,
runId: string,
): ThreadCommandOutput {
const prefixText = prefixLines.join("");
let remaining = Math.max(0, budgetChars - prefixText.length);
const picked: ThreadRoundRow[] = [];
const budgetFlag =
budgetChars === DEFAULT_THREAD_BUDGET_CHARS ? "" : ` --budget ${String(budgetChars)}`;
for (const row of descRows) {
const block = formatThreadRoundBlock(row);
if (block.length <= remaining) {
picked.push(row);
remaining -= block.length;
continue;
}
if (picked.length === 0) {
const { typeStr, roleStr, contentBody, rest } = partitionCommandEvent(row.event);
const yamlBlock =
Object.keys(rest).length === 0
? "{}\n"
: `${stringify(rest, { lineWidth: 100 })}\n`;
const header =
`[#${row.round} ${roleStr}] ${formatTs(row.ts)} type=${typeStr}\n` + `---\n` + yamlBlock + `---\n`;
const maxBody = Math.max(0, remaining - header.length - `[truncated]\n`.length);
const truncated =
maxBody > 0 && contentBody.length > maxBody
? `${contentBody.slice(0, maxBody)}\n[truncated]\n`
: `${contentBody}\n[truncated]\n`;
const single = header + truncated + "\n";
const hintRound = row.round;
return {
lines: [...prefixLines, single],
paginationHint:
hintRound > 1
? `\n⏩ Older rounds exist. Fetch with:\n nerve workflow thread ${runId} --before ${String(hintRound)}${budgetFlag}\n`
: null,
};
}
break;
}
const blocksAsc = picked.map(formatThreadRoundBlock).reverse();
const shownMinRound = picked.length === 0 ? null : Math.min(...picked.map((r) => r.round));
let paginationHint: string | null = null;
if (shownMinRound !== null && shownMinRound > 1) {
paginationHint =
`\n⏩ Older rounds not shown. Fetch with:\n` +
` nerve workflow thread ${runId} --before ${String(shownMinRound)}${budgetFlag}\n`;
}
return { lines: [...prefixLines, ...blocksAsc], paginationHint };
}
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// nerve workflow list // nerve workflow list
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@@ -293,6 +417,92 @@ const workflowInspectCommand = defineCommand({
}, },
}); });
// ---------------------------------------------------------------------------
// nerve workflow thread <runId>
// ---------------------------------------------------------------------------
const workflowThreadCommand = defineCommand({
meta: {
name: "thread",
description: "Print role rounds for a workflow run (agent-oriented, budget-limited)",
},
args: {
runId: {
type: "positional",
description: "The run ID to dump role rounds for",
},
before: {
type: "string",
description:
"Exclusive upper bound on 1-based round index (use with hint from prior output to load older rounds)",
default: "0",
},
budget: {
type: "string",
description: `Max output characters including header (default: ${String(DEFAULT_THREAD_BUDGET_CHARS)})`,
default: String(DEFAULT_THREAD_BUDGET_CHARS),
},
},
async run({ args }) {
const store = await openStore();
try {
const before = Math.max(0, parseIntArg(args.before, 0));
const budgetChars = Math.max(1, parseIntArg(args.budget, DEFAULT_THREAD_BUDGET_CHARS));
const run = store.getWorkflowRun(args.runId);
if (run === null) {
process.stderr.write(`❌ No workflow run found with runId: ${args.runId}\n`);
process.exit(1);
}
const totalRoleRounds = store.getThreadRoundCount(args.runId);
if (totalRoleRounds === 0) {
process.stdout.write(
`🧵 Workflow thread: ${run.runId}\n` +
` workflow: ${run.workflow}\n` +
` status: ${run.status}\n\n` +
`📭 No role rounds recorded for this run.\n`,
);
return;
}
const descRows = store.getThreadRounds(args.runId, {
before,
limit: THREAD_ROUNDS_FETCH_LIMIT,
});
const prefixLines = [
`🧵 Role rounds (workflow thread)\n`,
` runId: ${run.runId}\n`,
` workflow: ${run.workflow}\n`,
` status: ${run.status}\n`,
` rounds: ${String(totalRoleRounds)} role event(s) total\n\n`,
];
const { lines, paginationHint } = buildThreadCommandOutput(
prefixLines,
descRows,
budgetChars,
args.runId,
);
for (const line of lines) {
process.stdout.write(line);
}
if (paginationHint !== null) {
process.stdout.write(paginationHint);
}
if (descRows.length === 0 && before > 0) {
process.stdout.write(`\n📭 No rounds with index < ${String(before)}.\n`);
}
} finally {
store.close();
}
},
});
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// nerve workflow trigger <name> // nerve workflow trigger <name>
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@@ -359,6 +569,7 @@ export const workflowCommand = defineCommand({
subCommands: { subCommands: {
list: workflowListCommand, list: workflowListCommand,
inspect: workflowInspectCommand, inspect: workflowInspectCommand,
thread: workflowThreadCommand,
trigger: workflowTriggerCommand, trigger: workflowTriggerCommand,
}, },
}); });
-2
View File
@@ -1,5 +1,3 @@
#!/usr/bin/env node
import { runForegroundKernelSession } from "./run-foreground-kernel.js"; import { runForegroundKernelSession } from "./run-foreground-kernel.js";
import { loadDaemonModule } from "./workspace-daemon.js"; import { loadDaemonModule } from "./workspace-daemon.js";
+40 -85
View File
@@ -8,18 +8,14 @@
import { connect } from "node:net"; import { connect } from "node:net";
import type { Socket } from "node:net"; import type { Socket } from "node:net";
import type { SenseInfo } from "@uncaged/nerve-core";
const CONNECT_TIMEOUT_MS = 3_000; const CONNECT_TIMEOUT_MS = 3_000;
const RESPONSE_TIMEOUT_MS = 5_000; const RESPONSE_TIMEOUT_MS = 5_000;
type TriggerResponse = { ok: true } | { ok: false; error: string }; export type { SenseInfo };
export type SenseInfo = { type TriggerResponse = { ok: true } | { ok: false; error: string };
name: string;
group: string;
throttle: number | null;
timeout: number | null;
lastSignalTs: number | null;
};
type ListSensesResponse = { ok: true; senses: SenseInfo[] } | { ok: false; error: string }; type ListSensesResponse = { ok: true; senses: SenseInfo[] } | { ok: false; error: string };
@@ -37,12 +33,36 @@ function parseDaemonResponse(line: string): TriggerResponse {
return { ok: false, error: `Unexpected daemon response: ${line}` }; return { ok: false, error: `Unexpected daemon response: ${line}` };
} }
function sendAndReceive(socketPath: string, message: object): Promise<TriggerResponse> { function parseListSensesResponse(line: string): ListSensesResponse {
try {
const obj = JSON.parse(line) as unknown;
if (obj !== null && typeof obj === "object") {
const r = obj as Record<string, unknown>;
if (r.ok === false && typeof r.error === "string") return { ok: false, error: r.error };
if (r.ok === true && Array.isArray(r.senses))
return { ok: true, senses: r.senses as SenseInfo[] };
}
} catch {
// fall through
}
return { ok: false, error: `Unexpected daemon response: ${line}` };
}
/**
* Connect to the daemon socket, send one JSON request (newline-terminated),
* and resolve with the first non-empty line parsed by `parseFirstLine`.
*/
function sendAndReceive<T>(
socketPath: string,
message: object,
parseFirstLine: (trimmed: string) => T,
responseTimeoutMs: number = RESPONSE_TIMEOUT_MS,
): Promise<T> {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
let socket: Socket | null = null; let socket: Socket | null = null;
let settled = false; let settled = false;
function settle(result: TriggerResponse | Error): void { function settle(result: T | Error): void {
if (settled) return; if (settled) return;
settled = true; settled = true;
if (socket !== null) { if (socket !== null) {
@@ -65,7 +85,7 @@ function sendAndReceive(socketPath: string, message: object): Promise<TriggerRes
const responseTimer = setTimeout(() => { const responseTimer = setTimeout(() => {
settle(new Error("Timed out waiting for daemon response")); settle(new Error("Timed out waiting for daemon response"));
}, RESPONSE_TIMEOUT_MS); }, responseTimeoutMs);
let buf = ""; let buf = "";
socket?.on("data", (chunk: Buffer) => { socket?.on("data", (chunk: Buffer) => {
@@ -76,7 +96,7 @@ function sendAndReceive(socketPath: string, message: object): Promise<TriggerRes
const trimmed = line.trim(); const trimmed = line.trim();
if (trimmed.length === 0) continue; if (trimmed.length === 0) continue;
clearTimeout(responseTimer); clearTimeout(responseTimer);
settle(parseDaemonResponse(trimmed)); settle(parseFirstLine(trimmed));
return; return;
} }
}); });
@@ -101,18 +121,19 @@ export function triggerWorkflowViaDaemon(
workflow: string, workflow: string,
payload: unknown, payload: unknown,
): Promise<TriggerResponse> { ): Promise<TriggerResponse> {
return sendAndReceive(socketPath, { type: "trigger-workflow", workflow, payload }); return sendAndReceive(
socketPath,
{ type: "trigger-workflow", workflow, payload },
parseDaemonResponse,
);
} }
/** /**
* Send a trigger-sense message to the running daemon via its Unix socket. * Send a trigger-sense message to the running daemon via its Unix socket.
* Resolves with the daemon's response or rejects on connection/timeout errors. * Resolves with the daemon's response or rejects on connection/timeout errors.
*/ */
export function triggerSenseViaDaemon( export function triggerSenseViaDaemon(socketPath: string, sense: string): Promise<TriggerResponse> {
socketPath: string, return sendAndReceive(socketPath, { type: "trigger-sense", sense }, parseDaemonResponse);
sense: string,
): Promise<TriggerResponse> {
return sendAndReceive(socketPath, { type: "trigger-sense", sense });
} }
/** /**
@@ -120,71 +141,5 @@ export function triggerSenseViaDaemon(
* Resolves with the list of registered senses or rejects on connection/timeout errors. * Resolves with the list of registered senses or rejects on connection/timeout errors.
*/ */
export function listSensesViaDaemon(socketPath: string): Promise<ListSensesResponse> { export function listSensesViaDaemon(socketPath: string): Promise<ListSensesResponse> {
return new Promise((resolve, reject) => { return sendAndReceive(socketPath, { type: "list-senses" }, parseListSensesResponse);
let socket: Socket | null = null;
let settled = false;
function settle(result: ListSensesResponse | Error): void {
if (settled) return;
settled = true;
if (socket !== null) {
socket.destroy();
socket = null;
}
if (result instanceof Error) {
reject(result);
} else {
resolve(result);
}
}
const connectTimer = setTimeout(() => {
settle(new Error(`Timed out connecting to daemon socket: ${socketPath}`));
}, CONNECT_TIMEOUT_MS);
socket = connect(socketPath, () => {
clearTimeout(connectTimer);
const responseTimer = setTimeout(() => {
settle(new Error("Timed out waiting for daemon response"));
}, RESPONSE_TIMEOUT_MS);
let buf = "";
socket?.on("data", (chunk: Buffer) => {
buf += chunk.toString("utf8");
const lines = buf.split("\n");
buf = lines.pop() ?? "";
for (const line of lines) {
const trimmed = line.trim();
if (trimmed.length === 0) continue;
clearTimeout(responseTimer);
try {
const obj = JSON.parse(trimmed) as unknown;
if (obj !== null && typeof obj === "object") {
const r = obj as Record<string, unknown>;
if (r.ok === false && typeof r.error === "string") {
settle({ ok: false, error: r.error });
return;
}
if (r.ok === true && Array.isArray(r.senses)) {
settle({ ok: true, senses: r.senses as SenseInfo[] });
return;
}
}
} catch {
// fall through
}
settle({ ok: false, error: `Unexpected daemon response: ${trimmed}` });
return;
}
});
socket?.write(`${JSON.stringify({ type: "list-senses" })}\n`);
});
socket.on("error", (err) => {
clearTimeout(connectTimer);
settle(new Error(`Cannot connect to daemon: ${err.message}`));
});
});
} }
+35
View File
@@ -40,6 +40,38 @@ export type LogQuery = {
limit?: number; limit?: number;
}; };
export type ArchiveLogsOptions = {
now?: number;
vacuum?: boolean;
maxDays?: number;
retentionMs?: number;
};
export type ArchiveLogsDayResult = {
day: string;
rowCount: number;
filePath: string;
};
export type ArchiveLogsResult = {
days: ArchiveLogsDayResult[];
vacuumed: boolean;
};
/** One role round row — keep in sync with daemon `log-store` `ThreadRoundRow`. */
export type ThreadRoundRow = {
round: number;
logId: number;
ts: number;
event: { type: string; [key: string]: unknown };
};
/** Keep in sync with daemon `log-store` `GetThreadRoundsParams`. */
export type GetThreadRoundsParams = {
before: number;
limit: number;
};
/** Subset of daemon LogStore used by the CLI workflow commands. */ /** Subset of daemon LogStore used by the CLI workflow commands. */
export type LogStore = { export type LogStore = {
query: (filter?: LogQuery) => LogEntry[]; query: (filter?: LogQuery) => LogEntry[];
@@ -47,5 +79,8 @@ export type LogStore = {
getActiveWorkflowRuns: (workflowName?: string) => WorkflowRun[]; getActiveWorkflowRuns: (workflowName?: string) => WorkflowRun[];
getAllWorkflowRuns: (workflowName: string | null) => WorkflowRun[]; getAllWorkflowRuns: (workflowName: string | null) => WorkflowRun[];
upsertWorkflowRun: (entry: Omit<LogEntry, "id">, run: WorkflowRun) => LogEntry; upsertWorkflowRun: (entry: Omit<LogEntry, "id">, run: WorkflowRun) => LogEntry;
getThreadRoundCount: (runId: string) => number;
getThreadRounds: (runId: string, params: GetThreadRoundsParams) => ThreadRoundRow[];
archiveLogs: (options?: ArchiveLogsOptions) => ArchiveLogsResult;
close: () => void; close: () => void;
}; };
+127
View File
@@ -0,0 +1,127 @@
import { existsSync } from "node:fs";
import { join } from "node:path";
import { DatabaseSync } from "node:sqlite";
/** SQLite path for a sense under the nerve workspace root. */
export function senseDbPath(nerveRoot: string, senseName: string): string {
return join(nerveRoot, "data", "senses", `${senseName}.db`);
}
export function assertSenseDbExists(nerveRoot: string, senseName: string): string {
const path = senseDbPath(nerveRoot, senseName);
if (!existsSync(path)) {
throw new Error(`No database at ${path}`);
}
return path;
}
/** Open a sense SQLite database in readonly mode using node:sqlite. */
export function openSenseDb(nerveRoot: string, senseName: string): DatabaseSync {
const path = assertSenseDbExists(nerveRoot, senseName);
return new DatabaseSync(path, { readOnly: true });
}
/** `SELECT sql FROM sqlite_master WHERE type='table'` (non-null sql only). */
export function listTableSqlStatements(db: DatabaseSync): string[] {
const rows = db
.prepare(
`SELECT sql FROM sqlite_master WHERE type = 'table' AND sql IS NOT NULL ORDER BY tbl_name`,
)
.all() as { sql: string }[];
return rows.map((r) => r.sql);
}
/**
* Table used for `nerve sense query <name>` with no SQL.
* Prefers real data tables over `_migrations`, then lexicographic by name.
*/
export function pickDefaultPreviewTable(db: DatabaseSync): string | null {
const row = db
.prepare(
`SELECT name FROM sqlite_master
WHERE type = 'table' AND sql IS NOT NULL
AND name NOT LIKE 'sqlite\\_%' ESCAPE '\\'
ORDER BY
CASE WHEN name = '_migrations' THEN 1 ELSE 0 END,
name
LIMIT 1`,
)
.get() as { name: string } | undefined;
return row?.name ?? null;
}
export function defaultPreviewSql(table: string): string {
return `SELECT * FROM "${table.replace(/"/g, '""')}" ORDER BY rowid DESC LIMIT 10`;
}
/** Parse sense name and optional SQL from subcommand raw argv (flags stripped). */
export function parseSenseQueryArgs(rawArgs: string[]): { name: string; sql: string | undefined } {
const pos: string[] = [];
for (let i = 0; i < rawArgs.length; i++) {
const a = rawArgs[i];
if (a === "--json" || a === "--no-json") continue;
if (a.startsWith("-")) {
const eq = a.indexOf("=");
if (eq === -1 && i + 1 < rawArgs.length && !rawArgs[i + 1].startsWith("-")) {
i += 1;
}
continue;
}
pos.push(a);
}
if (pos.length < 1) {
throw new Error("Missing sense name");
}
const name = pos[0];
const sql = pos.length > 1 ? pos.slice(1).join(" ") : undefined;
return { name, sql };
}
function stringifyCell(value: unknown): string {
if (value === null || value === undefined) return "";
if (typeof value === "bigint") return value.toString();
if (typeof value === "number" || typeof value === "boolean") return String(value);
if (typeof value === "string") return value;
if (Buffer.isBuffer(value)) return value.toString("hex");
try {
return JSON.stringify(value);
} catch {
return String(value);
}
}
/** Collect column keys in stable order (first row keys, then any extras). */
export function collectColumnKeys(rows: Record<string, unknown>[]): string[] {
const keys: string[] = [];
const seen = new Set<string>();
for (const row of rows) {
for (const k of Object.keys(row)) {
if (!seen.has(k)) {
seen.add(k);
keys.push(k);
}
}
}
return keys;
}
const MAX_CELL = 64;
function truncate(s: string): string {
if (s.length <= MAX_CELL) return s;
return `${s.slice(0, MAX_CELL - 1)}`;
}
/** Plain aligned table for terminal output. */
export function formatRowsAsAlignedTable(rows: Record<string, unknown>[]): string {
if (rows.length === 0) {
return "(0 rows)\n";
}
const cols = collectColumnKeys(rows);
const cells = rows.map((row) => cols.map((c) => truncate(stringifyCell(row[c]))));
const widths = cols.map((c, j) => Math.max(c.length, ...cells.map((r) => r[j].length)));
const sep = widths.map((w) => "-".repeat(w)).join("-+-");
const header = cols.map((c, j) => c.padEnd(widths[j])).join(" | ");
const body = cells.map((r) => r.map((cell, j) => cell.padEnd(widths[j])).join(" | ")).join("\n");
return `${header}\n${sep}\n${body}\n`;
}
+2 -1
View File
@@ -6,5 +6,6 @@
"composite": false, "composite": false,
"types": ["node"] "types": ["node"]
}, },
"include": ["src"] "include": ["src"],
"exclude": ["src/__tests__"]
} }
-13
View File
@@ -1,13 +0,0 @@
import { defineConfig } from "tsup";
export default defineConfig({
entry: ["src/index.ts", "src/cli.ts", "src/daemon-bootstrap.ts"],
format: ["esm"],
dts: true,
clean: true,
banner: {
js: "#!/usr/bin/env node",
},
/** Daemon is loaded from workspace node_modules at runtime — never bundle it. */
external: ["@uncaged/nerve-daemon"],
});
+39
View File
@@ -0,0 +1,39 @@
# @uncaged/nerve-core
Shared types and configuration parser for the [nerve](../../README.md) observation engine.
## What's Inside
- **Type definitions** — `Signal`, `SenseConfig`, `ReflexConfig`, `WorkflowConfig`, `NerveConfig`, and all related types
- **Config parser** — `parseNerveConfig(yaml)` validates and parses `nerve.yaml` into a typed `NerveConfig`
- **Result type** — `Result<T>` with `ok()` / `err()` helpers for explicit error handling (no thrown exceptions)
## Usage
```typescript
import { parseNerveConfig, ok, err } from "@uncaged/nerve-core";
import type { NerveConfig, Signal, Result } from "@uncaged/nerve-core";
const result: Result<NerveConfig> = parseNerveConfig(yamlString);
if (result.ok) {
console.log(result.value.senses);
}
```
## Duration Format
Config fields like `throttle`, `timeout`, and `interval` accept human-readable durations:
- `5s` — 5 seconds
- `10m` — 10 minutes
- `1h` — 1 hour
## Install
```bash
pnpm add @uncaged/nerve-core
```
## License
MIT
+10 -2
View File
@@ -1,17 +1,25 @@
{ {
"name": "@uncaged/nerve-core", "name": "@uncaged/nerve-core",
"version": "0.1.2", "version": "0.2.0",
"type": "module", "type": "module",
"main": "dist/index.js", "main": "dist/index.js",
"files": [
"dist"
],
"publishConfig": {
"access": "public"
},
"types": "dist/index.d.ts", "types": "dist/index.d.ts",
"scripts": { "scripts": {
"build": "tsup", "prepublishOnly": "bash ../../scripts/prepublish-check.sh",
"build": "rslib build",
"test": "vitest run" "test": "vitest run"
}, },
"dependencies": { "dependencies": {
"yaml": "^2.8.3" "yaml": "^2.8.3"
}, },
"devDependencies": { "devDependencies": {
"@rslib/core": "^0.21.3",
"vitest": "^4.1.5" "vitest": "^4.1.5"
} }
} }
+19
View File
@@ -0,0 +1,19 @@
import { defineConfig } from "@rslib/core";
export default defineConfig({
lib: [
{
format: "esm",
dts: true,
},
],
source: {
entry: {
index: "src/index.ts",
},
},
output: {
target: "node",
cleanDistPath: true,
},
});
+1
View File
@@ -1,6 +1,7 @@
export type { export type {
Signal, Signal,
SenseConfig, SenseConfig,
SenseInfo,
SenseReflexConfig, SenseReflexConfig,
WorkflowReflexConfig, WorkflowReflexConfig,
ReflexConfig, ReflexConfig,
+9
View File
@@ -12,6 +12,15 @@ export type SenseConfig = {
gracePeriod: number | null; gracePeriod: number | null;
}; };
/** Runtime metadata for a sense (e.g. daemon list-senses IPC). */
export type SenseInfo = {
name: string;
group: string;
throttle: number | null;
timeout: number | null;
lastSignalTs: number | null;
};
export type SenseReflexConfig = { export type SenseReflexConfig = {
kind: "sense"; kind: "sense";
sense: string; sense: string;
-8
View File
@@ -1,8 +0,0 @@
import { defineConfig } from "tsup";
export default defineConfig({
entry: ["src/index.ts"],
format: ["esm"],
dts: true,
clean: true,
});
+57
View File
@@ -0,0 +1,57 @@
# @uncaged/nerve-daemon
The observation engine runtime for [nerve](../../README.md) — runs senses, routes signals, schedules reflexes, and manages workflows.
## Architecture
| Module | Responsibility |
|--------|---------------|
| **Kernel** | Top-level orchestrator — spawns workers, wires up signal bus, scheduler, and workflow manager. Supports hot reload and graceful shutdown. |
| **Sense Runtime** | Per-sense SQLite database (via `node:sqlite` + Drizzle ORM), migration runner, peer DB read access. |
| **Sense Worker** | Forked child process — one per sense group. Runs compute functions in isolation. |
| **Signal Bus** | In-memory pub/sub. Sense computes emit signals; reflexes and workflows subscribe. |
| **Reflex Scheduler** | Drives compute triggers — interval timers, signal-based events, throttle/coalesce logic. |
| **Workflow Manager** | Concurrency control (drop/queue), thread lifecycle, worker process management (RFC-002). |
| **Log Store** | Structured log storage in WAL-mode SQLite. Supports retention policies, archival to JSONL, and workflow run tracking. |
| **Blob Store** | Binary artifact storage for workflow outputs. |
| **File Watcher** | Watches `nerve.yaml` and sense files for hot reload. |
| **Daemon IPC** | Unix socket server for CLI ↔ daemon communication. |
## Key Design Decisions
- **One worker process per sense group** — isolation between groups, shared compute within a group
- **`node:sqlite` (DatabaseSync)** — zero native addons, WAL mode, built into Node.js ≥ 22.5
- **Throttle + coalesce** — if compute is in-flight, at most one pending trigger is queued (no unbounded accumulation)
- **Log ≠ Signal** — logs are queryable data assets but cannot trigger reflexes (prevents feedback loops)
## Usage
The daemon is typically started via the CLI (`nerve daemon start`), but can be used programmatically:
```typescript
import { createKernel } from "@uncaged/nerve-daemon";
const kernel = await createKernel(nerveRoot);
await kernel.ready;
// Trigger a sense manually
kernel.triggerSense("cpu-usage");
// Check health
const health = kernel.getHealth();
// Graceful shutdown
await kernel.stop();
```
## Install
```bash
pnpm add @uncaged/nerve-daemon
```
Requires Node.js ≥ 22.5 (for `node:sqlite`).
## License
MIT
+6 -5
View File
@@ -1,6 +1,6 @@
{ {
"name": "@uncaged/nerve-daemon", "name": "@uncaged/nerve-daemon",
"version": "0.1.3", "version": "0.2.0",
"type": "module", "type": "module",
"main": "dist/index.js", "main": "dist/index.js",
"types": "dist/index.d.ts", "types": "dist/index.d.ts",
@@ -11,17 +11,18 @@
"access": "public" "access": "public"
}, },
"scripts": { "scripts": {
"build": "tsup", "prepublishOnly": "bash ../../scripts/prepublish-check.sh",
"build": "rslib build",
"test": "vitest run" "test": "vitest run"
}, },
"dependencies": { "dependencies": {
"@uncaged/nerve-core": "workspace:*", "@uncaged/nerve-core": "workspace:*",
"better-sqlite3": "^11.10.0", "drizzle-orm": "1.0.0-beta.23-c10d10c",
"drizzle-orm": "^0.43.1",
"yaml": "^2.8.3" "yaml": "^2.8.3"
}, },
"devDependencies": { "devDependencies": {
"@types/better-sqlite3": "^7.6.13", "@rslib/core": "^0.21.3",
"@types/node": "^22.0.0",
"vitest": "^4.1.5" "vitest": "^4.1.5"
} }
} }
+21
View File
@@ -0,0 +1,21 @@
import { defineConfig } from "@rslib/core";
export default defineConfig({
lib: [
{
format: "esm",
dts: true,
},
],
source: {
entry: {
index: "src/index.ts",
"sense-worker": "src/sense-worker.ts",
"workflow-worker": "src/workflow-worker.ts",
},
},
output: {
target: "node",
cleanDistPath: true,
},
});
@@ -0,0 +1,105 @@
import { createHash } from "node:crypto";
import { existsSync, readdirSync, writeFileSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { describe, expect, it } from "vitest";
import { createBlobStore, normalizeBlobHash } from "../blob-store.js";
function makeRoot(): string {
return join(tmpdir(), `nerve-blob-${Date.now()}-${Math.random().toString(16).slice(2)}`);
}
describe("normalizeBlobHash", () => {
it("accepts 64-char lowercase hex", () => {
const h = "a".repeat(64);
expect(normalizeBlobHash(h)).toBe(h);
});
it("normalizes uppercase to lowercase", () => {
const h = "A".repeat(64);
expect(normalizeBlobHash(h)).toBe("a".repeat(64));
});
it("rejects wrong length and non-hex", () => {
expect(normalizeBlobHash("ab")).toBeNull();
expect(normalizeBlobHash("g".repeat(64))).toBeNull();
});
});
describe("createBlobStore", () => {
it("write returns sha256 hex and stores under 2-char shard", () => {
const root = makeRoot();
const store = createBlobStore(root);
const content = "hello cas";
const hash = store.write(content);
expect(hash).toMatch(/^[0-9a-f]{64}$/);
expect(createHash("sha256").update(content, "utf8").digest("hex")).toBe(hash);
const shard = hash.slice(0, 2);
const rel = hash.slice(2);
const filePath = join(root, shard, rel);
expect(existsSync(filePath)).toBe(true);
});
it("read returns stored bytes and exists is true", () => {
const root = makeRoot();
const store = createBlobStore(root);
const buf = Buffer.from([0, 255, 128]);
const hash = store.write(buf);
expect(store.exists(hash)).toBe(true);
const got = store.read(hash);
expect(got).not.toBeNull();
expect(Buffer.compare(got as Buffer, buf)).toBe(0);
});
it("write is idempotent for same content", () => {
const root = makeRoot();
const store = createBlobStore(root);
const h1 = store.write("same");
const h2 = store.write("same");
expect(h1).toBe(h2);
const shard = h1.slice(0, 2);
const names = readdirSync(join(root, shard));
expect(names.filter((n: string) => !n.startsWith("."))).toHaveLength(1);
});
it("read returns null for missing blob", () => {
const root = makeRoot();
const store = createBlobStore(root);
const missing = "0".repeat(64);
expect(store.read(missing)).toBeNull();
expect(store.exists(missing)).toBe(false);
});
it("read and exists return null/false for invalid hash", () => {
const root = makeRoot();
const store = createBlobStore(root);
expect(store.read("not-a-hash")).toBeNull();
expect(store.exists("not-a-hash")).toBe(false);
});
it("throws when on-disk content does not match path hash", () => {
const root = makeRoot();
const store = createBlobStore(root);
const hash = store.write("ok");
const filePath = join(root, hash.slice(0, 2), hash.slice(2));
writeFileSync(filePath, "tampered");
expect(() => store.read(hash)).toThrow(/CAS mismatch/i);
});
it("write throws when an existing file at the digest path has wrong content", () => {
const root = makeRoot();
const store = createBlobStore(root);
const hash = store.write("truth");
const filePath = join(root, hash.slice(0, 2), hash.slice(2));
writeFileSync(filePath, "lies");
expect(() => store.write("truth")).toThrow(/CAS mismatch/i);
});
});
@@ -89,9 +89,13 @@ function makeLogStore(
} }
return activeRuns; return activeRuns;
}), }),
getTriggerPayload: vi.fn(() => ({ value: 42 })), getTriggerPayload: vi.fn((): unknown => ({ value: 42 })),
getThreadEvents: vi.fn(() => [{ type: "thread_start", triggerPayload: {} }]), getThreadEvents: vi.fn((): Array<{ type: string; [key: string]: unknown }> => [{ type: "thread_start", triggerPayload: {} }]),
getThreadRoundCount: vi.fn(() => 0),
getThreadRounds: vi.fn(() => []),
archiveLogs: vi.fn(() => ({ days: [], vacuumed: false })),
close: vi.fn(), close: vi.fn(),
getAllWorkflowRuns: vi.fn(() => []),
}; };
return store; return store;
} }
@@ -126,7 +130,7 @@ describe("WorkflowManager — crash recovery (Phase 3)", () => {
child.emit("exit", 1, null); child.emit("exit", 1, null);
const crashedCalls = logStore.upsertWorkflowRun.mock.calls.filter( const crashedCalls = logStore.upsertWorkflowRun.mock.calls.filter(
([entry]: [{ type: string }]) => entry.type === "crashed", (args: any[]) => (args[0] as { type: string }).type === "crashed",
); );
expect(crashedCalls).toHaveLength(2); expect(crashedCalls).toHaveLength(2);
@@ -215,10 +219,10 @@ describe("WorkflowManager — crash recovery (Phase 3)", () => {
// resume-thread should have been sent // resume-thread should have been sent
const resumeCalls = (secondChild.send as ReturnType<typeof vi.fn>).mock.calls.filter( const resumeCalls = (secondChild.send as ReturnType<typeof vi.fn>).mock.calls.filter(
([msg]: [unknown]) => (args: any[]) =>
msg !== null && args[0] !== null &&
typeof msg === "object" && typeof args[0] === "object" &&
(msg as Record<string, unknown>).type === "resume-thread", (args[0] as Record<string, unknown>).type === "resume-thread",
); );
expect(resumeCalls).toHaveLength(1); expect(resumeCalls).toHaveLength(1);
expect(resumeCalls[0][0]).toMatchObject({ expect(resumeCalls[0][0]).toMatchObject({
@@ -285,7 +289,7 @@ describe("WorkflowManager — crash recovery (Phase 3)", () => {
}); });
const appendCalls = logStore.append.mock.calls.filter( const appendCalls = logStore.append.mock.calls.filter(
([entry]: [{ type: string }]) => entry.type === "thread_command_event", (args: any[]) => (args[0] as { type: string }).type === "thread_command_event",
); );
expect(appendCalls).toHaveLength(1); expect(appendCalls).toHaveLength(1);
expect(appendCalls[0][0]).toMatchObject({ expect(appendCalls[0][0]).toMatchObject({
@@ -312,7 +316,7 @@ describe("WorkflowManager — crash recovery (Phase 3)", () => {
mgr.startWorkflow("my-wf", payload); mgr.startWorkflow("my-wf", payload);
const startedCall = logStore.upsertWorkflowRun.mock.calls.find( const startedCall = logStore.upsertWorkflowRun.mock.calls.find(
([entry]: [{ type: string }]) => entry.type === "started", (args: any[]) => (args[0] as { type: string }).type === "started",
); );
expect(startedCall).toBeDefined(); expect(startedCall).toBeDefined();
const logEntry = startedCall?.[0] as { payload: string | null }; const logEntry = startedCall?.[0] as { payload: string | null };
@@ -77,7 +77,11 @@ function makeLogStore() {
getActiveWorkflowRuns: vi.fn(() => []), getActiveWorkflowRuns: vi.fn(() => []),
getTriggerPayload: vi.fn(() => null), getTriggerPayload: vi.fn(() => null),
getThreadEvents: vi.fn(() => []), getThreadEvents: vi.fn(() => []),
getThreadRoundCount: vi.fn(() => 0),
getThreadRounds: vi.fn(() => []),
archiveLogs: vi.fn(() => ({ days: [], vacuumed: false })),
close: vi.fn(), close: vi.fn(),
getAllWorkflowRuns: vi.fn(() => []),
}; };
} }
@@ -125,7 +129,7 @@ describe("WorkflowManager — drainAndRespawn (Phase 3 hot reload)", () => {
await drainPromise; await drainPromise;
const interruptedCalls = logStore.upsertWorkflowRun.mock.calls.filter( const interruptedCalls = logStore.upsertWorkflowRun.mock.calls.filter(
([entry]: [{ type: string }]) => entry.type === "interrupted", (args: any[]) => (args[0] as { type: string }).type === "interrupted",
); );
expect(interruptedCalls).toHaveLength(2); expect(interruptedCalls).toHaveLength(2);
@@ -189,10 +193,10 @@ describe("WorkflowManager — drainAndRespawn (Phase 3 hot reload)", () => {
const newChild = mockChildren[1]; const newChild = mockChildren[1];
const resumeCalls = (newChild.send as ReturnType<typeof vi.fn>).mock.calls.filter( const resumeCalls = (newChild.send as ReturnType<typeof vi.fn>).mock.calls.filter(
([msg]: [unknown]) => (args: any[]) =>
msg !== null && args[0] !== null &&
typeof msg === "object" && typeof args[0] === "object" &&
(msg as Record<string, unknown>).type === "resume-thread", (args[0] as Record<string, unknown>).type === "resume-thread",
); );
expect(resumeCalls).toHaveLength(0); expect(resumeCalls).toHaveLength(0);
@@ -217,10 +221,10 @@ describe("WorkflowManager — drainAndRespawn (Phase 3 hot reload)", () => {
const newChild = mockChildren[1]; const newChild = mockChildren[1];
const startCalls = (newChild.send as ReturnType<typeof vi.fn>).mock.calls.filter( const startCalls = (newChild.send as ReturnType<typeof vi.fn>).mock.calls.filter(
([msg]: [unknown]) => (args: any[]) =>
msg !== null && args[0] !== null &&
typeof msg === "object" && typeof args[0] === "object" &&
(msg as Record<string, unknown>).type === "start-thread", (args[0] as Record<string, unknown>).type === "start-thread",
); );
expect(startCalls).toHaveLength(1); expect(startCalls).toHaveLength(1);
@@ -265,7 +269,7 @@ describe("Kernel — workflow hot reload via file-watcher (Phase 3)", () => {
// Kernel's handleWorkflowFileChange should log a workflow_reload event // Kernel's handleWorkflowFileChange should log a workflow_reload event
// We test this via the kernel itself // We test this via the kernel itself
const appendCalls = logStore.append.mock.calls; const appendCalls = logStore.append.mock.calls;
const startCall = appendCalls.find(([e]: [{ type: string }]) => e.type === "start"); const startCall = appendCalls.find((args: any[]) => (args[0] as { type: string }).type === "start");
expect(startCall).toBeDefined(); expect(startCall).toBeDefined();
const stopPromise = kernel.stop(); const stopPromise = kernel.stop();
@@ -2,7 +2,7 @@
* Unit tests for kernel.triggerSense() — IPC issue #36. * Unit tests for kernel.triggerSense() — IPC issue #36.
* *
* These tests use a mock child_process and a mock LogStore so they do NOT * These tests use a mock child_process and a mock LogStore so they do NOT
* require better-sqlite3 to be present in the test environment. * require a real LogStore (node:sqlite) in integration tests.
*/ */
import { EventEmitter } from "node:events"; import { EventEmitter } from "node:events";
@@ -58,7 +58,7 @@ vi.mock("node:child_process", () => ({
const { createKernel } = await import("../kernel.js"); const { createKernel } = await import("../kernel.js");
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// Mock LogStore factory (avoids better-sqlite3 dependency) // Mock LogStore factory (avoids SQLite I/O in this unit test)
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
function makeMockLogStore() { function makeMockLogStore() {
@@ -74,6 +74,9 @@ function makeMockLogStore() {
getAllWorkflowRuns: vi.fn(() => []), getAllWorkflowRuns: vi.fn(() => []),
getTriggerPayload: vi.fn(() => null), getTriggerPayload: vi.fn(() => null),
getThreadEvents: vi.fn(() => []), getThreadEvents: vi.fn(() => []),
getThreadRoundCount: vi.fn(() => 0),
getThreadRounds: vi.fn(() => []),
archiveLogs: vi.fn(() => ({ days: [], vacuumed: false })),
close: vi.fn(), close: vi.fn(),
}; };
} }
@@ -78,8 +78,12 @@ function makeLogStore() {
appendWithWorkflowUpdate: vi.fn(), appendWithWorkflowUpdate: vi.fn(),
getWorkflowRun: vi.fn(() => null), getWorkflowRun: vi.fn(() => null),
getActiveWorkflowRuns: vi.fn(() => []), getActiveWorkflowRuns: vi.fn(() => []),
getAllWorkflowRuns: vi.fn(() => []),
getTriggerPayload: vi.fn(() => null), getTriggerPayload: vi.fn(() => null),
getThreadEvents: vi.fn(() => []), getThreadEvents: vi.fn(() => []),
getThreadRoundCount: vi.fn(() => 0),
getThreadRounds: vi.fn(() => []),
archiveLogs: vi.fn(() => ({ days: [], vacuumed: false })),
close: vi.fn(), close: vi.fn(),
}; };
} }
@@ -136,10 +140,10 @@ describe("kernel + workflowManager integration", () => {
// We need to check that a start-thread message was sent to the workflow worker // We need to check that a start-thread message was sent to the workflow worker
const workflowWorker = mockChildren.find((c) => const workflowWorker = mockChildren.find((c) =>
(c.send as ReturnType<typeof vi.fn>).mock.calls.some( (c.send as ReturnType<typeof vi.fn>).mock.calls.some(
([msg]: [unknown]) => (args: unknown[]) =>
msg !== null && args[0] !== null &&
typeof msg === "object" && typeof args[0] === "object" &&
(msg as Record<string, unknown>).type === "start-thread", (args[0] as Record<string, unknown>).type === "start-thread",
), ),
); );
expect(workflowWorker).toBeDefined(); expect(workflowWorker).toBeDefined();
@@ -211,10 +215,10 @@ describe("kernel + workflowManager integration", () => {
// No workflow worker should have been spawned (only the sense group worker) // No workflow worker should have been spawned (only the sense group worker)
const workflowWorkerSpawned = mockChildren.some((c) => const workflowWorkerSpawned = mockChildren.some((c) =>
(c.send as ReturnType<typeof vi.fn>).mock.calls.some( (c.send as ReturnType<typeof vi.fn>).mock.calls.some(
([msg]: [unknown]) => (args: unknown[]) =>
msg !== null && args[0] !== null &&
typeof msg === "object" && typeof args[0] === "object" &&
(msg as Record<string, unknown>).type === "start-thread", (args[0] as Record<string, unknown>).type === "start-thread",
), ),
); );
expect(workflowWorkerSpawned).toBe(false); expect(workflowWorkerSpawned).toBe(false);
@@ -1,4 +1,7 @@
import { EventEmitter } from "node:events"; import { EventEmitter } from "node:events";
import { mkdtempSync, rmSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import type { NerveConfig } from "@uncaged/nerve-core"; import type { NerveConfig } from "@uncaged/nerve-core";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
@@ -44,6 +47,7 @@ vi.mock("node:child_process", () => ({
// Import after mock is set up // Import after mock is set up
const { createKernel } = await import("../kernel.js"); const { createKernel } = await import("../kernel.js");
const { createLogStore } = await import("../log-store.js");
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// Helpers // Helpers
@@ -93,6 +97,29 @@ describe("kernel — message routing", () => {
await kernel.stop(); await kernel.stop();
}); });
it("persists emitted signals as sense/signal log entries", async () => {
const tmpDir = mkdtempSync(join(tmpdir(), "nerve-kernel-sig-"));
const logStore = createLogStore(join(tmpDir, "logs.db"));
try {
const config = makeConfig({
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [],
});
const kernel = createKernel(config, tmpDir, { logStore });
const child = mockChildren[0];
child.emit("message", { type: "ready" });
child.emit("message", { type: "signal", sense: "cpu-usage", payload: 123 });
const rows = logStore.query({ source: "sense", type: "signal", refId: "cpu-usage" });
expect(rows).toHaveLength(1);
expect(rows[0].payload).toBe(JSON.stringify(123));
await kernel.stop();
} finally {
rmSync(tmpDir, { recursive: true, force: true });
}
});
it("routes error message to stderr", async () => { it("routes error message to stderr", async () => {
const stderrSpy = vi.spyOn(process.stderr, "write").mockImplementation(() => true); const stderrSpy = vi.spyOn(process.stderr, "write").mockImplementation(() => true);
const config = makeConfig({ const config = makeConfig({
@@ -0,0 +1,40 @@
import { describe, expect, it } from "vitest";
import {
assertValidUtcDay,
compareIsoDays,
lastArchivableUtcDay,
nextUtcDay,
prevUtcDay,
utcDateStringFromMs,
utcDayEndExclusiveMs,
utcDayStartMs,
} from "../log-archive.js";
describe("log-archive UTC helpers", () => {
it("lastArchivableUtcDay matches RFC-style boundary (exclusive end of day ≤ boundary)", () => {
const boundary = Date.UTC(2026, 1, 2, 12, 0, 0); // 2026-02-02 12:00 UTC
expect(lastArchivableUtcDay(boundary)).toBe("2026-02-01");
});
it("round-trips UTC day bounds", () => {
expect(utcDayStartMs("2026-02-01")).toBe(Date.UTC(2026, 1, 1));
expect(utcDayEndExclusiveMs("2026-02-01")).toBe(Date.UTC(2026, 1, 2));
expect(utcDateStringFromMs(Date.UTC(2026, 1, 1, 23, 59))).toBe("2026-02-01");
});
it("nextUtcDay / prevUtcDay", () => {
expect(nextUtcDay("2026-02-01")).toBe("2026-02-02");
expect(prevUtcDay("2026-02-01")).toBe("2026-01-31");
});
it("compareIsoDays sorts lexicographically for YYYY-MM-DD", () => {
expect(compareIsoDays("2026-01-01", "2026-02-01")).toBeLessThan(0);
expect(compareIsoDays("2026-02-01", "2026-02-01")).toBe(0);
});
it("assertValidUtcDay rejects invalid calendars", () => {
expect(() => assertValidUtcDay("2026-02-31")).toThrow();
expect(() => assertValidUtcDay("bad")).toThrow();
});
});
@@ -0,0 +1,139 @@
import { mkdtempSync, readFileSync, rmSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { afterEach, beforeEach, describe, expect, it } from "vitest";
import { LOG_ARCHIVE_META_KEY, createLogStore } from "../log-store.js";
import type { LogStore } from "../log-store.js";
const DAY_MS = 86_400_000;
/** `now` such that 2026-02-01 is the last archivable UTC day under a 30-day window. */
function nowForLastArchivableFeb1(): number {
const boundary = Date.UTC(2026, 1, 2, 12, 0, 0);
return boundary + 30 * DAY_MS;
}
describe("LogStore — cold archive (RFC-001 §5.4)", () => {
let tmpDir: string;
let store: LogStore;
beforeEach(() => {
tmpDir = mkdtempSync(join(tmpdir(), "nerve-archive-"));
store = createLogStore(join(tmpDir, "data", "logs.db"));
});
afterEach(() => {
store.close();
rmSync(tmpDir, { recursive: true, force: true });
});
it("exports one UTC day to JSONL, deletes rows, advances archived_up_to", () => {
const ts = Date.UTC(2026, 1, 1, 10, 0, 0);
store.append({ source: "system", type: "x", refId: null, payload: '{"a":1}', ts });
store.append({ source: "reflex", type: "y", refId: "z", payload: null, ts: ts + 1 });
const now = nowForLastArchivableFeb1();
const result = store.archiveLogs({ now, retentionMs: 30 * DAY_MS });
expect(result.days).toHaveLength(1);
expect(result.days[0].day).toBe("2026-02-01");
expect(result.days[0].rowCount).toBe(2);
const jsonlPath = join(tmpDir, "data", "archive", "logs", "2026-02-01.jsonl");
expect(result.days[0].filePath).toBe(jsonlPath);
const lines = readFileSync(jsonlPath, "utf8").trim().split("\n");
expect(lines).toHaveLength(2);
const o = JSON.parse(lines[0] ?? "{}") as Record<string, unknown>;
expect(o.source).toBe("system");
expect(o.refId).toBeNull();
expect(store.query()).toHaveLength(0);
expect(store.getMeta(LOG_ARCHIVE_META_KEY)).toBe("2026-02-01");
});
it("returns nothing for an empty logs table", () => {
const r = store.archiveLogs({ now: nowForLastArchivableFeb1(), retentionMs: 30 * DAY_MS });
expect(r.days).toHaveLength(0);
expect(store.getMeta(LOG_ARCHIVE_META_KEY)).toBeNull();
});
it("does nothing when all logs are inside the hot window", () => {
const now = Date.UTC(2026, 3, 23, 12, 0, 0);
const ts = now - 5 * DAY_MS;
store.append({ source: "system", type: "warm", refId: null, payload: null, ts });
const r = store.archiveLogs({ now, retentionMs: 30 * DAY_MS });
expect(r.days).toHaveLength(0);
expect(store.query()).toHaveLength(1);
});
it("second archive with same clock is a no-op (watermark already caught up)", () => {
const ts = Date.UTC(2026, 1, 1, 10, 0, 0);
store.append({ source: "system", type: "x", refId: null, payload: null, ts });
const now = nowForLastArchivableFeb1();
store.archiveLogs({ now, retentionMs: 30 * DAY_MS });
const path = join(tmpDir, "data", "archive", "logs", "2026-02-01.jsonl");
const first = readFileSync(path, "utf8");
const second = store.archiveLogs({ now, retentionMs: 30 * DAY_MS });
expect(second.days).toHaveLength(0);
expect(readFileSync(path, "utf8")).toBe(first);
});
it("overwrites JSONL when the same UTC day is archived again after watermark rewind", () => {
const ts = Date.UTC(2026, 1, 1, 10, 0, 0);
store.append({ source: "a", type: "1", refId: null, payload: null, ts });
const now = nowForLastArchivableFeb1();
store.archiveLogs({ now, retentionMs: 30 * DAY_MS });
store.setMeta(LOG_ARCHIVE_META_KEY, "2026-01-31");
store.append({ source: "b", type: "2", refId: null, payload: null, ts: ts + 100 });
store.archiveLogs({ now, retentionMs: 30 * DAY_MS });
const path = join(tmpDir, "data", "archive", "logs", "2026-02-01.jsonl");
const lines = readFileSync(path, "utf8").trim().split("\n");
expect(lines).toHaveLength(1);
expect(JSON.parse(lines[0] ?? "{}").source).toBe("b");
});
it("respects maxDays across invocations", () => {
const t1 = Date.UTC(2026, 1, 1, 10, 0, 0);
const t2 = Date.UTC(2026, 1, 2, 10, 0, 0);
store.append({ source: "system", type: "a", refId: null, payload: null, ts: t1 });
store.append({ source: "system", type: "b", refId: null, payload: null, ts: t2 });
const now = Date.UTC(2027, 0, 1, 12, 0, 0);
const r1 = store.archiveLogs({ now, retentionMs: 30 * DAY_MS, maxDays: 1 });
expect(r1.days).toHaveLength(1);
expect(r1.days[0].day).toBe("2026-02-01");
const r2 = store.archiveLogs({ now, retentionMs: 30 * DAY_MS, maxDays: 1 });
expect(r2.days).toHaveLength(1);
expect(r2.days[0].day).toBe("2026-02-02");
expect(store.getMeta(LOG_ARCHIVE_META_KEY)).toBe("2026-02-02");
expect(store.query()).toHaveLength(0);
});
it("starts from earliest log day when it is before watermark+1", () => {
store.setMeta(LOG_ARCHIVE_META_KEY, "2026-01-10");
const ts = Date.UTC(2026, 1, 1, 10, 0, 0);
store.append({ source: "x", type: "p", refId: null, payload: null, ts });
const result = store.archiveLogs({ now: nowForLastArchivableFeb1(), retentionMs: 30 * DAY_MS });
expect(result.days.map((d) => d.day)).toContain("2026-02-01");
});
it("throws on invalid archived_up_to watermark", () => {
store.setMeta(LOG_ARCHIVE_META_KEY, "not-a-date");
expect(() => store.archiveLogs({ now: Date.now() })).toThrow();
});
it("runs VACUUM when vacuum: true", () => {
const ts = Date.UTC(2026, 1, 1, 10, 0, 0);
store.append({ source: "system", type: "x", refId: null, payload: null, ts });
const r = store.archiveLogs({
now: nowForLastArchivableFeb1(),
retentionMs: 30 * DAY_MS,
vacuum: true,
});
expect(r.vacuumed).toBe(true);
});
});
@@ -195,4 +195,65 @@ describe("LogStore — crash recovery helpers (Phase 3)", () => {
expect(result8[0].type).toBe("event_for_8"); expect(result8[0].type).toBe("event_for_8");
}); });
}); });
describe("getThreadRoundCount / getThreadRounds", () => {
it("excludes thread_start from rounds and assigns ROW_NUMBER in chronological order", () => {
store.append({
source: "workflow",
type: "thread_command_event",
refId: "run-tr",
payload: JSON.stringify({ type: "thread_start", triggerPayload: { x: 1 } }),
ts: 100,
});
store.append({
source: "workflow",
type: "thread_command_event",
refId: "run-tr",
payload: JSON.stringify({
type: "step_a",
role: "alpha",
content: "hello",
meta: 1,
}),
ts: 101,
});
store.append({
source: "workflow",
type: "thread_command_event",
refId: "run-tr",
payload: JSON.stringify({ type: "step_b", role: "beta", content: "world" }),
ts: 102,
});
expect(store.getThreadRoundCount("run-tr")).toBe(2);
const all = store.getThreadRounds("run-tr", { before: 0, limit: 50 });
expect(all).toHaveLength(2);
expect(all.map((r) => r.round)).toEqual([2, 1]);
expect(all[0].event.type).toBe("step_b");
expect(all[1].event.type).toBe("step_a");
});
it("getThreadRounds respects exclusive before bound", () => {
for (let i = 0; i < 3; i++) {
store.append({
source: "workflow",
type: "thread_command_event",
refId: "run-b4",
payload: JSON.stringify({ type: `ev_${i}`, role: "r", content: String(i) }),
ts: 200 + i,
});
}
expect(store.getThreadRoundCount("run-b4")).toBe(3);
const page = store.getThreadRounds("run-b4", { before: 3, limit: 50 });
expect(page.map((r) => r.round)).toEqual([2, 1]);
});
it("returns empty when no role rounds for runId", () => {
expect(store.getThreadRoundCount("missing")).toBe(0);
expect(store.getThreadRounds("missing", { before: 0, limit: 10 })).toHaveLength(0);
});
});
}); });
@@ -2,14 +2,15 @@ import { mkdirSync, mkdtempSync, writeFileSync } from "node:fs";
import { tmpdir } from "node:os"; import { tmpdir } from "node:os";
import { join } from "node:path"; import { join } from "node:path";
import Database from "better-sqlite3"; import { DatabaseSync } from "node:sqlite";
import { drizzle } from "drizzle-orm/better-sqlite3"; import { drizzle } from "drizzle-orm/node-sqlite";
import { integer, real, sqliteTable } from "drizzle-orm/sqlite-core"; import { integer, real, sqliteTable } from "drizzle-orm/sqlite-core";
import { describe, expect, it } from "vitest"; import { describe, expect, it } from "vitest";
import { createBlobStore } from "../blob-store.js";
import { parseParentMessage } from "../ipc.js"; import { parseParentMessage } from "../ipc.js";
import { executeCompute, openPeerDb, openSenseDb, runMigrations } from "../sense-runtime.js"; import { executeCompute, openPeerDb, openSenseDb, runMigrations } from "../sense-runtime.js";
import type { DrizzleDB, PeerMap, SenseRuntime } from "../sense-runtime.js"; import type { ComputeFn, DrizzleDB, PeerMap, SenseRuntime } from "../sense-runtime.js";
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// Helpers // Helpers
@@ -48,7 +49,7 @@ const samples = sqliteTable("samples", {
describe("runMigrations", () => { describe("runMigrations", () => {
it("creates table via SQL migration file", () => { it("creates table via SQL migration file", () => {
const sqlite = new Database(":memory:"); const sqlite = new DatabaseSync(":memory:");
const migrationsDir = makeTempMigrationsDir(INIT_SQL); const migrationsDir = makeTempMigrationsDir(INIT_SQL);
const result = runMigrations(sqlite, migrationsDir); const result = runMigrations(sqlite, migrationsDir);
@@ -63,7 +64,7 @@ describe("runMigrations", () => {
}); });
it("runs multiple migrations in lexicographic order", () => { it("runs multiple migrations in lexicographic order", () => {
const sqlite = new Database(":memory:"); const sqlite = new DatabaseSync(":memory:");
const dir = mkdtempSync(join(tmpdir(), "nerve-multi-")); const dir = mkdtempSync(join(tmpdir(), "nerve-multi-"));
writeFileSync(join(dir, "0001_init.sql"), INIT_SQL); writeFileSync(join(dir, "0001_init.sql"), INIT_SQL);
@@ -80,7 +81,7 @@ describe("runMigrations", () => {
}); });
it("returns ok when migrations directory is empty", () => { it("returns ok when migrations directory is empty", () => {
const sqlite = new Database(":memory:"); const sqlite = new DatabaseSync(":memory:");
const dir = makeTempMigrationsDirEmpty(); const dir = makeTempMigrationsDirEmpty();
const result = runMigrations(sqlite, dir); const result = runMigrations(sqlite, dir);
expect(result.ok).toBe(true); expect(result.ok).toBe(true);
@@ -88,14 +89,14 @@ describe("runMigrations", () => {
}); });
it("returns err when migrations directory does not exist", () => { it("returns err when migrations directory does not exist", () => {
const sqlite = new Database(":memory:"); const sqlite = new DatabaseSync(":memory:");
const result = runMigrations(sqlite, "/nonexistent/path/migrations"); const result = runMigrations(sqlite, "/nonexistent/path/migrations");
expect(result.ok).toBe(false); expect(result.ok).toBe(false);
sqlite.close(); sqlite.close();
}); });
it("returns err when a migration SQL is invalid", () => { it("returns err when a migration SQL is invalid", () => {
const sqlite = new Database(":memory:"); const sqlite = new DatabaseSync(":memory:");
const dir = mkdtempSync(join(tmpdir(), "nerve-bad-sql-")); const dir = mkdtempSync(join(tmpdir(), "nerve-bad-sql-"));
writeFileSync(join(dir, "0001_bad.sql"), "NOT VALID SQL !!!;"); writeFileSync(join(dir, "0001_bad.sql"), "NOT VALID SQL !!!;");
const result = runMigrations(sqlite, dir); const result = runMigrations(sqlite, dir);
@@ -140,7 +141,7 @@ describe("openPeerDb", () => {
it("opens an existing db in read-only mode", () => { it("opens an existing db in read-only mode", () => {
// Create a writable db first // Create a writable db first
const dbPath = makeTempDbPath(); const dbPath = makeTempDbPath();
const sqlite = new Database(dbPath); const sqlite = new DatabaseSync(dbPath);
sqlite.exec(INIT_SQL); sqlite.exec(INIT_SQL);
sqlite.prepare("INSERT INTO samples (ts, value) VALUES (1, 42.0)").run(); sqlite.prepare("INSERT INTO samples (ts, value) VALUES (1, 42.0)").run();
sqlite.close(); sqlite.close();
@@ -167,13 +168,13 @@ describe("openPeerDb", () => {
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
describe("executeCompute", () => { describe("executeCompute", () => {
function makeRuntime(computeFn: (db: DrizzleDB, peers: PeerMap) => Promise<unknown | null>): { function makeRuntime(computeFn: ComputeFn): {
runtime: SenseRuntime; runtime: SenseRuntime;
sqlite: Database.Database; sqlite: DatabaseSync;
} { } {
const sqlite = new Database(":memory:"); const sqlite = new DatabaseSync(":memory:");
sqlite.exec(INIT_SQL); sqlite.exec(INIT_SQL);
const db = drizzle(sqlite) as DrizzleDB; const db = drizzle({ client: sqlite }) as DrizzleDB;
return { return {
runtime: { name: "test-sense", db, compute: computeFn }, runtime: { name: "test-sense", db, compute: computeFn },
sqlite, sqlite,
@@ -225,10 +226,10 @@ describe("executeCompute", () => {
it("compute can read from peers", async () => { it("compute can read from peers", async () => {
// Set up a peer db with data // Set up a peer db with data
const peerSqlite = new Database(":memory:"); const peerSqlite = new DatabaseSync(":memory:");
peerSqlite.exec(INIT_SQL); peerSqlite.exec(INIT_SQL);
peerSqlite.prepare("INSERT INTO samples (ts, value) VALUES (100, 3.14)").run(); peerSqlite.prepare("INSERT INTO samples (ts, value) VALUES (100, 3.14)").run();
const peerDb = drizzle(peerSqlite) as DrizzleDB; const peerDb = drizzle({ client: peerSqlite }) as DrizzleDB;
const peers: PeerMap = { "other-sense": peerDb }; const peers: PeerMap = { "other-sense": peerDb };
@@ -247,9 +248,9 @@ describe("executeCompute", () => {
}); });
it("write to own db does not affect peer db (isolation)", async () => { it("write to own db does not affect peer db (isolation)", async () => {
const peerSqlite = new Database(":memory:"); const peerSqlite = new DatabaseSync(":memory:");
peerSqlite.exec(INIT_SQL); peerSqlite.exec(INIT_SQL);
const peerDb = drizzle(peerSqlite) as DrizzleDB; const peerDb = drizzle({ client: peerSqlite }) as DrizzleDB;
const peers: PeerMap = { "peer-sense": peerDb }; const peers: PeerMap = { "peer-sense": peerDb };
const { runtime, sqlite } = makeRuntime(async (db) => { const { runtime, sqlite } = makeRuntime(async (db) => {
@@ -340,6 +341,20 @@ describe("executeCompute", () => {
expect(capturedSignal).toBeInstanceOf(AbortSignal); expect(capturedSignal).toBeInstanceOf(AbortSignal);
sqlite.close(); sqlite.close();
}); });
it("passes BlobStore as options.blobs when blobStore argument is provided", async () => {
const blobsRoot = mkdtempSync(join(tmpdir(), "nerve-blobs-"));
const blobStore = createBlobStore(blobsRoot);
let seen: ReturnType<typeof createBlobStore> | undefined;
const { runtime, sqlite } = makeRuntime(async (_db, _peers, options) => {
seen = options?.blobs;
return null;
});
await executeCompute(runtime, emptyPeers, undefined, blobStore);
expect(seen).toBe(blobStore);
sqlite.close();
});
}); });
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@@ -388,7 +403,7 @@ describe("parseParentMessage", () => {
describe("runMigrations journal", () => { describe("runMigrations journal", () => {
it("does not re-run an already-applied migration", () => { it("does not re-run an already-applied migration", () => {
const sqlite = new Database(":memory:"); const sqlite = new DatabaseSync(":memory:");
const dir = mkdtempSync(join(tmpdir(), "nerve-journal-")); const dir = mkdtempSync(join(tmpdir(), "nerve-journal-"));
writeFileSync(join(dir, "0001_init.sql"), INIT_SQL); writeFileSync(join(dir, "0001_init.sql"), INIT_SQL);
@@ -415,7 +430,7 @@ describe("runMigrations journal", () => {
}); });
it("tracks migrations in _migrations table", () => { it("tracks migrations in _migrations table", () => {
const sqlite = new Database(":memory:"); const sqlite = new DatabaseSync(":memory:");
const dir = mkdtempSync(join(tmpdir(), "nerve-journal2-")); const dir = mkdtempSync(join(tmpdir(), "nerve-journal2-"));
writeFileSync(join(dir, "0001_init.sql"), INIT_SQL); writeFileSync(join(dir, "0001_init.sql"), INIT_SQL);
@@ -74,7 +74,11 @@ function makeLogStore() {
getActiveWorkflowRuns: vi.fn(() => []), getActiveWorkflowRuns: vi.fn(() => []),
getTriggerPayload: vi.fn(() => null), getTriggerPayload: vi.fn(() => null),
getThreadEvents: vi.fn(() => []), getThreadEvents: vi.fn(() => []),
getThreadRoundCount: vi.fn(() => 0),
getThreadRounds: vi.fn(() => []),
archiveLogs: vi.fn(() => ({ days: [], vacuumed: false })),
close: vi.fn(), close: vi.fn(),
getAllWorkflowRuns: vi.fn(() => []),
}; };
} }
+106
View File
@@ -0,0 +1,106 @@
/**
* CAS blob store — sha256 content-addressable files under `data/blobs/`.
*
* Layout: `<root>/<2-hex-shard>/<62-hex-rest>` (RFC-001 §8).
*/
import { createHash, randomBytes } from "node:crypto";
import {
existsSync,
mkdirSync,
readFileSync,
renameSync,
unlinkSync,
writeFileSync,
} from "node:fs";
import { dirname, join } from "node:path";
const SHA256_HEX_LEN = 64;
const HEX_RE = /^[0-9a-f]+$/;
export type BlobStore = {
/** Persist UTF-8 or raw bytes; returns lowercase hex sha256. Idempotent for identical content. */
write: (content: string | Uint8Array | Buffer) => string;
/** Returns bytes or null if the hash is invalid or no blob exists. Verifies digest matches path. */
read: (hash: string) => Buffer | null;
/** True when hash is well-formed and the blob file is present. */
exists: (hash: string) => boolean;
};
function toBuffer(content: string | Uint8Array | Buffer): Buffer {
if (typeof content === "string") return Buffer.from(content, "utf8");
if (Buffer.isBuffer(content)) return content;
return Buffer.from(content);
}
function digestHex(buf: Buffer): string {
return createHash("sha256").update(buf).digest("hex");
}
/** @returns normalized lowercase hex or null if not a valid sha256 hex string */
export function normalizeBlobHash(hash: string): string | null {
const h = hash.trim().toLowerCase();
if (h.length !== SHA256_HEX_LEN) return null;
if (!HEX_RE.test(h)) return null;
return h;
}
function pathForHash(blobsRoot: string, hashLower: string): string {
return join(blobsRoot, hashLower.slice(0, 2), hashLower.slice(2));
}
function verifyPathMatchesContent(filePath: string, expectedHash: string): Buffer {
const data = readFileSync(filePath);
const actual = digestHex(data);
if (actual !== expectedHash) {
throw new Error(
`Blob CAS mismatch at "${filePath}": file digests to ${actual}, path expects ${expectedHash}`,
);
}
return data;
}
export function createBlobStore(blobsRoot: string): BlobStore {
function write(content: string | Uint8Array | Buffer): string {
const buf = toBuffer(content);
const hash = digestHex(buf);
const filePath = pathForHash(blobsRoot, hash);
if (existsSync(filePath)) {
verifyPathMatchesContent(filePath, hash);
return hash;
}
mkdirSync(dirname(filePath), { recursive: true });
const tmp = join(dirname(filePath), `.tmp.${randomBytes(16).toString("hex")}`);
try {
writeFileSync(tmp, buf);
renameSync(tmp, filePath);
} catch (e) {
try {
unlinkSync(tmp);
} catch {
// ignore cleanup errors
}
throw e;
}
return hash;
}
function read(hash: string): Buffer | null {
const h = normalizeBlobHash(hash);
if (h === null) return null;
const filePath = pathForHash(blobsRoot, h);
if (!existsSync(filePath)) return null;
return verifyPathMatchesContent(filePath, h);
}
function exists(hash: string): boolean {
const h = normalizeBlobHash(hash);
if (h === null) return false;
return existsSync(pathForHash(blobsRoot, h));
}
return { write, read, exists };
}
+4 -9
View File
@@ -13,8 +13,12 @@
import { rmSync } from "node:fs"; import { rmSync } from "node:fs";
import { type Server, type Socket, createServer } from "node:net"; import { type Server, type Socket, createServer } from "node:net";
import type { SenseInfo } from "@uncaged/nerve-core";
import type { WorkflowManager } from "./workflow-manager.js"; import type { WorkflowManager } from "./workflow-manager.js";
export type { SenseInfo };
/** JSON message sent by the CLI to trigger a workflow. */ /** JSON message sent by the CLI to trigger a workflow. */
export type TriggerWorkflowRequest = { export type TriggerWorkflowRequest = {
type: "trigger-workflow"; type: "trigger-workflow";
@@ -33,15 +37,6 @@ export type ListSensesRequest = {
type: "list-senses"; type: "list-senses";
}; };
/** Runtime info about a single sense returned by list-senses. */
export type SenseInfo = {
name: string;
group: string;
throttle: number | null;
timeout: number | null;
lastSignalTs: number | null;
};
type DaemonRequest = TriggerWorkflowRequest | TriggerSenseRequest | ListSensesRequest; type DaemonRequest = TriggerWorkflowRequest | TriggerSenseRequest | ListSensesRequest;
type DaemonResponse = type DaemonResponse =
+11 -1
View File
@@ -29,16 +29,26 @@ export {
export { createKernel } from "./kernel.js"; export { createKernel } from "./kernel.js";
export type { Kernel, KernelOptions, KernelHealth } from "./kernel.js"; export type { Kernel, KernelOptions, KernelHealth } from "./kernel.js";
export type { SenseInfo } from "./daemon-ipc.js";
export { createFileWatcher } from "./file-watcher.js"; export { createFileWatcher } from "./file-watcher.js";
export type { FileWatcher, FileChange, FileChangeHandler } from "./file-watcher.js"; export type { FileWatcher, FileChange, FileChangeHandler } from "./file-watcher.js";
export { createLogStore } from "./log-store.js"; export { createBlobStore, normalizeBlobHash } from "./blob-store.js";
export type { BlobStore } from "./blob-store.js";
export { createLogStore, LOG_ARCHIVE_META_KEY } from "./log-store.js";
export type { export type {
LogStore, LogStore,
LogEntry, LogEntry,
LogQuery, LogQuery,
WorkflowRun, WorkflowRun,
WorkflowRunStatus, WorkflowRunStatus,
ArchiveLogsDayResult,
ArchiveLogsOptions,
ArchiveLogsResult,
ThreadRoundRow,
GetThreadRoundsParams,
} from "./log-store.js"; } from "./log-store.js";
export { createWorkflowManager } from "./workflow-manager.js"; export { createWorkflowManager } from "./workflow-manager.js";
+29 -12
View File
@@ -18,11 +18,11 @@ import { readFileSync } from "node:fs";
import { dirname, join } from "node:path"; import { dirname, join } from "node:path";
import { fileURLToPath } from "node:url"; import { fileURLToPath } from "node:url";
import type { NerveConfig, Signal } from "@uncaged/nerve-core"; import type { NerveConfig, SenseInfo, Signal } from "@uncaged/nerve-core";
import { parseNerveConfig } from "@uncaged/nerve-core"; import { parseNerveConfig } from "@uncaged/nerve-core";
import { createDaemonIpcServer } from "./daemon-ipc.js"; import { createDaemonIpcServer } from "./daemon-ipc.js";
import type { DaemonIpcServer, SenseInfo } from "./daemon-ipc.js"; import type { DaemonIpcServer } from "./daemon-ipc.js";
import { createFileWatcher } from "./file-watcher.js"; import { createFileWatcher } from "./file-watcher.js";
import type { FileWatcher } from "./file-watcher.js"; import type { FileWatcher } from "./file-watcher.js";
import type { ComputeMessage, ShutdownMessage } from "./ipc.js"; import type { ComputeMessage, ShutdownMessage } from "./ipc.js";
@@ -33,6 +33,11 @@ import { createReflexScheduler } from "./reflex-scheduler.js";
import type { ReflexScheduler } from "./reflex-scheduler.js"; import type { ReflexScheduler } from "./reflex-scheduler.js";
import { createSignalBus } from "./signal-bus.js"; import { createSignalBus } from "./signal-bus.js";
import type { SignalBus } from "./signal-bus.js"; import type { SignalBus } from "./signal-bus.js";
import {
formatCapturedStderrTail,
formatChildExitSummary,
teeCapturedStderr,
} from "./worker-fork-support.js";
import { createWorkflowManager } from "./workflow-manager.js"; import { createWorkflowManager } from "./workflow-manager.js";
import type { WorkflowManager } from "./workflow-manager.js"; import type { WorkflowManager } from "./workflow-manager.js";
@@ -84,12 +89,22 @@ function resolveWorkerScript(): string {
return join(__dir, "sense-worker.js"); return join(__dir, "sense-worker.js");
} }
function spawnWorker(nerveRoot: string, group: string, workerScript: string): ChildProcess { function spawnWorker(
nerveRoot: string,
group: string,
workerScript: string,
stderrTail: { value: string },
): ChildProcess {
const child = fork(workerScript, ["--group", group, "--root", nerveRoot], { const child = fork(workerScript, ["--group", group, "--root", nerveRoot], {
stdio: ["ignore", "inherit", "inherit", "ipc"], stdio: ["ignore", "inherit", "pipe", "ipc"],
}); });
teeCapturedStderr(child, stderrTail);
// Prevent unhandled EPIPE when writing to a child whose IPC channel closed // Prevent unhandled EPIPE when writing to a child whose IPC channel closed
child.on("error", () => {}); child.on("error", (err) => {
if ((err as NodeJS.ErrnoException).code !== "EPIPE") {
console.error("[worker] error:", err.message);
}
});
return child; return child;
} }
@@ -222,8 +237,8 @@ export function createKernel(
ts: Date.now(), ts: Date.now(),
}; };
logStore.append({ logStore.append({
source: "reflex", source: "sense",
type: "run_complete", type: "signal",
refId: msg.sense, refId: msg.sense,
payload: JSON.stringify(msg.payload), payload: JSON.stringify(msg.payload),
ts: signal.ts, ts: signal.ts,
@@ -236,7 +251,8 @@ export function createKernel(
} }
function startWorker(group: string): Promise<void> { function startWorker(group: string): Promise<void> {
const child = spawnWorker(nerveRoot, group, workerScript); const stderrTail = { value: "" };
const child = spawnWorker(nerveRoot, group, workerScript, stderrTail);
let workerReadyResolve: (() => void) | undefined; let workerReadyResolve: (() => void) | undefined;
const workerReady = new Promise<void>((resolve) => { const workerReady = new Promise<void>((resolve) => {
@@ -251,9 +267,10 @@ export function createKernel(
handleWorkerMessage(raw); handleWorkerMessage(raw);
}); });
child.on("exit", (code) => { child.on("exit", (code, signal) => {
const summary = formatChildExitSummary(code, signal ?? null);
process.stderr.write( process.stderr.write(
`[kernel] worker for group "${group}" exited with code ${code ?? "null"}\n`, `[kernel] worker for group "${group}" exited (${summary})${formatCapturedStderrTail(stderrTail.value)}\n`,
); );
// Resolve ready in case the worker exits before sending ready (prevents hangs) // Resolve ready in case the worker exits before sending ready (prevents hangs)
workerReadyResolve?.(); workerReadyResolve?.();
@@ -524,8 +541,8 @@ export function createKernel(
listSenses(): SenseInfo[] { listSenses(): SenseInfo[] {
return Object.entries(config.senses).map(([name, senseConfig]) => { return Object.entries(config.senses).map(([name, senseConfig]) => {
const entries = logStore.query({ const entries = logStore.query({
source: "reflex", source: "sense",
type: "run_complete", type: "signal",
refId: name, refId: name,
}); });
const lastEntry = entries.length > 0 ? entries[entries.length - 1] : null; const lastEntry = entries.length > 0 ? entries[entries.length - 1] : null;
+78
View File
@@ -0,0 +1,78 @@
/** Log cold-archive helpers (RFC-001 §5.4) — UTC calendar days, JSONL export. */
export const LOG_ARCHIVE_META_KEY = "archived_up_to";
export const DEFAULT_LOG_RETENTION_MS = 30 * 86_400_000;
export type ArchiveLogsOptions = {
/** Wall clock for retention boundary (default: `Date.now()`). */
now?: number;
/** Run `VACUUM` after archiving (outside the per-day transaction). */
vacuum?: boolean;
/** Max UTC days to process in one call (default: unlimited). */
maxDays?: number;
/** Override default 30-day retention (tests). */
retentionMs?: number;
};
export type ArchiveLogsDayResult = {
day: string;
rowCount: number;
filePath: string;
};
export type ArchiveLogsResult = {
days: ArchiveLogsDayResult[];
vacuumed: boolean;
};
export function utcDateStringFromMs(ms: number): string {
return new Date(ms).toISOString().slice(0, 10);
}
function parseUtcDayParts(day: string): [number, number, number] {
const m = /^(\d{4})-(\d{2})-(\d{2})$/.exec(day);
if (m === null) {
throw new Error(`Invalid UTC day (expected YYYY-MM-DD): ${day}`);
}
const y = Number(m[1]);
const mo = Number(m[2]);
const d = Number(m[3]);
const t = Date.UTC(y, mo - 1, d);
if (utcDateStringFromMs(t) !== day) {
throw new Error(`Invalid UTC calendar day: ${day}`);
}
return [y, mo, d];
}
export function assertValidUtcDay(day: string): void {
parseUtcDayParts(day);
}
export function utcDayStartMs(day: string): number {
const [y, mo, d] = parseUtcDayParts(day);
return Date.UTC(y, mo - 1, d);
}
export function utcDayEndExclusiveMs(day: string): number {
return utcDayStartMs(day) + 86_400_000;
}
export function prevUtcDay(day: string): string {
return utcDateStringFromMs(utcDayStartMs(day) - 86_400_000);
}
export function nextUtcDay(day: string): string {
return utcDateStringFromMs(utcDayEndExclusiveMs(day));
}
/** Last UTC calendar day D such that the exclusive end of D is ≤ boundaryMs. */
export function lastArchivableUtcDay(boundaryMs: number): string {
return prevUtcDay(utcDateStringFromMs(boundaryMs));
}
export function compareIsoDays(a: string, b: string): number {
if (a < b) return -1;
if (a > b) return 1;
return 0;
}
+275 -13
View File
@@ -7,10 +7,25 @@
* Also provides a `meta` key-value table for bookkeeping (e.g. archive watermarks). * Also provides a `meta` key-value table for bookkeeping (e.g. archive watermarks).
*/ */
import { mkdirSync } from "node:fs"; import { mkdirSync, writeFileSync } from "node:fs";
import { dirname } from "node:path"; import { dirname, join } from "node:path";
import Database from "better-sqlite3"; import { DatabaseSync, type StatementSync } from "node:sqlite";
import type BetterSqlite3 from "better-sqlite3";
import {
DEFAULT_LOG_RETENTION_MS,
LOG_ARCHIVE_META_KEY,
assertValidUtcDay,
compareIsoDays,
lastArchivableUtcDay,
nextUtcDay,
utcDateStringFromMs,
utcDayEndExclusiveMs,
utcDayStartMs,
} from "./log-archive.js";
import type { ArchiveLogsDayResult, ArchiveLogsOptions, ArchiveLogsResult } from "./log-archive.js";
export { LOG_ARCHIVE_META_KEY } from "./log-archive.js";
export type { ArchiveLogsDayResult, ArchiveLogsOptions, ArchiveLogsResult } from "./log-archive.js";
export type LogEntry = { export type LogEntry = {
id?: number; id?: number;
@@ -68,6 +83,25 @@ export type WorkflowRun = {
ts: number; ts: number;
}; };
/** One role-produced command-event row with 1-based round index (ROW_NUMBER over role events only). */
export type ThreadRoundRow = {
round: number;
logId: number;
ts: number;
event: { type: string; [key: string]: unknown };
};
/** Parameters for {@link LogStore.getThreadRounds}. */
export type GetThreadRoundsParams = {
/**
* Exclusive upper bound on round index (1-based among role events).
* Use `0` to include all rounds (subject to `limit`).
*/
before: number;
/** Maximum rows returned from the DB (DESC by round). */
limit: number;
};
export type LogStore = { export type LogStore = {
append: (entry: Omit<LogEntry, "id">) => LogEntry; append: (entry: Omit<LogEntry, "id">) => LogEntry;
query: (filter?: LogQuery) => LogEntry[]; query: (filter?: LogQuery) => LogEntry[];
@@ -105,6 +139,23 @@ export type LogStore = {
* Used for crash recovery to rebuild ThreadState. * Used for crash recovery to rebuild ThreadState.
*/ */
getThreadEvents: (runId: string) => Array<{ type: string; [key: string]: unknown }>; getThreadEvents: (runId: string) => Array<{ type: string; [key: string]: unknown }>;
/**
* Count role command events for a run (excludes `thread_start` and invalid payloads).
* Round indices for {@link getThreadRounds} are 1..count in chronological order.
*/
getThreadRoundCount: (runId: string) => number;
/**
* Role rounds for agent-oriented retrieval: each row is one `thread_command_event`
* whose JSON `type` is not `thread_start`, with `round` from ROW_NUMBER() OVER (ORDER BY id ASC).
* No schema migration — numbering is computed in SQL.
*/
getThreadRounds: (runId: string, params: GetThreadRoundsParams) => ThreadRoundRow[];
/**
* Export logs older than the retention window to `data/archive/logs/YYYY-MM-DD.jsonl`,
* then delete those rows and advance `meta.archived_up_to` in one transaction per day
* (RFC-001 §5.4).
*/
archiveLogs: (options?: ArchiveLogsOptions) => ArchiveLogsResult;
close: () => void; close: () => void;
}; };
@@ -138,11 +189,99 @@ CREATE INDEX IF NOT EXISTS idx_workflow_runs_status ON workflow_runs(status);
CREATE INDEX IF NOT EXISTS idx_workflow_runs_workflow ON workflow_runs(workflow); CREATE INDEX IF NOT EXISTS idx_workflow_runs_workflow ON workflow_runs(workflow);
`; `;
type SqlLogRow = {
id: number;
source: string;
type: string;
ref_id: string | null;
payload: string | null;
ts: number;
};
function buildJsonlBody(rows: SqlLogRow[]): string {
if (rows.length === 0) return "";
const lines = rows.map((r) =>
JSON.stringify({
id: r.id,
source: r.source,
type: r.type,
refId: r.ref_id,
payload: r.payload,
ts: r.ts,
}),
);
return `${lines.join("\n")}\n`;
}
function runInTransaction<T>(db: DatabaseSync, fn: () => T): T {
db.exec("BEGIN IMMEDIATE");
try {
const out = fn();
db.exec("COMMIT");
return out;
} catch (e) {
try {
db.exec("ROLLBACK");
} catch {
// ignore rollback errors
}
throw e;
}
}
function runOptionalVacuum(sqlite: DatabaseSync, vacuum?: boolean): boolean {
if (vacuum !== true) return false;
sqlite.exec("VACUUM");
return true;
}
function resolveArchiveStartDay(watermark: string | null, minDay: string): string {
if (watermark === null) return minDay;
const afterWatermark = nextUtcDay(watermark);
return compareIsoDays(minDay, afterWatermark) < 0 ? minDay : afterWatermark;
}
function runArchiveDayLoop(
dbPath: string,
options: ArchiveLogsOptions,
selectLogsForDayStmt: StatementSync,
archiveDayTx: (day: string, start: number, endExclusive: number) => void,
startDay: string,
lastDay: string,
): ArchiveLogsDayResult[] {
const archiveDir = join(dirname(dbPath), "archive", "logs");
mkdirSync(archiveDir, { recursive: true });
const days: ArchiveLogsDayResult[] = [];
let d = startDay;
let processed = 0;
while (compareIsoDays(d, lastDay) <= 0) {
if (options.maxDays !== undefined && processed >= options.maxDays) {
break;
}
const start = utcDayStartMs(d);
const endExclusive = utcDayEndExclusiveMs(d);
const rows = selectLogsForDayStmt.all({ start, endExclusive }) as SqlLogRow[];
const filePath = join(archiveDir, `${d}.jsonl`);
writeFileSync(filePath, buildJsonlBody(rows), "utf8");
archiveDayTx(d, start, endExclusive);
days.push({ day: d, rowCount: rows.length, filePath });
processed += 1;
d = nextUtcDay(d);
}
return days;
}
export function createLogStore(dbPath: string): LogStore { export function createLogStore(dbPath: string): LogStore {
mkdirSync(dirname(dbPath), { recursive: true }); mkdirSync(dirname(dbPath), { recursive: true });
const sqlite: BetterSqlite3.Database = new Database(dbPath); const sqlite = new DatabaseSync(dbPath);
sqlite.pragma("journal_mode = WAL"); sqlite.exec("PRAGMA journal_mode=WAL");
sqlite.exec(SCHEMA_SQL); sqlite.exec(SCHEMA_SQL);
const insertStmt = sqlite.prepare( const insertStmt = sqlite.prepare(
@@ -170,6 +309,28 @@ export function createLogStore(dbPath: string): LogStore {
"SELECT payload FROM logs WHERE source = 'workflow' AND type = 'thread_command_event' AND ref_id = ? ORDER BY id ASC", "SELECT payload FROM logs WHERE source = 'workflow' AND type = 'thread_command_event' AND ref_id = ? ORDER BY id ASC",
); );
const getThreadRoundCountStmt = sqlite.prepare(
`SELECT COUNT(*) AS c FROM logs
WHERE source = 'workflow' AND type = 'thread_command_event' AND ref_id = ?
AND payload IS NOT NULL AND json_valid(payload) = 1
AND COALESCE(json_extract(payload, '$.type'), '') != 'thread_start'`,
);
const getThreadRoundsStmt = sqlite.prepare(
`WITH numbered AS (
SELECT id, ts, payload,
ROW_NUMBER() OVER (ORDER BY id ASC) AS rn
FROM logs
WHERE source = 'workflow' AND type = 'thread_command_event' AND ref_id = @runId
AND payload IS NOT NULL AND json_valid(payload) = 1
AND COALESCE(json_extract(payload, '$.type'), '') != 'thread_start'
)
SELECT id, ts, payload, rn FROM numbered
WHERE (@before = 0 OR rn < @before)
ORDER BY rn DESC
LIMIT @lim`,
);
const getActiveWorkflowRunsStmt = sqlite.prepare( const getActiveWorkflowRunsStmt = sqlite.prepare(
"SELECT run_id, workflow, status, ts FROM workflow_runs WHERE status IN ('queued', 'started') ORDER BY ts ASC", "SELECT run_id, workflow, status, ts FROM workflow_runs WHERE status IN ('queued', 'started') ORDER BY ts ASC",
); );
@@ -186,8 +347,16 @@ export function createLogStore(dbPath: string): LogStore {
"SELECT run_id, workflow, status, ts FROM workflow_runs WHERE workflow = ? ORDER BY ts DESC", "SELECT run_id, workflow, status, ts FROM workflow_runs WHERE workflow = ? ORDER BY ts DESC",
); );
const upsertWorkflowRunTx = sqlite.transaction( const minLogTsStmt = sqlite.prepare("SELECT MIN(ts) AS m FROM logs");
(entry: Omit<LogEntry, "id">, run: WorkflowRun) => { const selectLogsForDayStmt = sqlite.prepare(
"SELECT id, source, type, ref_id, payload, ts FROM logs WHERE ts >= @start AND ts < @endExclusive ORDER BY id ASC",
);
const deleteLogsForDayStmt = sqlite.prepare(
"DELETE FROM logs WHERE ts >= @start AND ts < @endExclusive",
);
function upsertWorkflowRunTx(entry: Omit<LogEntry, "id">, run: WorkflowRun): LogEntry {
return runInTransaction(sqlite, () => {
const info = insertStmt.run({ const info = insertStmt.run({
source: entry.source, source: entry.source,
type: entry.type, type: entry.type,
@@ -202,8 +371,8 @@ export function createLogStore(dbPath: string): LogStore {
ts: run.ts, ts: run.ts,
}); });
return { ...entry, id: Number(info.lastInsertRowid) }; return { ...entry, id: Number(info.lastInsertRowid) };
}, });
); }
function append(entry: Omit<LogEntry, "id">): LogEntry { function append(entry: Omit<LogEntry, "id">): LogEntry {
const info = insertStmt.run({ const info = insertStmt.run({
@@ -218,7 +387,7 @@ export function createLogStore(dbPath: string): LogStore {
function query(filter: LogQuery = {}): LogEntry[] { function query(filter: LogQuery = {}): LogEntry[] {
const conditions: string[] = []; const conditions: string[] = [];
const params: Record<string, unknown> = {}; const params: Record<string, string | number> = {};
if (filter.source !== undefined) { if (filter.source !== undefined) {
conditions.push("source = @source"); conditions.push("source = @source");
@@ -274,11 +443,11 @@ export function createLogStore(dbPath: string): LogStore {
} }
function upsertWorkflowRun(entry: Omit<LogEntry, "id">, run: WorkflowRun): LogEntry { function upsertWorkflowRun(entry: Omit<LogEntry, "id">, run: WorkflowRun): LogEntry {
return upsertWorkflowRunTx(entry, run) as LogEntry; return upsertWorkflowRunTx(entry, run);
} }
function appendWithWorkflowUpdate(entry: Omit<LogEntry, "id">, run: WorkflowRun): LogEntry { function appendWithWorkflowUpdate(entry: Omit<LogEntry, "id">, run: WorkflowRun): LogEntry {
return upsertWorkflowRunTx(entry, run) as LogEntry; return upsertWorkflowRunTx(entry, run);
} }
function getWorkflowRun(runId: string): WorkflowRun | null { function getWorkflowRun(runId: string): WorkflowRun | null {
@@ -358,6 +527,96 @@ export function createLogStore(dbPath: string): LogStore {
return result; return result;
} }
function getThreadRoundCount(runId: string): number {
const row = getThreadRoundCountStmt.get(runId) as { c: number } | undefined;
const c = row?.c;
if (c === null || c === undefined) return 0;
return Number(c);
}
function getThreadRounds(runId: string, params: GetThreadRoundsParams): ThreadRoundRow[] {
const before = params.before;
const lim = params.limit;
if (lim < 1) return [];
const rows = getThreadRoundsStmt.all({
runId,
before,
lim,
}) as Array<{ id: number; ts: number; payload: string | null; rn: number }>;
const out: ThreadRoundRow[] = [];
for (const row of rows) {
if (row.payload === null) continue;
try {
const parsed = JSON.parse(row.payload) as unknown;
if (
parsed !== null &&
typeof parsed === "object" &&
typeof (parsed as Record<string, unknown>).type === "string"
) {
out.push({
round: row.rn,
logId: row.id,
ts: row.ts,
event: parsed as { type: string; [key: string]: unknown },
});
}
} catch {
// skip malformed payloads
}
}
return out;
}
function archiveDayTx(day: string, start: number, endExclusive: number): void {
runInTransaction(sqlite, () => {
deleteLogsForDayStmt.run({ start, endExclusive });
setMetaStmt.run({ key: LOG_ARCHIVE_META_KEY, value: day });
});
}
function readWatermark(): string | null {
const raw = getMeta(LOG_ARCHIVE_META_KEY);
if (raw === null) return null;
assertValidUtcDay(raw);
return raw;
}
function firstLogUtcDay(): string | null {
const row = minLogTsStmt.get() as { m: number | null } | undefined;
const m = row?.m;
if (m === null || m === undefined) return null;
return utcDateStringFromMs(m);
}
function archiveLogs(options: ArchiveLogsOptions = {}): ArchiveLogsResult {
const now = options.now ?? Date.now();
const retentionMs = options.retentionMs ?? DEFAULT_LOG_RETENTION_MS;
const lastDay = lastArchivableUtcDay(now - retentionMs);
const watermark = readWatermark();
const minDay = firstLogUtcDay();
if (minDay === null) {
return { days: [], vacuumed: runOptionalVacuum(sqlite, options.vacuum) };
}
const startDay = resolveArchiveStartDay(watermark, minDay);
if (compareIsoDays(startDay, lastDay) > 0) {
return { days: [], vacuumed: runOptionalVacuum(sqlite, options.vacuum) };
}
const days = runArchiveDayLoop(
dbPath,
options,
selectLogsForDayStmt,
archiveDayTx,
startDay,
lastDay,
);
return { days, vacuumed: runOptionalVacuum(sqlite, options.vacuum) };
}
function close(): void { function close(): void {
sqlite.close(); sqlite.close();
} }
@@ -374,6 +633,9 @@ export function createLogStore(dbPath: string): LogStore {
getAllWorkflowRuns, getAllWorkflowRuns,
getTriggerPayload, getTriggerPayload,
getThreadEvents, getThreadEvents,
getThreadRoundCount,
getThreadRounds,
archiveLogs,
close, close,
}; };
} }
+38 -27
View File
@@ -1,15 +1,17 @@
import { mkdirSync, readFileSync, readdirSync } from "node:fs"; import { mkdirSync, readFileSync, readdirSync } from "node:fs";
import { dirname, join } from "node:path"; import { dirname, join } from "node:path";
import { DatabaseSync } from "node:sqlite";
import Database from "better-sqlite3"; import { drizzle } from "drizzle-orm/node-sqlite";
import { drizzle } from "drizzle-orm/better-sqlite3"; import type { NodeSQLiteDatabase } from "drizzle-orm/node-sqlite";
import type { BetterSQLite3Database } from "drizzle-orm/better-sqlite3";
import type { Result } from "@uncaged/nerve-core"; import type { Result } from "@uncaged/nerve-core";
import { err, ok } from "@uncaged/nerve-core"; import { err, ok } from "@uncaged/nerve-core";
import type { BlobStore } from "./blob-store.js";
/** A Drizzle DB instance (schema-generic) */ /** A Drizzle DB instance (schema-generic) */
export type DrizzleDB = BetterSQLite3Database<Record<string, never>>; export type DrizzleDB = NodeSQLiteDatabase<Record<string, never>>;
/** Read-only map of peer sense name → their Drizzle DB */ /** Read-only map of peer sense name → their Drizzle DB */
export type PeerMap = Readonly<Record<string, DrizzleDB>>; export type PeerMap = Readonly<Record<string, DrizzleDB>>;
@@ -17,11 +19,14 @@ export type PeerMap = Readonly<Record<string, DrizzleDB>>;
/** Options passed to a compute function */ /** Options passed to a compute function */
export type ComputeOptions = { export type ComputeOptions = {
signal: AbortSignal; signal: AbortSignal;
/** CAS under `data/blobs/`; injected by the sense worker when available. */
blobs?: BlobStore;
}; };
/** /**
* The shape every sense's index.ts must export. * The shape every sense's index.ts must export.
* Engine injects `db` (read-write), `peers` (read-only), and `options`. * Engine injects `db` (read-write), `peers` (read-only), and `options`
* (`signal`, and `blobs` when running in the sense worker — RFC-001 §8 CAS).
* Returns T when a signal should be emitted, null for silence. * Returns T when a signal should be emitted, null for silence.
*/ */
export type ComputeFn<T = unknown> = ( export type ComputeFn<T = unknown> = (
@@ -37,7 +42,7 @@ export type SenseRuntime = {
compute: ComputeFn; compute: ComputeFn;
}; };
function ensureMigrationsTable(sqlite: Database.Database): Result<void> { function ensureMigrationsTable(sqlite: DatabaseSync): Result<void> {
try { try {
sqlite.exec( sqlite.exec(
`CREATE TABLE IF NOT EXISTS _migrations ( `CREATE TABLE IF NOT EXISTS _migrations (
@@ -64,11 +69,7 @@ function listMigrationFiles(migrationsDir: string): Result<string[]> {
} }
} }
function applyMigrationFile( function applyMigrationFile(sqlite: DatabaseSync, file: string, filePath: string): Result<void> {
sqlite: Database.Database,
file: string,
filePath: string,
): Result<void> {
let sql: string; let sql: string;
try { try {
sql = readFileSync(filePath, "utf8"); sql = readFileSync(filePath, "utf8");
@@ -78,13 +79,18 @@ function applyMigrationFile(
} }
const insertJournal = sqlite.prepare("INSERT INTO _migrations (name, applied_at) VALUES (?, ?)"); const insertJournal = sqlite.prepare("INSERT INTO _migrations (name, applied_at) VALUES (?, ?)");
sqlite.exec("BEGIN IMMEDIATE");
try { try {
sqlite.transaction(() => { sqlite.exec(sql);
sqlite.exec(sql); insertJournal.run(file, Date.now());
insertJournal.run(file, Date.now()); sqlite.exec("COMMIT");
})();
return ok(undefined); return ok(undefined);
} catch (e) { } catch (e) {
try {
sqlite.exec("ROLLBACK");
} catch {
// ignore secondary errors during rollback
}
const msg = e instanceof Error ? e.message : String(e); const msg = e instanceof Error ? e.message : String(e);
return err(new Error(`Migration "${file}" failed: ${msg}`)); return err(new Error(`Migration "${file}" failed: ${msg}`));
} }
@@ -92,10 +98,10 @@ function applyMigrationFile(
/** /**
* Run all *.sql migration files in the given directory against a * Run all *.sql migration files in the given directory against a
* better-sqlite3 Database, in lexicographic order. * `node:sqlite` DatabaseSync, in lexicographic order.
* Tracks applied migrations in _migrations table to avoid re-running. * Tracks applied migrations in _migrations table to avoid re-running.
*/ */
export function runMigrations(sqlite: Database.Database, migrationsDir: string): Result<void> { export function runMigrations(sqlite: DatabaseSync, migrationsDir: string): Result<void> {
const tableResult = ensureMigrationsTable(sqlite); const tableResult = ensureMigrationsTable(sqlite);
if (!tableResult.ok) return tableResult; if (!tableResult.ok) return tableResult;
@@ -124,14 +130,13 @@ export function runMigrations(sqlite: Database.Database, migrationsDir: string):
export function openSenseDb( export function openSenseDb(
dbPath: string, dbPath: string,
migrationsDir: string, migrationsDir: string,
): Result<{ sqlite: Database.Database; db: DrizzleDB }> { ): Result<{ sqlite: DatabaseSync; db: DrizzleDB }> {
let sqlite: Database.Database; let sqlite: DatabaseSync;
try { try {
mkdirSync(dirname(dbPath), { recursive: true }); mkdirSync(dirname(dbPath), { recursive: true });
sqlite = new Database(dbPath); sqlite = new DatabaseSync(dbPath);
// WAL mode for better concurrent read performance sqlite.exec("PRAGMA journal_mode=WAL");
sqlite.pragma("journal_mode = WAL");
} catch (e) { } catch (e) {
const msg = e instanceof Error ? e.message : String(e); const msg = e instanceof Error ? e.message : String(e);
return err(new Error(`Failed to open database "${dbPath}": ${msg}`)); return err(new Error(`Failed to open database "${dbPath}": ${msg}`));
@@ -140,7 +145,7 @@ export function openSenseDb(
const migResult = runMigrations(sqlite, migrationsDir); const migResult = runMigrations(sqlite, migrationsDir);
if (!migResult.ok) return migResult; if (!migResult.ok) return migResult;
const db = drizzle(sqlite) as DrizzleDB; const db = drizzle({ client: sqlite }) as DrizzleDB;
return ok({ sqlite, db }); return ok({ sqlite, db });
} }
@@ -148,16 +153,16 @@ export function openSenseDb(
* Open a peer sense DB in read-only mode (no migrations). * Open a peer sense DB in read-only mode (no migrations).
*/ */
export function openPeerDb(dbPath: string): Result<DrizzleDB> { export function openPeerDb(dbPath: string): Result<DrizzleDB> {
let sqlite: Database.Database; let sqlite: DatabaseSync;
try { try {
sqlite = new Database(dbPath, { readonly: true }); sqlite = new DatabaseSync(dbPath, { readOnly: true });
} catch (e) { } catch (e) {
const msg = e instanceof Error ? e.message : String(e); const msg = e instanceof Error ? e.message : String(e);
return err(new Error(`Failed to open peer database "${dbPath}" (readonly): ${msg}`)); return err(new Error(`Failed to open peer database "${dbPath}" (readonly): ${msg}`));
} }
return ok(drizzle(sqlite) as DrizzleDB); return ok(drizzle({ client: sqlite }) as DrizzleDB);
} }
/** /**
@@ -168,6 +173,7 @@ export async function loadComputeFn(senseIndexPath: string): Promise<Result<Comp
let mod: unknown; let mod: unknown;
try { try {
// Dynamic import required: user-authored sense module, path resolved at runtime
mod = await import(senseIndexPath); mod = await import(senseIndexPath);
} catch (e) { } catch (e) {
const msg = e instanceof Error ? e.message : String(e); const msg = e instanceof Error ? e.message : String(e);
@@ -192,14 +198,19 @@ export async function loadComputeFn(senseIndexPath: string): Promise<Result<Comp
* Execute a sense's compute function with an optional soft timeout. * Execute a sense's compute function with an optional soft timeout.
* If timeoutMs is provided and compute takes longer, the AbortSignal is * If timeoutMs is provided and compute takes longer, the AbortSignal is
* triggered and an error Result is returned. * triggered and an error Result is returned.
* When `blobStore` is set, it is exposed as `options.blobs` (see RFC-001 §8).
*/ */
export async function executeCompute( export async function executeCompute(
runtime: SenseRuntime, runtime: SenseRuntime,
peers: PeerMap, peers: PeerMap,
timeoutMs?: number, timeoutMs?: number,
blobStore?: BlobStore,
): Promise<Result<unknown | null>> { ): Promise<Result<unknown | null>> {
const controller = new AbortController(); const controller = new AbortController();
const options: ComputeOptions = { signal: controller.signal }; const options: ComputeOptions =
blobStore !== undefined
? { signal: controller.signal, blobs: blobStore }
: { signal: controller.signal };
let timer: ReturnType<typeof setTimeout> | undefined; let timer: ReturnType<typeof setTimeout> | undefined;
const timeoutPromise = const timeoutPromise =
+13 -3
View File
@@ -10,6 +10,7 @@
* senses/<name>/index.js ← compiled compute * senses/<name>/index.js ← compiled compute
* senses/<name>/migrations/ ← SQL migration files * senses/<name>/migrations/ ← SQL migration files
* data/senses/<name>.db ← SQLite data file * data/senses/<name>.db ← SQLite data file
* data/blobs/<aa>/<hashrest> ← CAS (sha256), via options.blobs in compute
* nerve.yaml ← config * nerve.yaml ← config
*/ */
@@ -19,10 +20,12 @@ import { join, resolve } from "node:path";
import { parseNerveConfig } from "@uncaged/nerve-core"; import { parseNerveConfig } from "@uncaged/nerve-core";
import type { NerveConfig } from "@uncaged/nerve-core"; import type { NerveConfig } from "@uncaged/nerve-core";
import { createBlobStore } from "./blob-store.js";
import type { WorkerToParentMessage } from "./ipc.js"; import type { WorkerToParentMessage } from "./ipc.js";
import { parseParentMessage } from "./ipc.js"; import { parseParentMessage } from "./ipc.js";
import { executeCompute, loadComputeFn, openPeerDb, openSenseDb } from "./sense-runtime.js"; import { executeCompute, loadComputeFn, openPeerDb, openSenseDb } from "./sense-runtime.js";
import type { DrizzleDB, PeerMap, SenseRuntime } from "./sense-runtime.js"; import type { DrizzleDB, PeerMap, SenseRuntime } from "./sense-runtime.js";
import { ignoreSessionBroadcastSignals } from "./worker-fork-support.js";
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// IPC helpers // IPC helpers
@@ -162,9 +165,10 @@ async function runCompute(
peers: PeerMap, peers: PeerMap,
timeoutMs: number, timeoutMs: number,
gracePeriodMs: number | null, gracePeriodMs: number | null,
blobStore: ReturnType<typeof createBlobStore>,
): Promise<void> { ): Promise<void> {
try { try {
const result = await executeCompute(runtime, peers, timeoutMs); const result = await executeCompute(runtime, peers, timeoutMs, blobStore);
if (!result.ok) { if (!result.ok) {
sendError(senseName, result.error.message); sendError(senseName, result.error.message);
if (gracePeriodMs !== null && result.error.message.includes("timed out")) { if (gracePeriodMs !== null && result.error.message.includes("timed out")) {
@@ -193,6 +197,7 @@ function handleMessage(
group: string, group: string,
senseConfigs: Map<string, { timeout: number | null; gracePeriod: number | null }>, senseConfigs: Map<string, { timeout: number | null; gracePeriod: number | null }>,
inFlight: Map<string, Promise<void>>, inFlight: Map<string, Promise<void>>,
blobStore: ReturnType<typeof createBlobStore>,
): void { ): void {
const parseResult = parseParentMessage(raw); const parseResult = parseParentMessage(raw);
if (!parseResult.ok) { if (!parseResult.ok) {
@@ -230,7 +235,7 @@ function handleMessage(
const previous = inFlight.get(msg.sense) ?? Promise.resolve(); const previous = inFlight.get(msg.sense) ?? Promise.resolve();
const next = previous const next = previous
.then(() => runCompute(msg.sense, runtime, peers, timeoutMs, gracePeriodMs)) .then(() => runCompute(msg.sense, runtime, peers, timeoutMs, gracePeriodMs, blobStore))
.catch((e: unknown) => { .catch((e: unknown) => {
const errMsg = e instanceof Error ? e.message : String(e); const errMsg = e instanceof Error ? e.message : String(e);
sendError(msg.sense, errMsg); sendError(msg.sense, errMsg);
@@ -294,11 +299,12 @@ async function bootstrap(nerveRoot: string, group: string): Promise<void> {
} }
const inFlight = new Map<string, Promise<void>>(); const inFlight = new Map<string, Promise<void>>();
const blobStore = createBlobStore(join(nerveRoot, "data", "blobs"));
sendReady(); sendReady();
process.on("message", (raw: unknown) => { process.on("message", (raw: unknown) => {
handleMessage(raw, runtimes, peers, group, senseConfigs, inFlight); handleMessage(raw, runtimes, peers, group, senseConfigs, inFlight, blobStore);
}); });
} }
@@ -331,6 +337,10 @@ if (!parsed) {
process.exit(1); process.exit(1);
} }
if (typeof process.send === "function") {
ignoreSessionBroadcastSignals();
}
bootstrap(parsed.nerveRoot, parsed.group).catch((e) => { bootstrap(parsed.nerveRoot, parsed.group).catch((e) => {
const msg = e instanceof Error ? e.message : String(e); const msg = e instanceof Error ? e.message : String(e);
process.stderr.write(`[sense-worker] Unhandled bootstrap error: ${msg}\n`); process.stderr.write(`[sense-worker] Unhandled bootstrap error: ${msg}\n`);
@@ -0,0 +1,48 @@
import type { ChildProcess } from "node:child_process";
const STDERR_TAIL_MAX_CHARS = 16_384;
/**
* Forked workers inherit the parent's process group. In foreground `nerve dev`,
* terminal-driven SIGINT/SIGTERM is delivered to the whole group, so workers can exit
* on the default handler before the kernel sends `{ type: "shutdown" }` over IPC.
* Swallow these in worker processes so the parent coordinates shutdown (issue #55).
* Only call when `process.send` is defined (fork IPC); standalone `node …-worker.js` keeps default Ctrl+C behaviour.
*/
export function ignoreSessionBroadcastSignals(): void {
const swallow = (): void => {};
process.on("SIGINT", swallow);
process.on("SIGTERM", swallow);
}
export function teeCapturedStderr(child: ChildProcess, tail: { value: string }): void {
const stream = child.stderr;
if (stream === null || stream === undefined) return;
stream.setEncoding("utf8");
stream.on("data", (chunk: string | Buffer) => {
const text = typeof chunk === "string" ? chunk : chunk.toString("utf8");
process.stderr.write(text);
tail.value = (tail.value + text).slice(-STDERR_TAIL_MAX_CHARS);
});
}
export function formatChildExitSummary(
code: number | null,
signal: NodeJS.Signals | null,
): string {
const codeStr = code === null || code === undefined ? "null" : String(code);
if (signal) {
return `code=${codeStr} signal=${signal}`;
}
return `code=${codeStr}`;
}
export function formatCapturedStderrTail(tail: string, maxChars = 800): string {
const trimmed = tail.trim();
if (trimmed.length === 0) return "";
const normalized = trimmed.replace(/\r?\n/g, "\\n");
if (normalized.length <= maxChars) {
return ` worker_stderr=${normalized}`;
}
return ` worker_stderr=…${normalized.slice(-maxChars)}`;
}
+33 -8
View File
@@ -22,6 +22,11 @@ import type {
import { parseWorkerMessage } from "./ipc.js"; import { parseWorkerMessage } from "./ipc.js";
import type { LogStore } from "./log-store.js"; import type { LogStore } from "./log-store.js";
import type { WorkflowRunStatus } from "./log-store.js"; import type { WorkflowRunStatus } from "./log-store.js";
import {
formatCapturedStderrTail,
formatChildExitSummary,
teeCapturedStderr,
} from "./worker-fork-support.js";
export type WorkflowManager = { export type WorkflowManager = {
/** Trigger a new workflow thread (called by Reflex scheduler). */ /** Trigger a new workflow thread (called by Reflex scheduler). */
@@ -60,6 +65,7 @@ type WorkerEntry = {
stopping: boolean; stopping: boolean;
/** When set, the worker is draining before a hot-reload respawn. */ /** When set, the worker is draining before a hot-reload respawn. */
draining: boolean; draining: boolean;
stderrTail: { value: string };
}; };
// Crash respawn backoff: track crash timestamps per workflow. // Crash respawn backoff: track crash timestamps per workflow.
@@ -85,12 +91,18 @@ function spawnWorkflowWorker(
nerveRoot: string, nerveRoot: string,
workflowName: string, workflowName: string,
workerScript: string, workerScript: string,
stderrTail: { value: string },
): ChildProcess { ): ChildProcess {
const child = fork(workerScript, ["--workflow", workflowName, "--root", nerveRoot], { const child = fork(workerScript, ["--workflow", workflowName, "--root", nerveRoot], {
stdio: ["ignore", "inherit", "inherit", "ipc"], stdio: ["ignore", "inherit", "pipe", "ipc"],
}); });
teeCapturedStderr(child, stderrTail);
// Prevent unhandled EPIPE when writing to a child whose IPC channel closed // Prevent unhandled EPIPE when writing to a child whose IPC channel closed
child.on("error", () => {}); child.on("error", (err) => {
if ((err as NodeJS.ErrnoException).code !== "EPIPE") {
console.error("[worker] error:", err.message);
}
});
return child; return child;
} }
@@ -391,7 +403,11 @@ export function createWorkflowManager(
state.active.clear(); state.active.clear();
} }
function handleWorkerExit(workflowName: string, code: number | null): void { function handleWorkerExit(
workflowName: string,
code: number | null,
signal: NodeJS.Signals | null,
): void {
const entry = workers.get(workflowName); const entry = workers.get(workflowName);
if (entry?.draining) { if (entry?.draining) {
workers.delete(workflowName); workers.delete(workflowName);
@@ -412,8 +428,10 @@ export function createWorkflowManager(
} }
return; return;
} }
const summary = formatChildExitSummary(code, signal);
const stderrExtra = entry !== undefined ? formatCapturedStderrTail(entry.stderrTail.value) : "";
process.stderr.write( process.stderr.write(
`[workflow-manager] worker for "${workflowName}" exited with code ${code ?? "null"}\n`, `[workflow-manager] worker for "${workflowName}" exited (${summary})${stderrExtra}\n`,
); );
handleWorkerCrash(workflowName); handleWorkerCrash(workflowName);
} }
@@ -424,17 +442,24 @@ export function createWorkflowManager(
return existing; return existing;
} }
const child = spawnWorkflowWorker(nerveRoot, workflowName, workerScript); const stderrTail = { value: "" };
const child = spawnWorkflowWorker(nerveRoot, workflowName, workerScript, stderrTail);
child.on("message", (raw: unknown) => { child.on("message", (raw: unknown) => {
handleWorkerMessage(workflowName, raw); handleWorkerMessage(workflowName, raw);
}); });
child.on("exit", (code) => { child.on("exit", (code, signal) => {
handleWorkerExit(workflowName, code); handleWorkerExit(workflowName, code, signal ?? null);
}); });
const entry: WorkerEntry = { workflowName, process: child, stopping: false, draining: false }; const entry: WorkerEntry = {
workflowName,
process: child,
stopping: false,
draining: false,
stderrTail,
};
workers.set(workflowName, entry); workers.set(workflowName, entry);
return entry; return entry;
} }
+7 -2
View File
@@ -21,6 +21,7 @@ import type {
import type { ThreadCommandEventMessage, ThreadEventType, WorkerToParentMessage } from "./ipc.js"; import type { ThreadCommandEventMessage, ThreadEventType, WorkerToParentMessage } from "./ipc.js";
import { parseParentMessage } from "./ipc.js"; import { parseParentMessage } from "./ipc.js";
import { ignoreSessionBroadcastSignals } from "./worker-fork-support.js";
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// IPC helpers // IPC helpers
@@ -119,8 +120,7 @@ async function runThread(
const initialEvent: CommandEvent = { const initialEvent: CommandEvent = {
type: "thread_start", type: "thread_start",
triggerPayload: triggerPayload: triggerPayload ?? {},
triggerPayload != null && typeof triggerPayload === "object" ? triggerPayload : {},
}; };
// On resume: replay persisted events, run the next un-executed role, then continue. // On resume: replay persisted events, run the next un-executed role, then continue.
@@ -197,6 +197,7 @@ async function loadWorkflowDefinition(
); );
} }
// Dynamic import required: user-authored workflow module, path resolved at runtime
const mod = await import(indexPath); const mod = await import(indexPath);
const def: unknown = mod.default ?? mod; const def: unknown = mod.default ?? mod;
@@ -334,6 +335,10 @@ if (!parsed) {
process.exit(1); process.exit(1);
} }
if (typeof process.send === "function") {
ignoreSessionBroadcastSignals();
}
bootstrap(parsed.nerveRoot, parsed.workflow).catch((e) => { bootstrap(parsed.nerveRoot, parsed.workflow).catch((e) => {
const msg = e instanceof Error ? e.message : String(e); const msg = e instanceof Error ? e.message : String(e);
process.stderr.write(`[workflow-worker] Unhandled bootstrap error: ${msg}\n`); process.stderr.write(`[workflow-worker] Unhandled bootstrap error: ${msg}\n`);
-8
View File
@@ -1,8 +0,0 @@
import { defineConfig } from "tsup";
export default defineConfig({
entry: ["src/index.ts", "src/sense-worker.ts"],
format: ["esm"],
dts: true,
clean: true,
});
+1040 -488
View File
File diff suppressed because it is too large Load Diff
-1
View File
@@ -3,5 +3,4 @@ packages:
onlyBuiltDependencies: onlyBuiltDependencies:
- "@biomejs/biome" - "@biomejs/biome"
- better-sqlite3
- esbuild - esbuild
+8
View File
@@ -0,0 +1,8 @@
#!/bin/bash
# All packages must use pnpm publish. Block npm publish unconditionally.
if [ -z "$npm_execpath" ] || [[ "$npm_execpath" != *pnpm* ]]; then
echo "❌ Use 'pnpm publish' instead of 'npm publish'."
echo " pnpm auto-converts workspace:* dependencies to real versions."
exit 1
fi