import { createHash } from "node:crypto";
import { existsSync, readdirSync, rmSync, writeFileSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";

import { afterEach, describe, expect, it } from "vitest";

import { createBlobStore, normalizeBlobHash } from "../blob-store.js";

/** Blob roots handed out by `makeRoot` during the current test; removed in `afterEach`. */
const createdRoots: string[] = [];

/**
 * Returns a unique, not-yet-created directory path under the OS temp dir.
 * The path is recorded so the `afterEach` hook can delete whatever the test
 * wrote there (the store creates directories lazily on first write).
 */
function makeRoot(): string {
  const root = join(tmpdir(), `nerve-blob-${Date.now()}-${Math.random().toString(16).slice(2)}`);
  createdRoots.push(root);
  return root;
}

// Remove every temp root created by the test that just ran so repeated runs
// do not accumulate directories under the OS temp dir. `force: true` makes
// this a no-op for roots that were never materialized on disk.
afterEach(() => {
  for (const root of createdRoots.splice(0)) {
    rmSync(root, { recursive: true, force: true });
  }
});

describe("normalizeBlobHash", () => {
  it("accepts 64-char lowercase hex", () => {
    const h = "a".repeat(64);
    expect(normalizeBlobHash(h)).toBe(h);
  });

  it("normalizes uppercase to lowercase", () => {
    const h = "A".repeat(64);
    expect(normalizeBlobHash(h)).toBe("a".repeat(64));
  });

  it("rejects wrong length and non-hex", () => {
    expect(normalizeBlobHash("ab")).toBeNull();
    expect(normalizeBlobHash("g".repeat(64))).toBeNull();
  });
});

describe("createBlobStore", () => {
  it("write returns sha256 hex and stores under 2-char shard", () => {
    const root = makeRoot();
    const store = createBlobStore(root);
    const content = "hello cas";
    const hash = store.write(content);

    expect(hash).toMatch(/^[0-9a-f]{64}$/);
    expect(createHash("sha256").update(content, "utf8").digest("hex")).toBe(hash);

    const shard = hash.slice(0, 2);
    const rel = hash.slice(2);
    const filePath = join(root, shard, rel);
    expect(existsSync(filePath)).toBe(true);
  });

  it("read returns stored bytes and exists is true", () => {
    const root = makeRoot();
    const store = createBlobStore(root);
    // Bytes that are not valid UTF-8 text, to prove raw binary round-trips.
    const buf = Buffer.from([0, 255, 128]);
    const hash = store.write(buf);

    expect(store.exists(hash)).toBe(true);
    const got = store.read(hash);
    expect(got).not.toBeNull();
    expect(Buffer.compare(got as Buffer, buf)).toBe(0);
  });

  it("write is idempotent for same content", () => {
    const root = makeRoot();
    const store = createBlobStore(root);
    const h1 = store.write("same");
    const h2 = store.write("same");
    expect(h1).toBe(h2);

    // Dot-prefixed names are the store's temp files; only the blob itself should remain.
    const shard = h1.slice(0, 2);
    const names = readdirSync(join(root, shard));
    expect(names.filter((n: string) => !n.startsWith("."))).toHaveLength(1);
  });

  it("read returns null for missing blob", () => {
    const root = makeRoot();
    const store = createBlobStore(root);
    const missing = "0".repeat(64);
    expect(store.read(missing)).toBeNull();
    expect(store.exists(missing)).toBe(false);
  });

  it("read and exists return null/false for invalid hash", () => {
    const root = makeRoot();
    const store = createBlobStore(root);
    expect(store.read("not-a-hash")).toBeNull();
    expect(store.exists("not-a-hash")).toBe(false);
  });

  it("throws when on-disk content does not match path hash", () => {
    const root = makeRoot();
    const store = createBlobStore(root);
    const hash = store.write("ok");
    const filePath = join(root, hash.slice(0, 2), hash.slice(2));
    writeFileSync(filePath, "tampered");

    expect(() => store.read(hash)).toThrow(/CAS mismatch/i);
  });

  it("write throws when an existing file at the digest path has wrong content", () => {
    const root = makeRoot();
    const store = createBlobStore(root);
    const hash = store.write("truth");
    const filePath = join(root, hash.slice(0, 2), hash.slice(2));
    writeFileSync(filePath, "lies");

    expect(() => store.write("truth")).toThrow(/CAS mismatch/i);
  });
});
/**
 * CAS blob store — sha256 content-addressable files under `data/blobs/`.
 *
 * Layout: `<blobsRoot>/<2-hex-shard>/<62-hex-rest>` (RFC-001 §8).
 */

import { createHash, randomBytes } from "node:crypto";
import {
  existsSync,
  mkdirSync,
  readFileSync,
  renameSync,
  unlinkSync,
  writeFileSync,
} from "node:fs";
import { dirname, join } from "node:path";

/** Length of a sha256 digest rendered as hex. */
const SHA256_HEX_LEN = 64;
/** Matches a non-empty string made only of lowercase hex digits. */
const HEX_RE = /^[0-9a-f]+$/;

export type BlobStore = {
  /** Persist UTF-8 or raw bytes; returns lowercase hex sha256. Idempotent for identical content. */
  write: (content: string | Uint8Array | Buffer) => string;
  /** Returns bytes or null if the hash is invalid or no blob exists. Verifies digest matches path. */
  read: (hash: string) => Buffer | null;
  /** True when hash is well-formed and the blob file is present. */
  exists: (hash: string) => boolean;
};

/** Converts supported inputs to a Buffer; strings are encoded as UTF-8, Buffers are returned as-is. */
function toBuffer(content: string | Uint8Array | Buffer): Buffer {
  if (typeof content === "string") return Buffer.from(content, "utf8");
  if (Buffer.isBuffer(content)) return content;
  return Buffer.from(content);
}

/** Lowercase hex sha256 of `buf`. */
function digestHex(buf: Buffer): string {
  return createHash("sha256").update(buf).digest("hex");
}

/** True when `e` is an error object whose `code` property is one of `codes` (e.g. "ENOENT"). */
function hasErrnoCode(e: unknown, codes: readonly string[]): boolean {
  if (typeof e !== "object" || e === null || !("code" in e)) return false;
  const code = (e as { code?: unknown }).code;
  return typeof code === "string" && codes.includes(code);
}

/**
 * Validates and canonicalizes a blob hash.
 *
 * Surrounding whitespace is trimmed and hex digits are lowercased before validation.
 *
 * @returns normalized lowercase hex or null if not a valid sha256 hex string
 */
export function normalizeBlobHash(hash: string): string | null {
  // Callers may be untyped compiled code; treat a non-string as an invalid hash
  // instead of throwing a TypeError from `.trim()`.
  if (typeof hash !== "string") return null;
  const h = hash.trim().toLowerCase();
  if (h.length !== SHA256_HEX_LEN) return null;
  if (!HEX_RE.test(h)) return null;
  return h;
}

/** On-disk path for a normalized hash: first two hex chars are the shard directory, the rest the file name. */
function pathForHash(blobsRoot: string, hashLower: string): string {
  return join(blobsRoot, hashLower.slice(0, 2), hashLower.slice(2));
}

/**
 * Reads the blob at `filePath` and checks that its sha256 equals `expectedHash`.
 *
 * Uses a single read instead of an exists-then-read pair, so a blob that is
 * removed concurrently is reported as absent rather than surfacing as ENOENT.
 *
 * @returns the file bytes, or null when the file (or its shard directory) does not exist
 * @throws Error with a "Blob CAS mismatch" message when the content digest differs from `expectedHash`
 * @throws any other filesystem error raised by the read (e.g. EISDIR, EACCES)
 */
function readVerifiedOrNull(filePath: string, expectedHash: string): Buffer | null {
  let data: Buffer;
  try {
    data = readFileSync(filePath);
  } catch (e: unknown) {
    // ENOTDIR: a path component (the shard) is not a directory — nothing is stored there.
    if (hasErrnoCode(e, ["ENOENT", "ENOTDIR"])) return null;
    throw e;
  }
  const actual = digestHex(data);
  if (actual !== expectedHash) {
    throw new Error(
      `Blob CAS mismatch at "${filePath}": file digests to ${actual}, path expects ${expectedHash}`,
    );
  }
  return data;
}

/**
 * Creates a blob store rooted at `blobsRoot`.
 *
 * Directories are created lazily on the first `write`; constructing the store
 * does not touch the filesystem.
 */
export function createBlobStore(blobsRoot: string): BlobStore {
  function write(content: string | Uint8Array | Buffer): string {
    const buf = toBuffer(content);
    const hash = digestHex(buf);
    const filePath = pathForHash(blobsRoot, hash);

    // Already stored: verification either confirms the content or throws on corruption.
    if (readVerifiedOrNull(filePath, hash) !== null) return hash;

    const shardDir = dirname(filePath);
    mkdirSync(shardDir, { recursive: true });
    // Write to a uniquely named temp file in the same directory, then rename it
    // into place so readers never observe a partially written blob.
    const tmp = join(shardDir, `.tmp.${randomBytes(16).toString("hex")}`);
    try {
      writeFileSync(tmp, buf);
      renameSync(tmp, filePath);
    } catch (e: unknown) {
      try {
        unlinkSync(tmp);
      } catch {
        // ignore cleanup errors — the original failure is the one worth reporting
      }
      throw e;
    }

    return hash;
  }

  function read(hash: string): Buffer | null {
    const h = normalizeBlobHash(hash);
    if (h === null) return null;
    return readVerifiedOrNull(pathForHash(blobsRoot, h), h);
  }

  function exists(hash: string): boolean {
    const h = normalizeBlobHash(hash);
    if (h === null) return false;
    // Presence check only; content is verified by `read`.
    return existsSync(pathForHash(blobsRoot, h));
  }

  return { write, read, exists };
}
+ * Engine injects `db` (read-write), `peers` (read-only), and `options` + * (`signal`, and `blobs` when running in the sense worker — RFC-001 §8 CAS). * Returns T when a signal should be emitted, null for silence. */ export type ComputeFn = ( @@ -192,14 +197,19 @@ export async function loadComputeFn(senseIndexPath: string): Promise> { const controller = new AbortController(); - const options: ComputeOptions = { signal: controller.signal }; + const options: ComputeOptions = + blobStore !== undefined + ? { signal: controller.signal, blobs: blobStore } + : { signal: controller.signal }; let timer: ReturnType | undefined; const timeoutPromise = diff --git a/packages/daemon/src/sense-worker.ts b/packages/daemon/src/sense-worker.ts index 95f227b..4a208b3 100644 --- a/packages/daemon/src/sense-worker.ts +++ b/packages/daemon/src/sense-worker.ts @@ -10,6 +10,7 @@ * senses//index.js ← compiled compute * senses//migrations/ ← SQL migration files * data/senses/.db ← SQLite data file + * data/blobs// ← CAS (sha256), via options.blobs in compute * nerve.yaml ← config */ @@ -19,6 +20,7 @@ import { join, resolve } from "node:path"; import { parseNerveConfig } from "@uncaged/nerve-core"; import type { NerveConfig } from "@uncaged/nerve-core"; +import { createBlobStore } from "./blob-store.js"; import type { WorkerToParentMessage } from "./ipc.js"; import { parseParentMessage } from "./ipc.js"; import { executeCompute, loadComputeFn, openPeerDb, openSenseDb } from "./sense-runtime.js"; @@ -162,9 +164,10 @@ async function runCompute( peers: PeerMap, timeoutMs: number, gracePeriodMs: number | null, + blobStore: ReturnType, ): Promise { try { - const result = await executeCompute(runtime, peers, timeoutMs); + const result = await executeCompute(runtime, peers, timeoutMs, blobStore); if (!result.ok) { sendError(senseName, result.error.message); if (gracePeriodMs !== null && result.error.message.includes("timed out")) { @@ -193,6 +196,7 @@ function handleMessage( group: string, 
senseConfigs: Map, inFlight: Map>, + blobStore: ReturnType, ): void { const parseResult = parseParentMessage(raw); if (!parseResult.ok) { @@ -230,7 +234,7 @@ function handleMessage( const previous = inFlight.get(msg.sense) ?? Promise.resolve(); const next = previous - .then(() => runCompute(msg.sense, runtime, peers, timeoutMs, gracePeriodMs)) + .then(() => runCompute(msg.sense, runtime, peers, timeoutMs, gracePeriodMs, blobStore)) .catch((e: unknown) => { const errMsg = e instanceof Error ? e.message : String(e); sendError(msg.sense, errMsg); @@ -294,11 +298,12 @@ async function bootstrap(nerveRoot: string, group: string): Promise { } const inFlight = new Map>(); + const blobStore = createBlobStore(join(nerveRoot, "data", "blobs")); sendReady(); process.on("message", (raw: unknown) => { - handleMessage(raw, runtimes, peers, group, senseConfigs, inFlight); + handleMessage(raw, runtimes, peers, group, senseConfigs, inFlight, blobStore); }); }