From 12d58a820665003733601ec12101cda5318e8b7c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B0=8F=E6=A9=98?= Date: Thu, 7 May 2026 10:40:14 +0000 Subject: [PATCH] feat: migrate CAS to global storage MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add getGlobalCasDir() to storage-root.ts - cmd-cas.ts uses global CAS dir, threadId kept for CLI compat - thread rm no longer deletes .cas/ directories - Rename createThreadCas → createCasStore (deprecated alias kept) - 134 tests passing BREAKING: CAS moves from .cas/ to /cas/ Fixes #30 --- .../cli-workflow/__tests__/commands.test.ts | 34 +++++++++++- .../cli-workflow/__tests__/thread-cli.test.ts | 53 ++++++++++++++++++- packages/cli-workflow/src/cmd-cas.ts | 42 ++++----------- packages/cli-workflow/src/cmd-thread.ts | 4 +- packages/workflow/__tests__/cas.test.ts | 30 ++++++----- .../workflow/__tests__/storage-root.test.ts | 14 +++++ packages/workflow/src/cas.ts | 5 +- packages/workflow/src/index.ts | 4 +- packages/workflow/src/storage-root.ts | 6 +++ 9 files changed, 139 insertions(+), 53 deletions(-) create mode 100644 packages/workflow/__tests__/storage-root.test.ts diff --git a/packages/cli-workflow/__tests__/commands.test.ts b/packages/cli-workflow/__tests__/commands.test.ts index c7dfd2a..9020604 100644 --- a/packages/cli-workflow/__tests__/commands.test.ts +++ b/packages/cli-workflow/__tests__/commands.test.ts @@ -3,8 +3,9 @@ import { mkdir, mkdtemp, readFile, rm, unlink, writeFile } from "node:fs/promise import { tmpdir } from "node:os"; import { join } from "node:path"; -import { getRegisteredWorkflow, readWorkflowRegistry } from "@uncaged/workflow"; +import { getGlobalCasDir, getRegisteredWorkflow, readWorkflowRegistry } from "@uncaged/workflow"; import { cmdAdd } from "../src/cmd-add.js"; +import { cmdCasGet, cmdCasList, cmdCasPut, cmdCasRm } from "../src/cmd-cas.js"; import { cmdHistory } from "../src/cmd-history.js"; import { cmdList, formatListLines } from "../src/cmd-list.js"; import { cmdRemove } from "../src/cmd-remove.js"; @@ -371,6 +372,37 @@ export const run = async function* (input) { expect(bad.ok).toBe(false); }); + test("cas put/get/list/rm use global cas dir (thread id not required for storage)", async () => { + const put = await cmdCasPut(storageRoot, "nonexistent-thread-id", "phase doc"); + expect(put.ok).toBe(true); + if (!put.ok) { + return; + } + const hash = put.value; + const blobPath = join(getGlobalCasDir(storageRoot), `${hash}.txt`); + expect(await readFile(blobPath, "utf8")).toBe("phase doc"); + + const got = await cmdCasGet(storageRoot, "other-thread", hash); + expect(got.ok).toBe(true); + if (!got.ok) { + return; + } + expect(got.value).toBe("phase doc"); + + const listed = await cmdCasList(storageRoot, "another-thread"); + expect(listed.ok).toBe(true); + if (!listed.ok) { + return; + } + expect(listed.value).toContain(hash); + + const removed = await cmdCasRm(storageRoot, "rm-thread", hash); + expect(removed.ok).toBe(true); + + const missing = await cmdCasGet(storageRoot, "after-rm", hash); + expect(missing.ok).toBe(false); + }); + test("rollback rejects missing bundle file for target hash", async () => { const bundleDir = join(storageRoot, "src"); await mkdir(bundleDir, { recursive: true }); diff --git a/packages/cli-workflow/__tests__/thread-cli.test.ts b/packages/cli-workflow/__tests__/thread-cli.test.ts index dbc8f3f..077d51f 100644 --- a/packages/cli-workflow/__tests__/thread-cli.test.ts +++ b/packages/cli-workflow/__tests__/thread-cli.test.ts @@ -4,7 +4,9 @@ import { mkdir, mkdtemp, readFile, rm, writeFile } from "node:fs/promises"; import { tmpdir } from "node:os"; import { dirname, join } from "node:path"; import { fileURLToPath } from "node:url"; +import { getGlobalCasDir } from "@uncaged/workflow"; import { cmdAdd } from "../src/cmd-add.js"; +import { cmdCasPut } from "../src/cmd-cas.js"; import { cmdKill } from "../src/cmd-kill.js"; import { cmdPause } from "../src/cmd-pause.js"; import { cmdPs } from "../src/cmd-ps.js"; @@ -12,7 +14,7 @@ import { cmdResume } from "../src/cmd-resume.js"; import { cmdRun } from "../src/cmd-run.js"; import { cmdThreadRemove, cmdThreadShow } from "../src/cmd-thread.js"; import { cmdThreads } from "../src/cmd-threads.js"; -import { pathExists } from "../src/fs-utils.js"; +import { pathExists, readTextFileIfExists } from "../src/fs-utils.js"; import { addCliArgs } from "./bundle-fixture.js"; const threadFixtureDescriptor = `export const descriptor = { @@ -175,6 +177,55 @@ describe("cli thread commands", () => { expect(await pathExists(dataPath)).toBe(false); }); + test("thread rm does not delete global cas blobs for that thread id", async () => { + const bundleDir = join(storageRoot, "src"); + await mkdir(bundleDir, { recursive: true }); + const bundlePath = join(bundleDir, "demo.esm.js"); + await writeFile(bundlePath, fastBundleSource, "utf8"); + + const added = await cmdAdd(storageRoot, addCliArgs("solve-issue", bundlePath)); + expect(added.ok).toBe(true); + if (!added.ok) { + return; + } + + const ran = await cmdRun(storageRoot, "solve-issue", "hello", 5); + expect(ran.ok).toBe(true); + if (!ran.ok) { + return; + } + + const threadId = ran.value.threadId; + + let threads = await cmdThreads(storageRoot, []); + for ( + let attempt = 0; + attempt < 50 && threads.ok && !threads.value.some((l) => l.includes(threadId)); + attempt++ + ) { + await new Promise((r) => setTimeout(r, 20)); + threads = await cmdThreads(storageRoot, []); + } + + const dataPath = join(storageRoot, "logs", added.value.hash, `${threadId}.data.jsonl`); + const runningPath = join(dirname(dataPath), `${threadId}.running`); + await waitUntilRunningFileAbsent(runningPath, 120); + + const put = await cmdCasPut(storageRoot, threadId, "keep-after-thread-rm"); + expect(put.ok).toBe(true); + if (!put.ok) { + return; + } + const hash = put.value; + const casBlob = join(getGlobalCasDir(storageRoot), `${hash}.txt`); + + const removed = await cmdThreadRemove(storageRoot, threadId); + expect(removed.ok).toBe(true); + + const stillThere = await readTextFileIfExists(casBlob); + expect(stillThere).toBe("keep-after-thread-rm"); + }); + test("cli entrypoint dispatches threads / ps (spawn)", () => { const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot }; const threads = spawnSync(process.execPath, [cliEntryPath, "threads"], { diff --git a/packages/cli-workflow/src/cmd-cas.ts b/packages/cli-workflow/src/cmd-cas.ts index b62a427..8774d4b 100644 --- a/packages/cli-workflow/src/cmd-cas.ts +++ b/packages/cli-workflow/src/cmd-cas.ts @@ -1,23 +1,11 @@ -import { dirname, join } from "node:path"; - -import { createThreadCas, err, ok, type Result } from "@uncaged/workflow"; - -import { resolveThreadDataPath } from "./thread-scan.js"; - -function resolveCasDir(threadDataPath: string, threadId: string): string { - return join(dirname(threadDataPath), `${threadId}.cas`); -} +import { createCasStore, err, getGlobalCasDir, ok, type Result } from "@uncaged/workflow"; export async function cmdCasGet( storageRoot: string, - threadId: string, + _threadId: string, hash: string, ): Promise> { - const dataPath = await resolveThreadDataPath(storageRoot, threadId); - if (dataPath === null) { - return err(`thread not found: ${threadId}`); - } - const cas = createThreadCas(resolveCasDir(dataPath, threadId)); + const cas = createCasStore(getGlobalCasDir(storageRoot)); const content = await cas.get(hash); if (content === null) { return err(`cas entry not found: ${hash}`); @@ -27,41 +15,29 @@ export async function cmdCasGet( export async function cmdCasPut( storageRoot: string, - threadId: string, + _threadId: string, content: string, ): Promise> { - const dataPath = await resolveThreadDataPath(storageRoot, threadId); - if (dataPath === null) { - return err(`thread not found: ${threadId}`); - } - const cas = createThreadCas(resolveCasDir(dataPath, threadId)); + const cas = createCasStore(getGlobalCasDir(storageRoot)); const hash = await cas.put(content); return ok(hash); } export async function cmdCasList( storageRoot: string, - threadId: string, + _threadId: string, ): Promise> { - const dataPath = await resolveThreadDataPath(storageRoot, threadId); - if (dataPath === null) { - return err(`thread not found: ${threadId}`); - } - const cas = createThreadCas(resolveCasDir(dataPath, threadId)); + const cas = createCasStore(getGlobalCasDir(storageRoot)); const hashes = await cas.list(); return ok(hashes); } export async function cmdCasRm( storageRoot: string, - threadId: string, + _threadId: string, hash: string, ): Promise> { - const dataPath = await resolveThreadDataPath(storageRoot, threadId); - if (dataPath === null) { - return err(`thread not found: ${threadId}`); - } - const cas = createThreadCas(resolveCasDir(dataPath, threadId)); + const cas = createCasStore(getGlobalCasDir(storageRoot)); await cas.delete(hash); return ok(undefined); } diff --git a/packages/cli-workflow/src/cmd-thread.ts b/packages/cli-workflow/src/cmd-thread.ts index 1422d89..af5c6b1 100644 --- a/packages/cli-workflow/src/cmd-thread.ts +++ b/packages/cli-workflow/src/cmd-thread.ts @@ -1,4 +1,4 @@ -import { rm, unlink } from "node:fs/promises"; +import { unlink } from "node:fs/promises"; import { dirname, join } from "node:path"; import { err, ok, type Result } from "@uncaged/workflow"; @@ -33,12 +33,10 @@ export async function cmdThreadRemove( const dir = dirname(dataPath); const infoPath = join(dir, `${threadId}.info.jsonl`); const runningPath = join(dir, `${threadId}.running`); - const casPath = join(dir, `${threadId}.cas`); await unlink(dataPath); await unlink(infoPath).catch(() => {}); await unlink(runningPath).catch(() => {}); - await rm(casPath, { recursive: true, force: true }); return ok(undefined); } diff --git a/packages/workflow/__tests__/cas.test.ts b/packages/workflow/__tests__/cas.test.ts index 679e242..ea17771 100644 --- a/packages/workflow/__tests__/cas.test.ts +++ b/packages/workflow/__tests__/cas.test.ts @@ -3,10 +3,16 @@ import { mkdtemp, rm } from "node:fs/promises"; import { tmpdir } from "node:os"; import { join } from "node:path"; -import { createThreadCas } from "../src/cas.js"; +import { createCasStore, createThreadCas } from "../src/cas.js"; import { hashString } from "../src/hash.js"; -describe("createThreadCas", () => { +describe("cas module exports", () => { + test("createThreadCas is a deprecated alias of createCasStore", () => { + expect(createThreadCas).toBe(createCasStore); + }); +}); + +describe("createCasStore", () => { let casDir: string; beforeEach(async () => { @@ -18,7 +24,7 @@ describe("createThreadCas", () => { }); test("put returns consistent hash for same content", async () => { - const cas = createThreadCas(casDir); + const cas = createCasStore(casDir); const h1 = await cas.put("hello world"); const h2 = await cas.put("hello world"); expect(h1).toBe(h2); @@ -26,14 +32,14 @@ describe("createThreadCas", () => { }); test("put returns hash matching hashString", async () => { - const cas = createThreadCas(casDir); + const cas = createCasStore(casDir); const content = "some content to store"; const h = await cas.put(content); expect(h).toBe(hashString(content)); }); test("get returns stored content", async () => { - const cas = createThreadCas(casDir); + const cas = createCasStore(casDir); const content = "line1\nline2\nline3"; const h = await cas.put(content); const retrieved = await cas.get(h); @@ -41,13 +47,13 @@ describe("createThreadCas", () => { }); test("get returns null for missing hash", async () => { - const cas = createThreadCas(casDir); + const cas = createCasStore(casDir); const result = await cas.get("0000000000000"); expect(result).toBeNull(); }); test("delete removes entry", async () => { - const cas = createThreadCas(casDir); + const cas = createCasStore(casDir); const h = await cas.put("to be deleted"); await cas.delete(h); const result = await cas.get(h); @@ -55,12 +61,12 @@ describe("createThreadCas", () => { }); test("delete on missing hash does not throw", async () => { - const cas = createThreadCas(casDir); + const cas = createCasStore(casDir); await cas.delete("0000000000000"); }); test("list returns all stored hashes", async () => { - const cas = createThreadCas(casDir); + const cas = createCasStore(casDir); const h1 = await cas.put("aaa"); const h2 = await cas.put("bbb"); const h3 = await cas.put("ccc"); @@ -69,13 +75,13 @@ describe("createThreadCas", () => { }); test("list returns empty array when cas dir does not exist", async () => { - const cas = createThreadCas(join(casDir, "nonexistent")); + const cas = createCasStore(join(casDir, "nonexistent")); const hashes = await cas.list(); expect(hashes).toEqual([]); }); test("put is idempotent — same content written twice causes no error", async () => { - const cas = createThreadCas(casDir); + const cas = createCasStore(casDir); const h1 = await cas.put("idempotent"); const h2 = await cas.put("idempotent"); expect(h1).toBe(h2); @@ -84,7 +90,7 @@ describe("createThreadCas", () => { }); test("different content produces different hashes", async () => { - const cas = createThreadCas(casDir); + const cas = createCasStore(casDir); const h1 = await cas.put("alpha"); const h2 = await cas.put("beta"); expect(h1).not.toBe(h2); diff --git a/packages/workflow/__tests__/storage-root.test.ts b/packages/workflow/__tests__/storage-root.test.ts new file mode 100644 index 0000000..6bcb765 --- /dev/null +++ b/packages/workflow/__tests__/storage-root.test.ts @@ -0,0 +1,14 @@ +import { describe, expect, test } from "bun:test"; +import { join } from "node:path"; + +import { getDefaultWorkflowStorageRoot, getGlobalCasDir } from "../src/storage-root.js"; + +describe("getGlobalCasDir", () => { + test("joins cas segment under explicit storage root", () => { + expect(getGlobalCasDir("/tmp/wf-root")).toBe(join("/tmp/wf-root", "cas")); + }); + + test("defaults to default workflow root when storage root is undefined", () => { + expect(getGlobalCasDir(undefined)).toBe(join(getDefaultWorkflowStorageRoot(), "cas")); + }); +}); diff --git a/packages/workflow/src/cas.ts b/packages/workflow/src/cas.ts index 8e422ea..513e6b1 100644 --- a/packages/workflow/src/cas.ts +++ b/packages/workflow/src/cas.ts @@ -10,7 +10,7 @@ export type CasStore = { list(): Promise; }; -export function createThreadCas(casDir: string): CasStore { +export function createCasStore(casDir: string): CasStore { async function ensureDir(): Promise { await mkdir(casDir, { recursive: true }); } @@ -68,3 +68,6 @@ export function createThreadCas(casDir: string): CasStore { }, }; } + +/** @deprecated Use {@link createCasStore} — CAS is global, not per-thread. */ +export const createThreadCas = createCasStore; diff --git a/packages/workflow/src/index.ts b/packages/workflow/src/index.ts index 131de9c..6fdfae4 100644 --- a/packages/workflow/src/index.ts +++ b/packages/workflow/src/index.ts @@ -7,7 +7,7 @@ export { } from "./base32.js"; export { buildDescriptor } from "./build-descriptor.js"; export { validateWorkflowBundle, type WorkflowBundleValidationInput } from "./bundle-validator.js"; -export { type CasStore, createThreadCas } from "./cas.js"; +export { type CasStore, createCasStore, createThreadCas } from "./cas.js"; export { createWorkflow } from "./create-workflow.js"; export { type ExecuteThreadIo, @@ -55,7 +55,7 @@ export { writeWorkflowRegistry, } from "./registry.js"; export { err, ok, type Result } from "./result.js"; -export { getDefaultWorkflowStorageRoot } from "./storage-root.js"; +export { getDefaultWorkflowStorageRoot, getGlobalCasDir } from "./storage-root.js"; export { createThreadPauseGate, type ThreadPauseGate } from "./thread-pause-gate.js"; export { type AgentBinding, diff --git a/packages/workflow/src/storage-root.ts b/packages/workflow/src/storage-root.ts index 81c20d8..f270645 100644 --- a/packages/workflow/src/storage-root.ts +++ b/packages/workflow/src/storage-root.ts @@ -5,3 +5,9 @@ import { join } from "node:path"; export function getDefaultWorkflowStorageRoot(): string { return join(homedir(), ".uncaged", "workflow"); } + +/** Global content-addressed store directory under the workflow storage root (`/cas`). */ +export function getGlobalCasDir(storageRoot: string | undefined): string { + const root = storageRoot ?? getDefaultWorkflowStorageRoot(); + return join(root, "cas"); +}