Compare commits

...

14 Commits

Author SHA1 Message Date
xiaoju 640f170de8 refactor: add daemon subcommand group and dev foreground mode
- Create 'nerve daemon' subcommand group: start, stop, status, restart, logs
- Create 'nerve dev' for foreground mode (replaces old start without -d)
- 'nerve daemon start' is always background (removed -d/--daemon flag)
- Keep top-level aliases: nerve start/stop/status/logs → nerve daemon *
- Extract runStopCommand() for restart reuse
- Add daemon-cli tests

Closes #53

小橘 🍊(NEKO Team)
2026-04-23 01:16:13 +00:00
xiaoju 119b1f3722 chore: enforce pnpm publish for all packages unconditionally
小橘 <xiaoju@shazhou.work>
2026-04-23 00:49:39 +00:00
xiaoju 96ea4b46ff chore: add prepublish guard against npm publish with workspace:* deps
小橘 <xiaoju@shazhou.work>
2026-04-23 00:47:56 +00:00
xiaoju 57881533a8 docs: fix publish skill — use pnpm publish for workspace:* conversion
小橘 <xiaoju@shazhou.work>
2026-04-23 00:43:51 +00:00
xiaoju a62a993a82 fix(cli): remove duplicate shebang in daemon-bootstrap causing crash on nerve start -d
小橘 <xiaoju@shazhou.work>
2026-04-23 00:43:18 +00:00
xiaoju 3f22eb4664 release: @uncaged/nerve-core@0.1.3, @uncaged/nerve-daemon@0.1.4, @uncaged/nerve-cli@0.1.5
小橘 <xiaoju@shazhou.work>
2026-04-23 00:35:40 +00:00
xiaoju b5913263e4 docs: add publish and setup skills
小橘 <xiaoju@shazhou.work>
2026-04-23 00:31:27 +00:00
xiaomo d3ecd2a492 Merge pull request 'fix: address review issues #46-#49' (#52) from fix/review-issues-46-49 into main 2026-04-23 00:24:19 +00:00
xiaoju 8763440436 fix: address review issues #46-#49
#46 — EPIPE handler: only silence EPIPE, log other child errors
#47 — lastSignalTs: query sense/signal instead of reflex/run_complete
#48 — SenseInfo: deduplicate to @uncaged/nerve-core, add expectTypeOf test
#49 — IPC client: extract sendAndReceive<T> to eliminate duplication

小橘 <xiaoju@shazhou.work>
2026-04-23 00:22:55 +00:00
xiaomo f270804002 Merge pull request 'feat(daemon): CAS blob store — sha256 content-addressable storage (closes #39)' (#51) from feat/blob-store into main 2026-04-23 00:21:46 +00:00
xiaoju 404ee3e34f feat(daemon): add CAS blob store with sha256 content-addressable storage — closes #39
- createBlobStore(root) with write/read/exists API
- sha256 hex, first 2 chars as shard directory
- Atomic writes via temp file + rename
- CAS mismatch detection on read and write
- Inject blobStore into sense compute via options.blobs
- Export createBlobStore, normalizeBlobHash, BlobStore type
2026-04-23 00:19:35 +00:00
xiaomo cbc6db6b7d Merge pull request 'feat(daemon): log store archival — Meta table + JSONL cold archive (closes #38)' (#45) from feat/log-archive into main 2026-04-23 00:17:54 +00:00
xiaomo b1f6c775ce Merge pull request 'fix(init): auto-verify and retry better-sqlite3 native build — closes #44' (#50) from fix/init-sqlite-retry into main 2026-04-23 00:14:30 +00:00
xiaoju 978b1680a3 feat(daemon): add log store archival with meta watermark + JSONL cold archive — closes #38
- Add meta table with archived_up_to watermark in logs.db
- Archive logs older than 30 days to data/archive/logs/YYYY-MM-DD.jsonl
- Idempotent: same-day re-export overwrites file
- Single transaction: DELETE + UPDATE meta
- Optional VACUUM after archive loop
- CLI: nerve store archive [--vacuum]
- 15+ new tests for archive logic
2026-04-23 00:10:20 +00:00
40 changed files with 1228 additions and 186 deletions
+80
View File
@@ -0,0 +1,80 @@
# Skill: Publish @uncaged/nerve packages to npm
## When to use
When releasing a new version of any `@uncaged/nerve-*` package to npm.
## Prerequisites
- npm login with an account that has **owner** access to the `@uncaged` org
- All tests pass: `pnpm -r run test`
- Clean working tree (no uncommitted changes)
## Packages
| Package | Path | npm |
|---------|------|-----|
| `@uncaged/nerve-core` | `packages/core` | [link](https://www.npmjs.com/package/@uncaged/nerve-core) |
| `@uncaged/nerve-daemon` | `packages/daemon` | [link](https://www.npmjs.com/package/@uncaged/nerve-daemon) |
| `@uncaged/nerve-cli` | `packages/cli` | [link](https://www.npmjs.com/package/@uncaged/nerve-cli) |
## Dependency order
`core``daemon``cli`
Always publish in this order. If `core` has changes, bump and publish it first, then update dependents.
## Steps
### 1. Ensure clean state
```bash
git checkout main && git pull origin main
pnpm install
pnpm -r run build
pnpm -r run test
```
### 2. Bump versions
Manually update `version` in each changed package's `package.json`.
Follow semver:
- **patch** (0.1.x): bug fixes, refactors
- **minor** (0.x.0): new features, non-breaking API additions
- **major** (x.0.0): breaking changes
If bumping `core`, also update the `@uncaged/nerve-core` dependency version in `daemon` and `cli` package.json. Same for `daemon``cli`.
### 3. Build
```bash
pnpm -r run build
```
### 4. Publish (in order)
```bash
# Only publish packages that have version bumps
# MUST use pnpm publish (not npm) — pnpm converts workspace:* to real versions
cd packages/core && pnpm publish --access public --no-git-checks
cd packages/daemon && pnpm publish --access public --no-git-checks
cd packages/cli && pnpm publish --access public --no-git-checks
```
### 5. Commit & tag
```bash
git add -A
git commit -m "release: @uncaged/nerve-core@X.Y.Z, @uncaged/nerve-daemon@X.Y.Z, @uncaged/nerve-cli@X.Y.Z"
git tag -a vX.Y.Z -m "Release vX.Y.Z"
git push origin main --tags
```
## Pitfalls
- **Don't publish without building first** — `tsup` output in `dist/` is what npm ships
- **Dependency order matters** — if you publish `daemon` before `core`, npm may resolve the old `core` version
- **`--access public`** is required for scoped packages on first publish; safe to always include
- **Check `npm whoami`** to confirm you're logged in as the right account
- **No changeset tool** — this project uses manual version bumps (no changesets/lerna)
+101
View File
@@ -0,0 +1,101 @@
# Skill: Setup nerve from scratch
## When to use
Setting up the nerve project for local development from a fresh clone.
## Prerequisites
- **Node.js** ≥ 18
- **pnpm** ≥ 9 (`npm install -g pnpm`)
- **Git** access to `git.shazhou.work`
## Steps
### 1. Clone
```bash
git clone https://git.shazhou.work/uncaged/nerve.git
cd nerve
```
### 2. Install dependencies
```bash
pnpm install
```
This installs all workspace packages and links internal dependencies (`core``daemon``cli`).
### 3. Build all packages
```bash
pnpm -r run build
```
Build order is handled automatically by pnpm workspace — `core` builds first, then `daemon`, then `cli`.
### 4. Run tests
```bash
pnpm -r run test
```
Or test individual packages:
```bash
pnpm --filter @uncaged/nerve-core test
pnpm --filter @uncaged/nerve-daemon test
pnpm --filter @uncaged/nerve-cli test
```
### 5. Try the CLI
```bash
# Link the CLI globally
cd packages/cli && npm link
# Initialize a workspace
mkdir ~/my-nerve-workspace && cd ~/my-nerve-workspace
nerve init
# Edit senses in nerve.yaml, then:
nerve start # start the daemon
nerve sense list # list registered senses
nerve stop # stop the daemon
```
### 6. Lint & format
```bash
pnpm run check # biome lint check
pnpm run format # biome auto-format
```
## Project structure
```
nerve/
├── packages/
│ ├── core/ # @uncaged/nerve-core — shared types, log store, blob store
│ ├── daemon/ # @uncaged/nerve-daemon — kernel, sense runtime, workflow manager
│ └── cli/ # @uncaged/nerve-cli — CLI commands (init, start, stop, sense, etc.)
├── docs/ # RFCs, conventions, skills
├── pnpm-workspace.yaml
└── biome.json # linter/formatter config
```
## Key conventions
- **Monorepo** with pnpm workspaces
- **ESM only** — all packages output ESM (`"type": "module"`)
- **tsup** for builds, **vitest** for tests, **biome** for lint/format
- **SQLite** (better-sqlite3) for log store and blob store
- See `docs/coding-conventions.md` for code style rules
## Pitfalls
- **Must build before test** — daemon and cli import compiled output from core
- **better-sqlite3** requires native compilation — if `pnpm install` fails, ensure you have build tools (`build-essential` on Linux, Xcode CLI tools on macOS)
- **Node 18+** required — uses native `fetch`, `crypto.randomUUID`, etc.
- **pnpm only** — don't use npm/yarn, workspace links won't resolve correctly
+2 -1
View File
@@ -1,6 +1,6 @@
{ {
"name": "@uncaged/nerve-cli", "name": "@uncaged/nerve-cli",
"version": "0.1.4", "version": "0.1.7",
"type": "module", "type": "module",
"bin": { "bin": {
"nerve": "dist/cli.js" "nerve": "dist/cli.js"
@@ -14,6 +14,7 @@
"access": "public" "access": "public"
}, },
"scripts": { "scripts": {
"prepublishOnly": "bash ../../scripts/prepublish-check.sh",
"build": "tsup", "build": "tsup",
"test": "vitest run" "test": "vitest run"
}, },
@@ -0,0 +1,28 @@
import { describe, expect, it } from "vitest";
import { daemonCommand } from "../commands/daemon.js";
import { devCommand } from "../commands/dev.js";
import { daemonStartCommand } from "../commands/start.js";
describe("nerve daemon command group", () => {
it("exposes start, stop, status, restart, and logs subcommands", () => {
const subs = daemonCommand.subCommands;
expect(subs).toBeDefined();
if (!subs) {
throw new Error("expected daemonCommand.subCommands");
}
expect(Object.keys(subs).sort()).toEqual(["logs", "restart", "start", "status", "stop"]);
});
it("shares the same start command object as top-level nerve start alias", () => {
const subs = daemonCommand.subCommands;
expect(subs?.start).toBe(daemonStartCommand);
});
});
describe("nerve dev", () => {
it("is a foreground dev command", () => {
expect(devCommand.meta?.name).toBe("dev");
expect(devCommand.meta?.description).toMatch(/foreground/i);
});
});
@@ -3,16 +3,24 @@
* If the daemon package changes its public API, this file will fail to compile. * If the daemon package changes its public API, this file will fail to compile.
*/ */
import type { SenseInfo } from "@uncaged/nerve-core";
import type { import type {
ArchiveLogsDayResult as DaemonArchiveLogsDayResult,
ArchiveLogsOptions as DaemonArchiveLogsOptions,
ArchiveLogsResult as DaemonArchiveLogsResult,
LogEntry as DaemonLogEntry, LogEntry as DaemonLogEntry,
LogQuery as DaemonLogQuery, LogQuery as DaemonLogQuery,
LogStore as DaemonLogStore, LogStore as DaemonLogStore,
SenseInfo as DaemonSenseInfo,
WorkflowRun as DaemonWorkflowRun, WorkflowRun as DaemonWorkflowRun,
WorkflowRunStatus as DaemonWorkflowRunStatus, WorkflowRunStatus as DaemonWorkflowRunStatus,
} from "@uncaged/nerve-daemon"; } from "@uncaged/nerve-daemon";
import { describe, it, expectTypeOf } from "vitest"; import { describe, expectTypeOf, it } from "vitest";
import type { import type {
ArchiveLogsDayResult,
ArchiveLogsOptions,
ArchiveLogsResult,
LogEntry, LogEntry,
LogQuery, LogQuery,
LogStore, LogStore,
@@ -21,6 +29,11 @@ import type {
} from "../daemon-types.js"; } from "../daemon-types.js";
describe("daemon-types drift guard", () => { describe("daemon-types drift guard", () => {
it("SenseInfo matches daemon package export (list-senses IPC)", () => {
expectTypeOf<SenseInfo>().toMatchTypeOf<DaemonSenseInfo>();
expectTypeOf<DaemonSenseInfo>().toMatchTypeOf<SenseInfo>();
});
it("WorkflowRunStatus is assignable both ways", () => { it("WorkflowRunStatus is assignable both ways", () => {
expectTypeOf<WorkflowRunStatus>().toMatchTypeOf<DaemonWorkflowRunStatus>(); expectTypeOf<WorkflowRunStatus>().toMatchTypeOf<DaemonWorkflowRunStatus>();
expectTypeOf<DaemonWorkflowRunStatus>().toMatchTypeOf<WorkflowRunStatus>(); expectTypeOf<DaemonWorkflowRunStatus>().toMatchTypeOf<WorkflowRunStatus>();
@@ -42,6 +55,26 @@ describe("daemon-types drift guard", () => {
}); });
it("LogStore has all required methods", () => { it("LogStore has all required methods", () => {
expectTypeOf<LogStore>().toMatchTypeOf<Pick<DaemonLogStore, "query" | "getWorkflowRun" | "getActiveWorkflowRuns" | "getAllWorkflowRuns" | "upsertWorkflowRun" | "close">>(); expectTypeOf<LogStore>().toMatchTypeOf<
Pick<
DaemonLogStore,
| "query"
| "getWorkflowRun"
| "getActiveWorkflowRuns"
| "getAllWorkflowRuns"
| "upsertWorkflowRun"
| "archiveLogs"
| "close"
>
>();
});
it("ArchiveLogs types match daemon", () => {
expectTypeOf<ArchiveLogsOptions>().toMatchTypeOf<DaemonArchiveLogsOptions>();
expectTypeOf<DaemonArchiveLogsOptions>().toMatchTypeOf<ArchiveLogsOptions>();
expectTypeOf<ArchiveLogsResult>().toMatchTypeOf<DaemonArchiveLogsResult>();
expectTypeOf<DaemonArchiveLogsResult>().toMatchTypeOf<ArchiveLogsResult>();
expectTypeOf<ArchiveLogsDayResult>().toMatchTypeOf<DaemonArchiveLogsDayResult>();
expectTypeOf<DaemonArchiveLogsDayResult>().toMatchTypeOf<ArchiveLogsDayResult>();
}); });
}); });
@@ -13,18 +13,24 @@ import { createServer } from "node:net";
import { tmpdir } from "node:os"; import { tmpdir } from "node:os";
import { join } from "node:path"; import { join } from "node:path";
import type { SenseInfo } from "@uncaged/nerve-core";
import { afterEach, beforeEach, describe, expect, it } from "vitest"; import { afterEach, beforeEach, describe, expect, it } from "vitest";
import { listSensesViaDaemon } from "../daemon-client.js";
import type { SenseInfo } from "../daemon-client.js";
import { formatDuration, formatSenseList, sensesFromConfig } from "../commands/sense.js"; import { formatDuration, formatSenseList, sensesFromConfig } from "../commands/sense.js";
import { listSensesViaDaemon } from "../daemon-client.js";
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// Fixtures // Fixtures
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
const SAMPLE_SENSES: SenseInfo[] = [ const SAMPLE_SENSES: SenseInfo[] = [
{ name: "cpu-usage", group: "system", throttle: 5000, timeout: 3000, lastSignalTs: 1_700_000_000_000 }, {
name: "cpu-usage",
group: "system",
throttle: 5000,
timeout: 3000,
lastSignalTs: 1_700_000_000_000,
},
{ name: "disk-usage", group: "system", throttle: 30000, timeout: null, lastSignalTs: null }, { name: "disk-usage", group: "system", throttle: 30000, timeout: null, lastSignalTs: null },
{ name: "active-tasks", group: "tasks", throttle: 10000, timeout: 30000, lastSignalTs: null }, { name: "active-tasks", group: "tasks", throttle: 10000, timeout: 30000, lastSignalTs: null },
]; ];
+8 -2
View File
@@ -1,11 +1,14 @@
import { defineCommand, runMain } from "citty"; import { defineCommand, runMain } from "citty";
import { daemonCommand } from "./commands/daemon.js";
import { devCommand } from "./commands/dev.js";
import { initCommand } from "./commands/init.js"; import { initCommand } from "./commands/init.js";
import { logsCommand } from "./commands/logs.js"; import { logsCommand } from "./commands/logs.js";
import { senseCommand } from "./commands/sense.js"; import { senseCommand } from "./commands/sense.js";
import { startCommand } from "./commands/start.js"; import { daemonStartCommand } from "./commands/start.js";
import { statusCommand } from "./commands/status.js"; import { statusCommand } from "./commands/status.js";
import { stopCommand } from "./commands/stop.js"; import { stopCommand } from "./commands/stop.js";
import { storeCommand } from "./commands/store.js";
import { validateCommand } from "./commands/validate.js"; import { validateCommand } from "./commands/validate.js";
import { workflowCommand } from "./commands/workflow.js"; import { workflowCommand } from "./commands/workflow.js";
@@ -16,12 +19,15 @@ const main = defineCommand({
}, },
subCommands: { subCommands: {
init: initCommand, init: initCommand,
start: startCommand, daemon: daemonCommand,
dev: devCommand,
start: daemonStartCommand,
stop: stopCommand, stop: stopCommand,
status: statusCommand, status: statusCommand,
logs: logsCommand, logs: logsCommand,
validate: validateCommand, validate: validateCommand,
sense: senseCommand, sense: senseCommand,
store: storeCommand,
workflow: workflowCommand, workflow: workflowCommand,
}, },
}); });
+31
View File
@@ -0,0 +1,31 @@
import { defineCommand } from "citty";
import { logsCommand } from "./logs.js";
import { daemonStartCommand, runDaemonStartCommand } from "./start.js";
import { statusCommand } from "./status.js";
import { runStopCommand, stopCommand } from "./stop.js";
const daemonRestartCommand = defineCommand({
meta: {
name: "restart",
description: "Stop then start the nerve daemon",
},
async run() {
await runStopCommand();
await runDaemonStartCommand();
},
});
export const daemonCommand = defineCommand({
meta: {
name: "daemon",
description: "Manage the nerve background daemon",
},
subCommands: {
start: daemonStartCommand,
stop: stopCommand,
status: statusCommand,
restart: daemonRestartCommand,
logs: logsCommand,
},
});
+17
View File
@@ -0,0 +1,17 @@
import { defineCommand } from "citty";
import { runForegroundKernelSession } from "../run-foreground-kernel.js";
import { loadDaemonModule } from "../workspace-daemon.js";
import { getNerveRoot } from "../workspace.js";
export const devCommand = defineCommand({
meta: {
name: "dev",
description: "Run the nerve kernel in the foreground (development mode)",
},
async run() {
const nerveRoot = getNerveRoot();
const { createKernel } = await loadDaemonModule(nerveRoot);
await runForegroundKernelSession(nerveRoot, createKernel);
},
});
+2 -4
View File
@@ -1,11 +1,10 @@
import { readFileSync } from "node:fs"; import { readFileSync } from "node:fs";
import { join } from "node:path"; import { join } from "node:path";
import { parseNerveConfig } from "@uncaged/nerve-core"; import { type SenseInfo, parseNerveConfig } from "@uncaged/nerve-core";
import { defineCommand } from "citty"; import { defineCommand } from "citty";
import { listSensesViaDaemon, triggerSenseViaDaemon } from "../daemon-client.js"; import { listSensesViaDaemon, triggerSenseViaDaemon } from "../daemon-client.js";
import type { SenseInfo } from "../daemon-client.js";
import { getNerveRoot, getSocketPath, isRunning } from "../workspace.js"; import { getNerveRoot, getSocketPath, isRunning } from "../workspace.js";
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@@ -35,8 +34,7 @@ export function formatSenseList(senses: SenseInfo[]): string {
lines.push(` group: ${s.group}\n`); lines.push(` group: ${s.group}\n`);
lines.push(` throttle: ${formatDuration(s.throttle)}\n`); lines.push(` throttle: ${formatDuration(s.throttle)}\n`);
lines.push(` timeout: ${formatDuration(s.timeout)}\n`); lines.push(` timeout: ${formatDuration(s.timeout)}\n`);
const lastSignal = const lastSignal = s.lastSignalTs !== null ? new Date(s.lastSignalTs).toISOString() : "(never)";
s.lastSignalTs !== null ? new Date(s.lastSignalTs).toISOString() : "(never)";
lines.push(` last signal: ${lastSignal}\n`); lines.push(` last signal: ${lastSignal}\n`);
} }
return lines.join(""); return lines.join("");
+11 -27
View File
@@ -5,8 +5,6 @@ import { fileURLToPath } from "node:url";
import { defineCommand } from "citty"; import { defineCommand } from "citty";
import { runForegroundKernelSession } from "../run-foreground-kernel.js";
import { loadDaemonModule } from "../workspace-daemon.js";
import { import {
getLogPath, getLogPath,
getNerveRoot, getNerveRoot,
@@ -52,15 +50,10 @@ function daemonBootstrapScript(): string {
return bootstrapJs; return bootstrapJs;
} }
throw new Error( throw new Error(
`daemon-bootstrap.js not found next to CLI at ${bootstrapJs}. Build the CLI package (e.g. \`pnpm --filter @uncaged/nerve-cli build\`) before using background mode (\`nerve start -d\`).`, `daemon-bootstrap.js not found next to CLI at ${bootstrapJs}. Build the CLI package (e.g. \`pnpm --filter @uncaged/nerve-cli build\`) before using \`nerve daemon start\`.`,
); );
} }
async function runForeground(nerveRoot: string): Promise<void> {
const { createKernel } = await loadDaemonModule(nerveRoot);
await runForegroundKernelSession(nerveRoot, createKernel);
}
async function runDaemon(nerveRoot: string): Promise<void> { async function runDaemon(nerveRoot: string): Promise<void> {
if (isRunning()) { if (isRunning()) {
const pid = readPidFile(); const pid = readPidFile();
@@ -110,29 +103,20 @@ async function runDaemon(nerveRoot: string): Promise<void> {
process.stdout.write(`✅ Nerve daemon started (pid ${pid}).\n`); process.stdout.write(`✅ Nerve daemon started (pid ${pid}).\n`);
process.stdout.write(` Logs: ${logPath}\n`); process.stdout.write(` Logs: ${logPath}\n`);
process.stdout.write(" Run `nerve stop` to stop.\n"); process.stdout.write(" Run `nerve daemon stop` (or `nerve stop`) to stop.\n");
} }
export const startCommand = defineCommand({ /** Background daemon only — use `nerve dev` for foreground mode. */
export async function runDaemonStartCommand(): Promise<void> {
await runDaemon(getNerveRoot());
}
export const daemonStartCommand = defineCommand({
meta: { meta: {
name: "start", name: "start",
description: "Start the nerve daemon", description: "Start the nerve daemon in the background",
}, },
args: { async run() {
daemon: { await runDaemonStartCommand();
type: "boolean",
alias: "d",
description: "Run as background daemon",
default: false,
},
},
async run({ args }) {
const nerveRoot = getNerveRoot();
if (args.daemon) {
await runDaemon(nerveRoot);
} else {
await runForeground(nerveRoot);
}
}, },
}); });
+38 -33
View File
@@ -15,44 +15,49 @@ async function waitForExit(pid: number, timeoutMs: number): Promise<boolean> {
return false; return false;
} }
/** Core stop logic — also used by `nerve daemon restart`. */
export async function runStopCommand(): Promise<void> {
const pid = readPidFile();
if (pid === null) {
process.stdout.write("⚠️ No PID file found — daemon may not be running.\n");
return;
}
if (!isRunning()) {
process.stdout.write("⚠️ Daemon is not running (stale PID file). Cleaning up.\n");
removePidFile();
return;
}
process.stdout.write(`Stopping nerve daemon (pid ${pid})…\n`);
try {
process.kill(pid, "SIGTERM");
} catch (e) {
const msg = e instanceof Error ? e.message : String(e);
process.stderr.write(`❌ Failed to send SIGTERM: ${msg}\n`);
process.exit(1);
}
const graceful = await waitForExit(pid, 10_000);
if (!graceful) {
process.stdout.write("⚠️ Daemon did not exit in 10s — sending SIGKILL.\n");
try {
process.kill(pid, "SIGKILL");
} catch {
// already dead
}
}
removePidFile();
process.stdout.write("✅ Daemon stopped.\n");
}
export const stopCommand = defineCommand({ export const stopCommand = defineCommand({
meta: { meta: {
name: "stop", name: "stop",
description: "Stop the nerve daemon", description: "Stop the nerve daemon",
}, },
async run() { async run() {
const pid = readPidFile(); await runStopCommand();
if (pid === null) {
process.stdout.write("⚠️ No PID file found — daemon may not be running.\n");
return;
}
if (!isRunning()) {
process.stdout.write("⚠️ Daemon is not running (stale PID file). Cleaning up.\n");
removePidFile();
return;
}
process.stdout.write(`Stopping nerve daemon (pid ${pid})…\n`);
try {
process.kill(pid, "SIGTERM");
} catch (e) {
const msg = e instanceof Error ? e.message : String(e);
process.stderr.write(`❌ Failed to send SIGTERM: ${msg}\n`);
process.exit(1);
}
const graceful = await waitForExit(pid, 10_000);
if (!graceful) {
process.stdout.write("⚠️ Daemon did not exit in 10s — sending SIGKILL.\n");
try {
process.kill(pid, "SIGKILL");
} catch {
// already dead
}
}
removePidFile();
process.stdout.write("✅ Daemon stopped.\n");
}, },
}); });
+70
View File
@@ -0,0 +1,70 @@
import { existsSync } from "node:fs";
import { join } from "node:path";
import { defineCommand } from "citty";
import { loadDaemonModule } from "../workspace-daemon.js";
import { getNerveRoot } from "../workspace.js";
// ---------------------------------------------------------------------------
// nerve store archive
// ---------------------------------------------------------------------------
const storeArchiveCommand = defineCommand({
meta: {
name: "archive",
description:
"Export logs older than 30 days from logs.db to data/archive/logs/YYYY-MM-DD.jsonl and delete those rows (RFC-001 §5.4)",
},
args: {
vacuum: {
type: "boolean",
description: "Run SQLite VACUUM after archiving",
default: false,
},
},
async run({ args }) {
const nerveRoot = getNerveRoot();
const dbPath = join(nerveRoot, "data", "logs.db");
if (!existsSync(dbPath)) {
process.stderr.write("❌ No data/logs.db found — start the daemon at least once.\n");
process.exit(1);
}
const { createLogStore } = await loadDaemonModule(nerveRoot);
const store = createLogStore(dbPath);
try {
const result = store.archiveLogs({ vacuum: args.vacuum });
if (result.days.length === 0) {
process.stdout.write(
"✅ Nothing to archive (no eligible UTC days beyond the 30-day window).\n",
);
} else {
process.stdout.write(`✅ Archived ${result.days.length} day(s):\n`);
for (const d of result.days) {
process.stdout.write(` ${d.day} rows=${d.rowCount} ${d.filePath}\n`);
}
}
if (result.vacuumed) {
process.stdout.write(" VACUUM completed.\n");
}
} finally {
store.close();
}
},
});
// ---------------------------------------------------------------------------
// nerve store
// ---------------------------------------------------------------------------
export const storeCommand = defineCommand({
meta: {
name: "store",
description: "Maintain local Nerve SQLite stores (log cold-archive, …)",
},
subCommands: {
archive: storeArchiveCommand,
},
});
-2
View File
@@ -1,5 +1,3 @@
#!/usr/bin/env node
import { runForegroundKernelSession } from "./run-foreground-kernel.js"; import { runForegroundKernelSession } from "./run-foreground-kernel.js";
import { loadDaemonModule } from "./workspace-daemon.js"; import { loadDaemonModule } from "./workspace-daemon.js";
+40 -85
View File
@@ -8,18 +8,14 @@
import { connect } from "node:net"; import { connect } from "node:net";
import type { Socket } from "node:net"; import type { Socket } from "node:net";
import type { SenseInfo } from "@uncaged/nerve-core";
const CONNECT_TIMEOUT_MS = 3_000; const CONNECT_TIMEOUT_MS = 3_000;
const RESPONSE_TIMEOUT_MS = 5_000; const RESPONSE_TIMEOUT_MS = 5_000;
type TriggerResponse = { ok: true } | { ok: false; error: string }; export type { SenseInfo };
export type SenseInfo = { type TriggerResponse = { ok: true } | { ok: false; error: string };
name: string;
group: string;
throttle: number | null;
timeout: number | null;
lastSignalTs: number | null;
};
type ListSensesResponse = { ok: true; senses: SenseInfo[] } | { ok: false; error: string }; type ListSensesResponse = { ok: true; senses: SenseInfo[] } | { ok: false; error: string };
@@ -37,12 +33,36 @@ function parseDaemonResponse(line: string): TriggerResponse {
return { ok: false, error: `Unexpected daemon response: ${line}` }; return { ok: false, error: `Unexpected daemon response: ${line}` };
} }
function sendAndReceive(socketPath: string, message: object): Promise<TriggerResponse> { function parseListSensesResponse(line: string): ListSensesResponse {
try {
const obj = JSON.parse(line) as unknown;
if (obj !== null && typeof obj === "object") {
const r = obj as Record<string, unknown>;
if (r.ok === false && typeof r.error === "string") return { ok: false, error: r.error };
if (r.ok === true && Array.isArray(r.senses))
return { ok: true, senses: r.senses as SenseInfo[] };
}
} catch {
// fall through
}
return { ok: false, error: `Unexpected daemon response: ${line}` };
}
/**
* Connect to the daemon socket, send one JSON request (newline-terminated),
* and resolve with the first non-empty line parsed by `parseFirstLine`.
*/
function sendAndReceive<T>(
socketPath: string,
message: object,
parseFirstLine: (trimmed: string) => T,
responseTimeoutMs: number = RESPONSE_TIMEOUT_MS,
): Promise<T> {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
let socket: Socket | null = null; let socket: Socket | null = null;
let settled = false; let settled = false;
function settle(result: TriggerResponse | Error): void { function settle(result: T | Error): void {
if (settled) return; if (settled) return;
settled = true; settled = true;
if (socket !== null) { if (socket !== null) {
@@ -65,7 +85,7 @@ function sendAndReceive(socketPath: string, message: object): Promise<TriggerRes
const responseTimer = setTimeout(() => { const responseTimer = setTimeout(() => {
settle(new Error("Timed out waiting for daemon response")); settle(new Error("Timed out waiting for daemon response"));
}, RESPONSE_TIMEOUT_MS); }, responseTimeoutMs);
let buf = ""; let buf = "";
socket?.on("data", (chunk: Buffer) => { socket?.on("data", (chunk: Buffer) => {
@@ -76,7 +96,7 @@ function sendAndReceive(socketPath: string, message: object): Promise<TriggerRes
const trimmed = line.trim(); const trimmed = line.trim();
if (trimmed.length === 0) continue; if (trimmed.length === 0) continue;
clearTimeout(responseTimer); clearTimeout(responseTimer);
settle(parseDaemonResponse(trimmed)); settle(parseFirstLine(trimmed));
return; return;
} }
}); });
@@ -101,18 +121,19 @@ export function triggerWorkflowViaDaemon(
workflow: string, workflow: string,
payload: unknown, payload: unknown,
): Promise<TriggerResponse> { ): Promise<TriggerResponse> {
return sendAndReceive(socketPath, { type: "trigger-workflow", workflow, payload }); return sendAndReceive(
socketPath,
{ type: "trigger-workflow", workflow, payload },
parseDaemonResponse,
);
} }
/** /**
* Send a trigger-sense message to the running daemon via its Unix socket. * Send a trigger-sense message to the running daemon via its Unix socket.
* Resolves with the daemon's response or rejects on connection/timeout errors. * Resolves with the daemon's response or rejects on connection/timeout errors.
*/ */
export function triggerSenseViaDaemon( export function triggerSenseViaDaemon(socketPath: string, sense: string): Promise<TriggerResponse> {
socketPath: string, return sendAndReceive(socketPath, { type: "trigger-sense", sense }, parseDaemonResponse);
sense: string,
): Promise<TriggerResponse> {
return sendAndReceive(socketPath, { type: "trigger-sense", sense });
} }
/** /**
@@ -120,71 +141,5 @@ export function triggerSenseViaDaemon(
* Resolves with the list of registered senses or rejects on connection/timeout errors. * Resolves with the list of registered senses or rejects on connection/timeout errors.
*/ */
export function listSensesViaDaemon(socketPath: string): Promise<ListSensesResponse> { export function listSensesViaDaemon(socketPath: string): Promise<ListSensesResponse> {
return new Promise((resolve, reject) => { return sendAndReceive(socketPath, { type: "list-senses" }, parseListSensesResponse);
let socket: Socket | null = null;
let settled = false;
function settle(result: ListSensesResponse | Error): void {
if (settled) return;
settled = true;
if (socket !== null) {
socket.destroy();
socket = null;
}
if (result instanceof Error) {
reject(result);
} else {
resolve(result);
}
}
const connectTimer = setTimeout(() => {
settle(new Error(`Timed out connecting to daemon socket: ${socketPath}`));
}, CONNECT_TIMEOUT_MS);
socket = connect(socketPath, () => {
clearTimeout(connectTimer);
const responseTimer = setTimeout(() => {
settle(new Error("Timed out waiting for daemon response"));
}, RESPONSE_TIMEOUT_MS);
let buf = "";
socket?.on("data", (chunk: Buffer) => {
buf += chunk.toString("utf8");
const lines = buf.split("\n");
buf = lines.pop() ?? "";
for (const line of lines) {
const trimmed = line.trim();
if (trimmed.length === 0) continue;
clearTimeout(responseTimer);
try {
const obj = JSON.parse(trimmed) as unknown;
if (obj !== null && typeof obj === "object") {
const r = obj as Record<string, unknown>;
if (r.ok === false && typeof r.error === "string") {
settle({ ok: false, error: r.error });
return;
}
if (r.ok === true && Array.isArray(r.senses)) {
settle({ ok: true, senses: r.senses as SenseInfo[] });
return;
}
}
} catch {
// fall through
}
settle({ ok: false, error: `Unexpected daemon response: ${trimmed}` });
return;
}
});
socket?.write(`${JSON.stringify({ type: "list-senses" })}\n`);
});
socket.on("error", (err) => {
clearTimeout(connectTimer);
settle(new Error(`Cannot connect to daemon: ${err.message}`));
});
});
} }
+19
View File
@@ -40,6 +40,24 @@ export type LogQuery = {
limit?: number; limit?: number;
}; };
export type ArchiveLogsOptions = {
now?: number;
vacuum?: boolean;
maxDays?: number;
retentionMs?: number;
};
export type ArchiveLogsDayResult = {
day: string;
rowCount: number;
filePath: string;
};
export type ArchiveLogsResult = {
days: ArchiveLogsDayResult[];
vacuumed: boolean;
};
/** Subset of daemon LogStore used by the CLI workflow commands. */ /** Subset of daemon LogStore used by the CLI workflow commands. */
export type LogStore = { export type LogStore = {
query: (filter?: LogQuery) => LogEntry[]; query: (filter?: LogQuery) => LogEntry[];
@@ -47,5 +65,6 @@ export type LogStore = {
getActiveWorkflowRuns: (workflowName?: string) => WorkflowRun[]; getActiveWorkflowRuns: (workflowName?: string) => WorkflowRun[];
getAllWorkflowRuns: (workflowName: string | null) => WorkflowRun[]; getAllWorkflowRuns: (workflowName: string | null) => WorkflowRun[];
upsertWorkflowRun: (entry: Omit<LogEntry, "id">, run: WorkflowRun) => LogEntry; upsertWorkflowRun: (entry: Omit<LogEntry, "id">, run: WorkflowRun) => LogEntry;
archiveLogs: (options?: ArchiveLogsOptions) => ArchiveLogsResult;
close: () => void; close: () => void;
}; };
+2 -1
View File
@@ -1,10 +1,11 @@
{ {
"name": "@uncaged/nerve-core", "name": "@uncaged/nerve-core",
"version": "0.1.2", "version": "0.1.4",
"type": "module", "type": "module",
"main": "dist/index.js", "main": "dist/index.js",
"types": "dist/index.d.ts", "types": "dist/index.d.ts",
"scripts": { "scripts": {
"prepublishOnly": "bash ../../scripts/prepublish-check.sh",
"build": "tsup", "build": "tsup",
"test": "vitest run" "test": "vitest run"
}, },
+1
View File
@@ -1,6 +1,7 @@
export type { export type {
Signal, Signal,
SenseConfig, SenseConfig,
SenseInfo,
SenseReflexConfig, SenseReflexConfig,
WorkflowReflexConfig, WorkflowReflexConfig,
ReflexConfig, ReflexConfig,
+9
View File
@@ -12,6 +12,15 @@ export type SenseConfig = {
gracePeriod: number | null; gracePeriod: number | null;
}; };
/** Runtime metadata for a sense (e.g. daemon list-senses IPC). */
export type SenseInfo = {
name: string;
group: string;
throttle: number | null;
timeout: number | null;
lastSignalTs: number | null;
};
export type SenseReflexConfig = { export type SenseReflexConfig = {
kind: "sense"; kind: "sense";
sense: string; sense: string;
+2 -1
View File
@@ -1,6 +1,6 @@
{ {
"name": "@uncaged/nerve-daemon", "name": "@uncaged/nerve-daemon",
"version": "0.1.3", "version": "0.1.5",
"type": "module", "type": "module",
"main": "dist/index.js", "main": "dist/index.js",
"types": "dist/index.d.ts", "types": "dist/index.d.ts",
@@ -11,6 +11,7 @@
"access": "public" "access": "public"
}, },
"scripts": { "scripts": {
"prepublishOnly": "bash ../../scripts/prepublish-check.sh",
"build": "tsup", "build": "tsup",
"test": "vitest run" "test": "vitest run"
}, },
@@ -0,0 +1,105 @@
import { createHash } from "node:crypto";
import { existsSync, readdirSync, writeFileSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { describe, expect, it } from "vitest";
import { createBlobStore, normalizeBlobHash } from "../blob-store.js";
function makeRoot(): string {
return join(tmpdir(), `nerve-blob-${Date.now()}-${Math.random().toString(16).slice(2)}`);
}
describe("normalizeBlobHash", () => {
it("accepts 64-char lowercase hex", () => {
const h = "a".repeat(64);
expect(normalizeBlobHash(h)).toBe(h);
});
it("normalizes uppercase to lowercase", () => {
const h = "A".repeat(64);
expect(normalizeBlobHash(h)).toBe("a".repeat(64));
});
it("rejects wrong length and non-hex", () => {
expect(normalizeBlobHash("ab")).toBeNull();
expect(normalizeBlobHash("g".repeat(64))).toBeNull();
});
});
describe("createBlobStore", () => {
it("write returns sha256 hex and stores under 2-char shard", () => {
const root = makeRoot();
const store = createBlobStore(root);
const content = "hello cas";
const hash = store.write(content);
expect(hash).toMatch(/^[0-9a-f]{64}$/);
expect(createHash("sha256").update(content, "utf8").digest("hex")).toBe(hash);
const shard = hash.slice(0, 2);
const rel = hash.slice(2);
const filePath = join(root, shard, rel);
expect(existsSync(filePath)).toBe(true);
});
it("read returns stored bytes and exists is true", () => {
const root = makeRoot();
const store = createBlobStore(root);
const buf = Buffer.from([0, 255, 128]);
const hash = store.write(buf);
expect(store.exists(hash)).toBe(true);
const got = store.read(hash);
expect(got).not.toBeNull();
expect(Buffer.compare(got as Buffer, buf)).toBe(0);
});
it("write is idempotent for same content", () => {
const root = makeRoot();
const store = createBlobStore(root);
const h1 = store.write("same");
const h2 = store.write("same");
expect(h1).toBe(h2);
const shard = h1.slice(0, 2);
const names = readdirSync(join(root, shard));
expect(names.filter((n: string) => !n.startsWith("."))).toHaveLength(1);
});
it("read returns null for missing blob", () => {
const root = makeRoot();
const store = createBlobStore(root);
const missing = "0".repeat(64);
expect(store.read(missing)).toBeNull();
expect(store.exists(missing)).toBe(false);
});
it("read and exists return null/false for invalid hash", () => {
const root = makeRoot();
const store = createBlobStore(root);
expect(store.read("not-a-hash")).toBeNull();
expect(store.exists("not-a-hash")).toBe(false);
});
it("throws when on-disk content does not match path hash", () => {
const root = makeRoot();
const store = createBlobStore(root);
const hash = store.write("ok");
const filePath = join(root, hash.slice(0, 2), hash.slice(2));
writeFileSync(filePath, "tampered");
expect(() => store.read(hash)).toThrow(/CAS mismatch/i);
});
it("write throws when an existing file at the digest path has wrong content", () => {
const root = makeRoot();
const store = createBlobStore(root);
const hash = store.write("truth");
const filePath = join(root, hash.slice(0, 2), hash.slice(2));
writeFileSync(filePath, "lies");
expect(() => store.write("truth")).toThrow(/CAS mismatch/i);
});
});
@@ -91,6 +91,7 @@ function makeLogStore(
}), }),
getTriggerPayload: vi.fn(() => ({ value: 42 })), getTriggerPayload: vi.fn(() => ({ value: 42 })),
getThreadEvents: vi.fn(() => [{ type: "thread_start", triggerPayload: {} }]), getThreadEvents: vi.fn(() => [{ type: "thread_start", triggerPayload: {} }]),
archiveLogs: vi.fn(() => ({ days: [], vacuumed: false })),
close: vi.fn(), close: vi.fn(),
}; };
return store; return store;
@@ -77,6 +77,7 @@ function makeLogStore() {
getActiveWorkflowRuns: vi.fn(() => []), getActiveWorkflowRuns: vi.fn(() => []),
getTriggerPayload: vi.fn(() => null), getTriggerPayload: vi.fn(() => null),
getThreadEvents: vi.fn(() => []), getThreadEvents: vi.fn(() => []),
archiveLogs: vi.fn(() => ({ days: [], vacuumed: false })),
close: vi.fn(), close: vi.fn(),
}; };
} }
@@ -74,6 +74,7 @@ function makeMockLogStore() {
getAllWorkflowRuns: vi.fn(() => []), getAllWorkflowRuns: vi.fn(() => []),
getTriggerPayload: vi.fn(() => null), getTriggerPayload: vi.fn(() => null),
getThreadEvents: vi.fn(() => []), getThreadEvents: vi.fn(() => []),
archiveLogs: vi.fn(() => ({ days: [], vacuumed: false })),
close: vi.fn(), close: vi.fn(),
}; };
} }
@@ -80,6 +80,7 @@ function makeLogStore() {
getActiveWorkflowRuns: vi.fn(() => []), getActiveWorkflowRuns: vi.fn(() => []),
getTriggerPayload: vi.fn(() => null), getTriggerPayload: vi.fn(() => null),
getThreadEvents: vi.fn(() => []), getThreadEvents: vi.fn(() => []),
archiveLogs: vi.fn(() => ({ days: [], vacuumed: false })),
close: vi.fn(), close: vi.fn(),
}; };
} }
@@ -1,4 +1,7 @@
import { EventEmitter } from "node:events"; import { EventEmitter } from "node:events";
import { mkdtempSync, rmSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import type { NerveConfig } from "@uncaged/nerve-core"; import type { NerveConfig } from "@uncaged/nerve-core";
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
@@ -44,6 +47,7 @@ vi.mock("node:child_process", () => ({
// Import after mock is set up // Import after mock is set up
const { createKernel } = await import("../kernel.js"); const { createKernel } = await import("../kernel.js");
const { createLogStore } = await import("../log-store.js");
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
// Helpers // Helpers
@@ -93,6 +97,29 @@ describe("kernel — message routing", () => {
await kernel.stop(); await kernel.stop();
}); });
it("persists emitted signals as sense/signal log entries", async () => {
const tmpDir = mkdtempSync(join(tmpdir(), "nerve-kernel-sig-"));
const logStore = createLogStore(join(tmpDir, "logs.db"));
try {
const config = makeConfig({
senses: {
"cpu-usage": { group: "system", throttle: null, timeout: null, gracePeriod: null },
},
reflexes: [],
});
const kernel = createKernel(config, tmpDir, { logStore });
const child = mockChildren[0];
child.emit("message", { type: "ready" });
child.emit("message", { type: "signal", sense: "cpu-usage", payload: 123 });
const rows = logStore.query({ source: "sense", type: "signal", refId: "cpu-usage" });
expect(rows).toHaveLength(1);
expect(rows[0].payload).toBe(JSON.stringify(123));
await kernel.stop();
} finally {
rmSync(tmpDir, { recursive: true, force: true });
}
});
it("routes error message to stderr", async () => { it("routes error message to stderr", async () => {
const stderrSpy = vi.spyOn(process.stderr, "write").mockImplementation(() => true); const stderrSpy = vi.spyOn(process.stderr, "write").mockImplementation(() => true);
const config = makeConfig({ const config = makeConfig({
@@ -0,0 +1,40 @@
import { describe, expect, it } from "vitest";
import {
assertValidUtcDay,
compareIsoDays,
lastArchivableUtcDay,
nextUtcDay,
prevUtcDay,
utcDateStringFromMs,
utcDayEndExclusiveMs,
utcDayStartMs,
} from "../log-archive.js";
describe("log-archive UTC helpers", () => {
it("lastArchivableUtcDay matches RFC-style boundary (exclusive end of day ≤ boundary)", () => {
const boundary = Date.UTC(2026, 1, 2, 12, 0, 0); // 2026-02-02 12:00 UTC
expect(lastArchivableUtcDay(boundary)).toBe("2026-02-01");
});
it("round-trips UTC day bounds", () => {
expect(utcDayStartMs("2026-02-01")).toBe(Date.UTC(2026, 1, 1));
expect(utcDayEndExclusiveMs("2026-02-01")).toBe(Date.UTC(2026, 1, 2));
expect(utcDateStringFromMs(Date.UTC(2026, 1, 1, 23, 59))).toBe("2026-02-01");
});
it("nextUtcDay / prevUtcDay", () => {
expect(nextUtcDay("2026-02-01")).toBe("2026-02-02");
expect(prevUtcDay("2026-02-01")).toBe("2026-01-31");
});
it("compareIsoDays sorts lexicographically for YYYY-MM-DD", () => {
expect(compareIsoDays("2026-01-01", "2026-02-01")).toBeLessThan(0);
expect(compareIsoDays("2026-02-01", "2026-02-01")).toBe(0);
});
it("assertValidUtcDay rejects invalid calendars", () => {
expect(() => assertValidUtcDay("2026-02-31")).toThrow();
expect(() => assertValidUtcDay("bad")).toThrow();
});
});
@@ -0,0 +1,139 @@
import { mkdtempSync, readFileSync, rmSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { afterEach, beforeEach, describe, expect, it } from "vitest";
import { LOG_ARCHIVE_META_KEY, createLogStore } from "../log-store.js";
import type { LogStore } from "../log-store.js";
const DAY_MS = 86_400_000;
/** `now` such that 2026-02-01 is the last archivable UTC day under a 30-day window. */
function nowForLastArchivableFeb1(): number {
const boundary = Date.UTC(2026, 1, 2, 12, 0, 0);
return boundary + 30 * DAY_MS;
}
describe("LogStore — cold archive (RFC-001 §5.4)", () => {
let tmpDir: string;
let store: LogStore;
beforeEach(() => {
tmpDir = mkdtempSync(join(tmpdir(), "nerve-archive-"));
store = createLogStore(join(tmpDir, "data", "logs.db"));
});
afterEach(() => {
store.close();
rmSync(tmpDir, { recursive: true, force: true });
});
it("exports one UTC day to JSONL, deletes rows, advances archived_up_to", () => {
const ts = Date.UTC(2026, 1, 1, 10, 0, 0);
store.append({ source: "system", type: "x", refId: null, payload: '{"a":1}', ts });
store.append({ source: "reflex", type: "y", refId: "z", payload: null, ts: ts + 1 });
const now = nowForLastArchivableFeb1();
const result = store.archiveLogs({ now, retentionMs: 30 * DAY_MS });
expect(result.days).toHaveLength(1);
expect(result.days[0].day).toBe("2026-02-01");
expect(result.days[0].rowCount).toBe(2);
const jsonlPath = join(tmpDir, "data", "archive", "logs", "2026-02-01.jsonl");
expect(result.days[0].filePath).toBe(jsonlPath);
const lines = readFileSync(jsonlPath, "utf8").trim().split("\n");
expect(lines).toHaveLength(2);
const o = JSON.parse(lines[0] ?? "{}") as Record<string, unknown>;
expect(o.source).toBe("system");
expect(o.refId).toBeNull();
expect(store.query()).toHaveLength(0);
expect(store.getMeta(LOG_ARCHIVE_META_KEY)).toBe("2026-02-01");
});
it("returns nothing for an empty logs table", () => {
const r = store.archiveLogs({ now: nowForLastArchivableFeb1(), retentionMs: 30 * DAY_MS });
expect(r.days).toHaveLength(0);
expect(store.getMeta(LOG_ARCHIVE_META_KEY)).toBeNull();
});
it("does nothing when all logs are inside the hot window", () => {
const now = Date.UTC(2026, 3, 23, 12, 0, 0);
const ts = now - 5 * DAY_MS;
store.append({ source: "system", type: "warm", refId: null, payload: null, ts });
const r = store.archiveLogs({ now, retentionMs: 30 * DAY_MS });
expect(r.days).toHaveLength(0);
expect(store.query()).toHaveLength(1);
});
it("second archive with same clock is a no-op (watermark already caught up)", () => {
const ts = Date.UTC(2026, 1, 1, 10, 0, 0);
store.append({ source: "system", type: "x", refId: null, payload: null, ts });
const now = nowForLastArchivableFeb1();
store.archiveLogs({ now, retentionMs: 30 * DAY_MS });
const path = join(tmpDir, "data", "archive", "logs", "2026-02-01.jsonl");
const first = readFileSync(path, "utf8");
const second = store.archiveLogs({ now, retentionMs: 30 * DAY_MS });
expect(second.days).toHaveLength(0);
expect(readFileSync(path, "utf8")).toBe(first);
});
it("overwrites JSONL when the same UTC day is archived again after watermark rewind", () => {
const ts = Date.UTC(2026, 1, 1, 10, 0, 0);
store.append({ source: "a", type: "1", refId: null, payload: null, ts });
const now = nowForLastArchivableFeb1();
store.archiveLogs({ now, retentionMs: 30 * DAY_MS });
store.setMeta(LOG_ARCHIVE_META_KEY, "2026-01-31");
store.append({ source: "b", type: "2", refId: null, payload: null, ts: ts + 100 });
store.archiveLogs({ now, retentionMs: 30 * DAY_MS });
const path = join(tmpDir, "data", "archive", "logs", "2026-02-01.jsonl");
const lines = readFileSync(path, "utf8").trim().split("\n");
expect(lines).toHaveLength(1);
expect(JSON.parse(lines[0] ?? "{}").source).toBe("b");
});
it("respects maxDays across invocations", () => {
const t1 = Date.UTC(2026, 1, 1, 10, 0, 0);
const t2 = Date.UTC(2026, 1, 2, 10, 0, 0);
store.append({ source: "system", type: "a", refId: null, payload: null, ts: t1 });
store.append({ source: "system", type: "b", refId: null, payload: null, ts: t2 });
const now = Date.UTC(2027, 0, 1, 12, 0, 0);
const r1 = store.archiveLogs({ now, retentionMs: 30 * DAY_MS, maxDays: 1 });
expect(r1.days).toHaveLength(1);
expect(r1.days[0].day).toBe("2026-02-01");
const r2 = store.archiveLogs({ now, retentionMs: 30 * DAY_MS, maxDays: 1 });
expect(r2.days).toHaveLength(1);
expect(r2.days[0].day).toBe("2026-02-02");
expect(store.getMeta(LOG_ARCHIVE_META_KEY)).toBe("2026-02-02");
expect(store.query()).toHaveLength(0);
});
it("starts from earliest log day when it is before watermark+1", () => {
store.setMeta(LOG_ARCHIVE_META_KEY, "2026-01-10");
const ts = Date.UTC(2026, 1, 1, 10, 0, 0);
store.append({ source: "x", type: "p", refId: null, payload: null, ts });
const result = store.archiveLogs({ now: nowForLastArchivableFeb1(), retentionMs: 30 * DAY_MS });
expect(result.days.map((d) => d.day)).toContain("2026-02-01");
});
it("throws on invalid archived_up_to watermark", () => {
store.setMeta(LOG_ARCHIVE_META_KEY, "not-a-date");
expect(() => store.archiveLogs({ now: Date.now() })).toThrow();
});
it("runs VACUUM when vacuum: true", () => {
const ts = Date.UTC(2026, 1, 1, 10, 0, 0);
store.append({ source: "system", type: "x", refId: null, payload: null, ts });
const r = store.archiveLogs({
now: nowForLastArchivableFeb1(),
retentionMs: 30 * DAY_MS,
vacuum: true,
});
expect(r.vacuumed).toBe(true);
});
});
@@ -7,6 +7,7 @@ import { drizzle } from "drizzle-orm/better-sqlite3";
import { integer, real, sqliteTable } from "drizzle-orm/sqlite-core"; import { integer, real, sqliteTable } from "drizzle-orm/sqlite-core";
import { describe, expect, it } from "vitest"; import { describe, expect, it } from "vitest";
import { createBlobStore } from "../blob-store.js";
import { parseParentMessage } from "../ipc.js"; import { parseParentMessage } from "../ipc.js";
import { executeCompute, openPeerDb, openSenseDb, runMigrations } from "../sense-runtime.js"; import { executeCompute, openPeerDb, openSenseDb, runMigrations } from "../sense-runtime.js";
import type { DrizzleDB, PeerMap, SenseRuntime } from "../sense-runtime.js"; import type { DrizzleDB, PeerMap, SenseRuntime } from "../sense-runtime.js";
@@ -340,6 +341,20 @@ describe("executeCompute", () => {
expect(capturedSignal).toBeInstanceOf(AbortSignal); expect(capturedSignal).toBeInstanceOf(AbortSignal);
sqlite.close(); sqlite.close();
}); });
it("passes BlobStore as options.blobs when blobStore argument is provided", async () => {
const blobsRoot = mkdtempSync(join(tmpdir(), "nerve-blobs-"));
const blobStore = createBlobStore(blobsRoot);
let seen: ReturnType<typeof createBlobStore> | undefined;
const { runtime, sqlite } = makeRuntime(async (_db, _peers, options) => {
seen = options?.blobs;
return null;
});
await executeCompute(runtime, emptyPeers, undefined, blobStore);
expect(seen).toBe(blobStore);
sqlite.close();
});
}); });
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
@@ -74,6 +74,7 @@ function makeLogStore() {
getActiveWorkflowRuns: vi.fn(() => []), getActiveWorkflowRuns: vi.fn(() => []),
getTriggerPayload: vi.fn(() => null), getTriggerPayload: vi.fn(() => null),
getThreadEvents: vi.fn(() => []), getThreadEvents: vi.fn(() => []),
archiveLogs: vi.fn(() => ({ days: [], vacuumed: false })),
close: vi.fn(), close: vi.fn(),
}; };
} }
+106
View File
@@ -0,0 +1,106 @@
/**
* CAS blob store — sha256 content-addressable files under `data/blobs/`.
*
* Layout: `<root>/<2-hex-shard>/<62-hex-rest>` (RFC-001 §8).
*/
import { createHash, randomBytes } from "node:crypto";
import {
existsSync,
mkdirSync,
readFileSync,
renameSync,
unlinkSync,
writeFileSync,
} from "node:fs";
import { dirname, join } from "node:path";
const SHA256_HEX_LEN = 64;
const HEX_RE = /^[0-9a-f]+$/;
export type BlobStore = {
/** Persist UTF-8 or raw bytes; returns lowercase hex sha256. Idempotent for identical content. */
write: (content: string | Uint8Array | Buffer) => string;
/** Returns bytes or null if the hash is invalid or no blob exists. Verifies digest matches path. */
read: (hash: string) => Buffer | null;
/** True when hash is well-formed and the blob file is present. */
exists: (hash: string) => boolean;
};
function toBuffer(content: string | Uint8Array | Buffer): Buffer {
if (typeof content === "string") return Buffer.from(content, "utf8");
if (Buffer.isBuffer(content)) return content;
return Buffer.from(content);
}
function digestHex(buf: Buffer): string {
return createHash("sha256").update(buf).digest("hex");
}
/** @returns normalized lowercase hex or null if not a valid sha256 hex string */
export function normalizeBlobHash(hash: string): string | null {
const h = hash.trim().toLowerCase();
if (h.length !== SHA256_HEX_LEN) return null;
if (!HEX_RE.test(h)) return null;
return h;
}
function pathForHash(blobsRoot: string, hashLower: string): string {
return join(blobsRoot, hashLower.slice(0, 2), hashLower.slice(2));
}
function verifyPathMatchesContent(filePath: string, expectedHash: string): Buffer {
const data = readFileSync(filePath);
const actual = digestHex(data);
if (actual !== expectedHash) {
throw new Error(
`Blob CAS mismatch at "${filePath}": file digests to ${actual}, path expects ${expectedHash}`,
);
}
return data;
}
export function createBlobStore(blobsRoot: string): BlobStore {
function write(content: string | Uint8Array | Buffer): string {
const buf = toBuffer(content);
const hash = digestHex(buf);
const filePath = pathForHash(blobsRoot, hash);
if (existsSync(filePath)) {
verifyPathMatchesContent(filePath, hash);
return hash;
}
mkdirSync(dirname(filePath), { recursive: true });
const tmp = join(dirname(filePath), `.tmp.${randomBytes(16).toString("hex")}`);
try {
writeFileSync(tmp, buf);
renameSync(tmp, filePath);
} catch (e) {
try {
unlinkSync(tmp);
} catch {
// ignore cleanup errors
}
throw e;
}
return hash;
}
function read(hash: string): Buffer | null {
const h = normalizeBlobHash(hash);
if (h === null) return null;
const filePath = pathForHash(blobsRoot, h);
if (!existsSync(filePath)) return null;
return verifyPathMatchesContent(filePath, h);
}
function exists(hash: string): boolean {
const h = normalizeBlobHash(hash);
if (h === null) return false;
return existsSync(pathForHash(blobsRoot, h));
}
return { write, read, exists };
}
+4 -9
View File
@@ -13,8 +13,12 @@
import { rmSync } from "node:fs"; import { rmSync } from "node:fs";
import { type Server, type Socket, createServer } from "node:net"; import { type Server, type Socket, createServer } from "node:net";
import type { SenseInfo } from "@uncaged/nerve-core";
import type { WorkflowManager } from "./workflow-manager.js"; import type { WorkflowManager } from "./workflow-manager.js";
export type { SenseInfo };
/** JSON message sent by the CLI to trigger a workflow. */ /** JSON message sent by the CLI to trigger a workflow. */
export type TriggerWorkflowRequest = { export type TriggerWorkflowRequest = {
type: "trigger-workflow"; type: "trigger-workflow";
@@ -33,15 +37,6 @@ export type ListSensesRequest = {
type: "list-senses"; type: "list-senses";
}; };
/** Runtime info about a single sense returned by list-senses. */
export type SenseInfo = {
name: string;
group: string;
throttle: number | null;
timeout: number | null;
lastSignalTs: number | null;
};
type DaemonRequest = TriggerWorkflowRequest | TriggerSenseRequest | ListSensesRequest; type DaemonRequest = TriggerWorkflowRequest | TriggerSenseRequest | ListSensesRequest;
type DaemonResponse = type DaemonResponse =
+9 -1
View File
@@ -29,16 +29,24 @@ export {
export { createKernel } from "./kernel.js"; export { createKernel } from "./kernel.js";
export type { Kernel, KernelOptions, KernelHealth } from "./kernel.js"; export type { Kernel, KernelOptions, KernelHealth } from "./kernel.js";
export type { SenseInfo } from "./daemon-ipc.js";
export { createFileWatcher } from "./file-watcher.js"; export { createFileWatcher } from "./file-watcher.js";
export type { FileWatcher, FileChange, FileChangeHandler } from "./file-watcher.js"; export type { FileWatcher, FileChange, FileChangeHandler } from "./file-watcher.js";
export { createLogStore } from "./log-store.js"; export { createBlobStore, normalizeBlobHash } from "./blob-store.js";
export type { BlobStore } from "./blob-store.js";
export { createLogStore, LOG_ARCHIVE_META_KEY } from "./log-store.js";
export type { export type {
LogStore, LogStore,
LogEntry, LogEntry,
LogQuery, LogQuery,
WorkflowRun, WorkflowRun,
WorkflowRunStatus, WorkflowRunStatus,
ArchiveLogsDayResult,
ArchiveLogsOptions,
ArchiveLogsResult,
} from "./log-store.js"; } from "./log-store.js";
export { createWorkflowManager } from "./workflow-manager.js"; export { createWorkflowManager } from "./workflow-manager.js";
+11 -7
View File
@@ -18,11 +18,11 @@ import { readFileSync } from "node:fs";
import { dirname, join } from "node:path"; import { dirname, join } from "node:path";
import { fileURLToPath } from "node:url"; import { fileURLToPath } from "node:url";
import type { NerveConfig, Signal } from "@uncaged/nerve-core"; import type { NerveConfig, SenseInfo, Signal } from "@uncaged/nerve-core";
import { parseNerveConfig } from "@uncaged/nerve-core"; import { parseNerveConfig } from "@uncaged/nerve-core";
import { createDaemonIpcServer } from "./daemon-ipc.js"; import { createDaemonIpcServer } from "./daemon-ipc.js";
import type { DaemonIpcServer, SenseInfo } from "./daemon-ipc.js"; import type { DaemonIpcServer } from "./daemon-ipc.js";
import { createFileWatcher } from "./file-watcher.js"; import { createFileWatcher } from "./file-watcher.js";
import type { FileWatcher } from "./file-watcher.js"; import type { FileWatcher } from "./file-watcher.js";
import type { ComputeMessage, ShutdownMessage } from "./ipc.js"; import type { ComputeMessage, ShutdownMessage } from "./ipc.js";
@@ -89,7 +89,11 @@ function spawnWorker(nerveRoot: string, group: string, workerScript: string): Ch
stdio: ["ignore", "inherit", "inherit", "ipc"], stdio: ["ignore", "inherit", "inherit", "ipc"],
}); });
// Prevent unhandled EPIPE when writing to a child whose IPC channel closed // Prevent unhandled EPIPE when writing to a child whose IPC channel closed
child.on("error", () => {}); child.on("error", (err) => {
if ((err as NodeJS.ErrnoException).code !== "EPIPE") {
console.error("[worker] error:", err.message);
}
});
return child; return child;
} }
@@ -222,8 +226,8 @@ export function createKernel(
ts: Date.now(), ts: Date.now(),
}; };
logStore.append({ logStore.append({
source: "reflex", source: "sense",
type: "run_complete", type: "signal",
refId: msg.sense, refId: msg.sense,
payload: JSON.stringify(msg.payload), payload: JSON.stringify(msg.payload),
ts: signal.ts, ts: signal.ts,
@@ -524,8 +528,8 @@ export function createKernel(
listSenses(): SenseInfo[] { listSenses(): SenseInfo[] {
return Object.entries(config.senses).map(([name, senseConfig]) => { return Object.entries(config.senses).map(([name, senseConfig]) => {
const entries = logStore.query({ const entries = logStore.query({
source: "reflex", source: "sense",
type: "run_complete", type: "signal",
refId: name, refId: name,
}); });
const lastEntry = entries.length > 0 ? entries[entries.length - 1] : null; const lastEntry = entries.length > 0 ? entries[entries.length - 1] : null;
+78
View File
@@ -0,0 +1,78 @@
/** Log cold-archive helpers (RFC-001 §5.4) — UTC calendar days, JSONL export. */
export const LOG_ARCHIVE_META_KEY = "archived_up_to";
export const DEFAULT_LOG_RETENTION_MS = 30 * 86_400_000;
export type ArchiveLogsOptions = {
/** Wall clock for retention boundary (default: `Date.now()`). */
now?: number;
/** Run `VACUUM` after archiving (outside the per-day transaction). */
vacuum?: boolean;
/** Max UTC days to process in one call (default: unlimited). */
maxDays?: number;
/** Override default 30-day retention (tests). */
retentionMs?: number;
};
export type ArchiveLogsDayResult = {
day: string;
rowCount: number;
filePath: string;
};
export type ArchiveLogsResult = {
days: ArchiveLogsDayResult[];
vacuumed: boolean;
};
export function utcDateStringFromMs(ms: number): string {
return new Date(ms).toISOString().slice(0, 10);
}
function parseUtcDayParts(day: string): [number, number, number] {
const m = /^(\d{4})-(\d{2})-(\d{2})$/.exec(day);
if (m === null) {
throw new Error(`Invalid UTC day (expected YYYY-MM-DD): ${day}`);
}
const y = Number(m[1]);
const mo = Number(m[2]);
const d = Number(m[3]);
const t = Date.UTC(y, mo - 1, d);
if (utcDateStringFromMs(t) !== day) {
throw new Error(`Invalid UTC calendar day: ${day}`);
}
return [y, mo, d];
}
export function assertValidUtcDay(day: string): void {
parseUtcDayParts(day);
}
export function utcDayStartMs(day: string): number {
const [y, mo, d] = parseUtcDayParts(day);
return Date.UTC(y, mo - 1, d);
}
export function utcDayEndExclusiveMs(day: string): number {
return utcDayStartMs(day) + 86_400_000;
}
export function prevUtcDay(day: string): string {
return utcDateStringFromMs(utcDayStartMs(day) - 86_400_000);
}
export function nextUtcDay(day: string): string {
return utcDateStringFromMs(utcDayEndExclusiveMs(day));
}
/** Last UTC calendar day D such that the exclusive end of D is ≤ boundaryMs. */
export function lastArchivableUtcDay(boundaryMs: number): string {
return prevUtcDay(utcDateStringFromMs(boundaryMs));
}
export function compareIsoDays(a: string, b: string): number {
if (a < b) return -1;
if (a > b) return 1;
return 0;
}
+151 -2
View File
@@ -7,11 +7,27 @@
* Also provides a `meta` key-value table for bookkeeping (e.g. archive watermarks). * Also provides a `meta` key-value table for bookkeeping (e.g. archive watermarks).
*/ */
import { mkdirSync } from "node:fs"; import { mkdirSync, writeFileSync } from "node:fs";
import { dirname } from "node:path"; import { dirname, join } from "node:path";
import Database from "better-sqlite3"; import Database from "better-sqlite3";
import type BetterSqlite3 from "better-sqlite3"; import type BetterSqlite3 from "better-sqlite3";
import {
DEFAULT_LOG_RETENTION_MS,
LOG_ARCHIVE_META_KEY,
assertValidUtcDay,
compareIsoDays,
lastArchivableUtcDay,
nextUtcDay,
utcDateStringFromMs,
utcDayEndExclusiveMs,
utcDayStartMs,
} from "./log-archive.js";
import type { ArchiveLogsDayResult, ArchiveLogsOptions, ArchiveLogsResult } from "./log-archive.js";
export { LOG_ARCHIVE_META_KEY } from "./log-archive.js";
export type { ArchiveLogsDayResult, ArchiveLogsOptions, ArchiveLogsResult } from "./log-archive.js";
export type LogEntry = { export type LogEntry = {
id?: number; id?: number;
source: string; source: string;
@@ -105,6 +121,12 @@ export type LogStore = {
* Used for crash recovery to rebuild ThreadState. * Used for crash recovery to rebuild ThreadState.
*/ */
getThreadEvents: (runId: string) => Array<{ type: string; [key: string]: unknown }>; getThreadEvents: (runId: string) => Array<{ type: string; [key: string]: unknown }>;
/**
* Export logs older than the retention window to `data/archive/logs/YYYY-MM-DD.jsonl`,
* then delete those rows and advance `meta.archived_up_to` in one transaction per day
* (RFC-001 §5.4).
*/
archiveLogs: (options?: ArchiveLogsOptions) => ArchiveLogsResult;
close: () => void; close: () => void;
}; };
@@ -138,6 +160,78 @@ CREATE INDEX IF NOT EXISTS idx_workflow_runs_status ON workflow_runs(status);
CREATE INDEX IF NOT EXISTS idx_workflow_runs_workflow ON workflow_runs(workflow); CREATE INDEX IF NOT EXISTS idx_workflow_runs_workflow ON workflow_runs(workflow);
`; `;
type SqlLogRow = {
id: number;
source: string;
type: string;
ref_id: string | null;
payload: string | null;
ts: number;
};
function buildJsonlBody(rows: SqlLogRow[]): string {
if (rows.length === 0) return "";
const lines = rows.map((r) =>
JSON.stringify({
id: r.id,
source: r.source,
type: r.type,
refId: r.ref_id,
payload: r.payload,
ts: r.ts,
}),
);
return `${lines.join("\n")}\n`;
}
function runOptionalVacuum(sqlite: BetterSqlite3.Database, vacuum?: boolean): boolean {
if (vacuum !== true) return false;
sqlite.exec("VACUUM");
return true;
}
function resolveArchiveStartDay(watermark: string | null, minDay: string): string {
if (watermark === null) return minDay;
const afterWatermark = nextUtcDay(watermark);
return compareIsoDays(minDay, afterWatermark) < 0 ? minDay : afterWatermark;
}
function runArchiveDayLoop(
dbPath: string,
options: ArchiveLogsOptions,
selectLogsForDayStmt: BetterSqlite3.Statement,
archiveDayTx: (day: string, start: number, endExclusive: number) => void,
startDay: string,
lastDay: string,
): ArchiveLogsDayResult[] {
const archiveDir = join(dirname(dbPath), "archive", "logs");
mkdirSync(archiveDir, { recursive: true });
const days: ArchiveLogsDayResult[] = [];
let d = startDay;
let processed = 0;
while (compareIsoDays(d, lastDay) <= 0) {
if (options.maxDays !== undefined && processed >= options.maxDays) {
break;
}
const start = utcDayStartMs(d);
const endExclusive = utcDayEndExclusiveMs(d);
const rows = selectLogsForDayStmt.all({ start, endExclusive }) as SqlLogRow[];
const filePath = join(archiveDir, `${d}.jsonl`);
writeFileSync(filePath, buildJsonlBody(rows), "utf8");
archiveDayTx(d, start, endExclusive);
days.push({ day: d, rowCount: rows.length, filePath });
processed += 1;
d = nextUtcDay(d);
}
return days;
}
export function createLogStore(dbPath: string): LogStore { export function createLogStore(dbPath: string): LogStore {
mkdirSync(dirname(dbPath), { recursive: true }); mkdirSync(dirname(dbPath), { recursive: true });
@@ -186,6 +280,14 @@ export function createLogStore(dbPath: string): LogStore {
"SELECT run_id, workflow, status, ts FROM workflow_runs WHERE workflow = ? ORDER BY ts DESC", "SELECT run_id, workflow, status, ts FROM workflow_runs WHERE workflow = ? ORDER BY ts DESC",
); );
const minLogTsStmt = sqlite.prepare("SELECT MIN(ts) AS m FROM logs");
const selectLogsForDayStmt = sqlite.prepare(
"SELECT id, source, type, ref_id, payload, ts FROM logs WHERE ts >= @start AND ts < @endExclusive ORDER BY id ASC",
);
const deleteLogsForDayStmt = sqlite.prepare(
"DELETE FROM logs WHERE ts >= @start AND ts < @endExclusive",
);
const upsertWorkflowRunTx = sqlite.transaction( const upsertWorkflowRunTx = sqlite.transaction(
(entry: Omit<LogEntry, "id">, run: WorkflowRun) => { (entry: Omit<LogEntry, "id">, run: WorkflowRun) => {
const info = insertStmt.run({ const info = insertStmt.run({
@@ -358,6 +460,52 @@ export function createLogStore(dbPath: string): LogStore {
return result; return result;
} }
const archiveDayTx = sqlite.transaction((day: string, start: number, endExclusive: number) => {
deleteLogsForDayStmt.run({ start, endExclusive });
setMetaStmt.run({ key: LOG_ARCHIVE_META_KEY, value: day });
});
function readWatermark(): string | null {
const raw = getMeta(LOG_ARCHIVE_META_KEY);
if (raw === null) return null;
assertValidUtcDay(raw);
return raw;
}
function firstLogUtcDay(): string | null {
const row = minLogTsStmt.get() as { m: number | null } | undefined;
const m = row?.m;
if (m === null || m === undefined) return null;
return utcDateStringFromMs(m);
}
function archiveLogs(options: ArchiveLogsOptions = {}): ArchiveLogsResult {
const now = options.now ?? Date.now();
const retentionMs = options.retentionMs ?? DEFAULT_LOG_RETENTION_MS;
const lastDay = lastArchivableUtcDay(now - retentionMs);
const watermark = readWatermark();
const minDay = firstLogUtcDay();
if (minDay === null) {
return { days: [], vacuumed: runOptionalVacuum(sqlite, options.vacuum) };
}
const startDay = resolveArchiveStartDay(watermark, minDay);
if (compareIsoDays(startDay, lastDay) > 0) {
return { days: [], vacuumed: runOptionalVacuum(sqlite, options.vacuum) };
}
const days = runArchiveDayLoop(
dbPath,
options,
selectLogsForDayStmt,
archiveDayTx,
startDay,
lastDay,
);
return { days, vacuumed: runOptionalVacuum(sqlite, options.vacuum) };
}
function close(): void { function close(): void {
sqlite.close(); sqlite.close();
} }
@@ -374,6 +522,7 @@ export function createLogStore(dbPath: string): LogStore {
getAllWorkflowRuns, getAllWorkflowRuns,
getTriggerPayload, getTriggerPayload,
getThreadEvents, getThreadEvents,
archiveLogs,
close, close,
}; };
} }
+12 -2
View File
@@ -8,6 +8,8 @@ import type { BetterSQLite3Database } from "drizzle-orm/better-sqlite3";
import type { Result } from "@uncaged/nerve-core"; import type { Result } from "@uncaged/nerve-core";
import { err, ok } from "@uncaged/nerve-core"; import { err, ok } from "@uncaged/nerve-core";
import type { BlobStore } from "./blob-store.js";
/** A Drizzle DB instance (schema-generic) */ /** A Drizzle DB instance (schema-generic) */
export type DrizzleDB = BetterSQLite3Database<Record<string, never>>; export type DrizzleDB = BetterSQLite3Database<Record<string, never>>;
@@ -17,11 +19,14 @@ export type PeerMap = Readonly<Record<string, DrizzleDB>>;
/** Options passed to a compute function */ /** Options passed to a compute function */
export type ComputeOptions = { export type ComputeOptions = {
signal: AbortSignal; signal: AbortSignal;
/** CAS under `data/blobs/`; injected by the sense worker when available. */
blobs?: BlobStore;
}; };
/** /**
* The shape every sense's index.ts must export. * The shape every sense's index.ts must export.
* Engine injects `db` (read-write), `peers` (read-only), and `options`. * Engine injects `db` (read-write), `peers` (read-only), and `options`
* (`signal`, and `blobs` when running in the sense worker — RFC-001 §8 CAS).
* Returns T when a signal should be emitted, null for silence. * Returns T when a signal should be emitted, null for silence.
*/ */
export type ComputeFn<T = unknown> = ( export type ComputeFn<T = unknown> = (
@@ -192,14 +197,19 @@ export async function loadComputeFn(senseIndexPath: string): Promise<Result<Comp
* Execute a sense's compute function with an optional soft timeout. * Execute a sense's compute function with an optional soft timeout.
* If timeoutMs is provided and compute takes longer, the AbortSignal is * If timeoutMs is provided and compute takes longer, the AbortSignal is
* triggered and an error Result is returned. * triggered and an error Result is returned.
* When `blobStore` is set, it is exposed as `options.blobs` (see RFC-001 §8).
*/ */
export async function executeCompute( export async function executeCompute(
runtime: SenseRuntime, runtime: SenseRuntime,
peers: PeerMap, peers: PeerMap,
timeoutMs?: number, timeoutMs?: number,
blobStore?: BlobStore,
): Promise<Result<unknown | null>> { ): Promise<Result<unknown | null>> {
const controller = new AbortController(); const controller = new AbortController();
const options: ComputeOptions = { signal: controller.signal }; const options: ComputeOptions =
blobStore !== undefined
? { signal: controller.signal, blobs: blobStore }
: { signal: controller.signal };
let timer: ReturnType<typeof setTimeout> | undefined; let timer: ReturnType<typeof setTimeout> | undefined;
const timeoutPromise = const timeoutPromise =
+8 -3
View File
@@ -10,6 +10,7 @@
* senses/<name>/index.js ← compiled compute * senses/<name>/index.js ← compiled compute
* senses/<name>/migrations/ ← SQL migration files * senses/<name>/migrations/ ← SQL migration files
* data/senses/<name>.db ← SQLite data file * data/senses/<name>.db ← SQLite data file
* data/blobs/<aa>/<hashrest> ← CAS (sha256), via options.blobs in compute
* nerve.yaml ← config * nerve.yaml ← config
*/ */
@@ -19,6 +20,7 @@ import { join, resolve } from "node:path";
import { parseNerveConfig } from "@uncaged/nerve-core"; import { parseNerveConfig } from "@uncaged/nerve-core";
import type { NerveConfig } from "@uncaged/nerve-core"; import type { NerveConfig } from "@uncaged/nerve-core";
import { createBlobStore } from "./blob-store.js";
import type { WorkerToParentMessage } from "./ipc.js"; import type { WorkerToParentMessage } from "./ipc.js";
import { parseParentMessage } from "./ipc.js"; import { parseParentMessage } from "./ipc.js";
import { executeCompute, loadComputeFn, openPeerDb, openSenseDb } from "./sense-runtime.js"; import { executeCompute, loadComputeFn, openPeerDb, openSenseDb } from "./sense-runtime.js";
@@ -162,9 +164,10 @@ async function runCompute(
peers: PeerMap, peers: PeerMap,
timeoutMs: number, timeoutMs: number,
gracePeriodMs: number | null, gracePeriodMs: number | null,
blobStore: ReturnType<typeof createBlobStore>,
): Promise<void> { ): Promise<void> {
try { try {
const result = await executeCompute(runtime, peers, timeoutMs); const result = await executeCompute(runtime, peers, timeoutMs, blobStore);
if (!result.ok) { if (!result.ok) {
sendError(senseName, result.error.message); sendError(senseName, result.error.message);
if (gracePeriodMs !== null && result.error.message.includes("timed out")) { if (gracePeriodMs !== null && result.error.message.includes("timed out")) {
@@ -193,6 +196,7 @@ function handleMessage(
group: string, group: string,
senseConfigs: Map<string, { timeout: number | null; gracePeriod: number | null }>, senseConfigs: Map<string, { timeout: number | null; gracePeriod: number | null }>,
inFlight: Map<string, Promise<void>>, inFlight: Map<string, Promise<void>>,
blobStore: ReturnType<typeof createBlobStore>,
): void { ): void {
const parseResult = parseParentMessage(raw); const parseResult = parseParentMessage(raw);
if (!parseResult.ok) { if (!parseResult.ok) {
@@ -230,7 +234,7 @@ function handleMessage(
const previous = inFlight.get(msg.sense) ?? Promise.resolve(); const previous = inFlight.get(msg.sense) ?? Promise.resolve();
const next = previous const next = previous
.then(() => runCompute(msg.sense, runtime, peers, timeoutMs, gracePeriodMs)) .then(() => runCompute(msg.sense, runtime, peers, timeoutMs, gracePeriodMs, blobStore))
.catch((e: unknown) => { .catch((e: unknown) => {
const errMsg = e instanceof Error ? e.message : String(e); const errMsg = e instanceof Error ? e.message : String(e);
sendError(msg.sense, errMsg); sendError(msg.sense, errMsg);
@@ -294,11 +298,12 @@ async function bootstrap(nerveRoot: string, group: string): Promise<void> {
} }
const inFlight = new Map<string, Promise<void>>(); const inFlight = new Map<string, Promise<void>>();
const blobStore = createBlobStore(join(nerveRoot, "data", "blobs"));
sendReady(); sendReady();
process.on("message", (raw: unknown) => { process.on("message", (raw: unknown) => {
handleMessage(raw, runtimes, peers, group, senseConfigs, inFlight); handleMessage(raw, runtimes, peers, group, senseConfigs, inFlight, blobStore);
}); });
} }
+5 -1
View File
@@ -90,7 +90,11 @@ function spawnWorkflowWorker(
stdio: ["ignore", "inherit", "inherit", "ipc"], stdio: ["ignore", "inherit", "inherit", "ipc"],
}); });
// Prevent unhandled EPIPE when writing to a child whose IPC channel closed // Prevent unhandled EPIPE when writing to a child whose IPC channel closed
child.on("error", () => {}); child.on("error", (err) => {
if ((err as NodeJS.ErrnoException).code !== "EPIPE") {
console.error("[worker] error:", err.message);
}
});
return child; return child;
} }
+8
View File
@@ -0,0 +1,8 @@
#!/bin/bash
# All packages must use pnpm publish. Block npm publish unconditionally.
if [ -z "$npm_execpath" ] || [[ "$npm_execpath" != *pnpm* ]]; then
echo "❌ Use 'pnpm publish' instead of 'npm publish'."
echo " pnpm auto-converts workspace:* dependencies to real versions."
exit 1
fi