feat: migrate CAS to global storage

- Add getGlobalCasDir() to storage-root.ts
- cmd-cas.ts uses global CAS dir, threadId kept for CLI compat
- thread rm no longer deletes .cas/ directories
- Rename createThreadCas → createCasStore (deprecated alias kept)
- 134 tests passing

BREAKING: CAS moves from <thread>.cas/ to <storageRoot>/cas/

Fixes #30
This commit is contained in:
2026-05-07 10:40:14 +00:00
parent c096f4d94e
commit 12d58a8206
9 changed files with 139 additions and 53 deletions
@@ -3,8 +3,9 @@ import { mkdir, mkdtemp, readFile, rm, unlink, writeFile } from "node:fs/promise
import { tmpdir } from "node:os";
import { join } from "node:path";
import { getRegisteredWorkflow, readWorkflowRegistry } from "@uncaged/workflow";
import { getGlobalCasDir, getRegisteredWorkflow, readWorkflowRegistry } from "@uncaged/workflow";
import { cmdAdd } from "../src/cmd-add.js";
import { cmdCasGet, cmdCasList, cmdCasPut, cmdCasRm } from "../src/cmd-cas.js";
import { cmdHistory } from "../src/cmd-history.js";
import { cmdList, formatListLines } from "../src/cmd-list.js";
import { cmdRemove } from "../src/cmd-remove.js";
@@ -371,6 +372,37 @@ export const run = async function* (input) {
expect(bad.ok).toBe(false);
});
test("cas put/get/list/rm use global cas dir (thread id not required for storage)", async () => {
const put = await cmdCasPut(storageRoot, "nonexistent-thread-id", "phase doc");
expect(put.ok).toBe(true);
if (!put.ok) {
return;
}
const hash = put.value;
const blobPath = join(getGlobalCasDir(storageRoot), `${hash}.txt`);
expect(await readFile(blobPath, "utf8")).toBe("phase doc");
const got = await cmdCasGet(storageRoot, "other-thread", hash);
expect(got.ok).toBe(true);
if (!got.ok) {
return;
}
expect(got.value).toBe("phase doc");
const listed = await cmdCasList(storageRoot, "another-thread");
expect(listed.ok).toBe(true);
if (!listed.ok) {
return;
}
expect(listed.value).toContain(hash);
const removed = await cmdCasRm(storageRoot, "rm-thread", hash);
expect(removed.ok).toBe(true);
const missing = await cmdCasGet(storageRoot, "after-rm", hash);
expect(missing.ok).toBe(false);
});
test("rollback rejects missing bundle file for target hash", async () => {
const bundleDir = join(storageRoot, "src");
await mkdir(bundleDir, { recursive: true });
@@ -4,7 +4,9 @@ import { mkdir, mkdtemp, readFile, rm, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import { dirname, join } from "node:path";
import { fileURLToPath } from "node:url";
import { getGlobalCasDir } from "@uncaged/workflow";
import { cmdAdd } from "../src/cmd-add.js";
import { cmdCasPut } from "../src/cmd-cas.js";
import { cmdKill } from "../src/cmd-kill.js";
import { cmdPause } from "../src/cmd-pause.js";
import { cmdPs } from "../src/cmd-ps.js";
@@ -12,7 +14,7 @@ import { cmdResume } from "../src/cmd-resume.js";
import { cmdRun } from "../src/cmd-run.js";
import { cmdThreadRemove, cmdThreadShow } from "../src/cmd-thread.js";
import { cmdThreads } from "../src/cmd-threads.js";
import { pathExists } from "../src/fs-utils.js";
import { pathExists, readTextFileIfExists } from "../src/fs-utils.js";
import { addCliArgs } from "./bundle-fixture.js";
const threadFixtureDescriptor = `export const descriptor = {
@@ -175,6 +177,55 @@ describe("cli thread commands", () => {
expect(await pathExists(dataPath)).toBe(false);
});
test("thread rm does not delete global cas blobs for that thread id", async () => {
const bundleDir = join(storageRoot, "src");
await mkdir(bundleDir, { recursive: true });
const bundlePath = join(bundleDir, "demo.esm.js");
await writeFile(bundlePath, fastBundleSource, "utf8");
const added = await cmdAdd(storageRoot, addCliArgs("solve-issue", bundlePath));
expect(added.ok).toBe(true);
if (!added.ok) {
return;
}
const ran = await cmdRun(storageRoot, "solve-issue", "hello", 5);
expect(ran.ok).toBe(true);
if (!ran.ok) {
return;
}
const threadId = ran.value.threadId;
let threads = await cmdThreads(storageRoot, []);
for (
let attempt = 0;
attempt < 50 && threads.ok && !threads.value.some((l) => l.includes(threadId));
attempt++
) {
await new Promise((r) => setTimeout(r, 20));
threads = await cmdThreads(storageRoot, []);
}
const dataPath = join(storageRoot, "logs", added.value.hash, `${threadId}.data.jsonl`);
const runningPath = join(dirname(dataPath), `${threadId}.running`);
await waitUntilRunningFileAbsent(runningPath, 120);
const put = await cmdCasPut(storageRoot, threadId, "keep-after-thread-rm");
expect(put.ok).toBe(true);
if (!put.ok) {
return;
}
const hash = put.value;
const casBlob = join(getGlobalCasDir(storageRoot), `${hash}.txt`);
const removed = await cmdThreadRemove(storageRoot, threadId);
expect(removed.ok).toBe(true);
const stillThere = await readTextFileIfExists(casBlob);
expect(stillThere).toBe("keep-after-thread-rm");
});
test("cli entrypoint dispatches threads / ps (spawn)", () => {
const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot };
const threads = spawnSync(process.execPath, [cliEntryPath, "threads"], {
+9 -33
View File
@@ -1,23 +1,11 @@
import { dirname, join } from "node:path";
import { createThreadCas, err, ok, type Result } from "@uncaged/workflow";
import { resolveThreadDataPath } from "./thread-scan.js";
function resolveCasDir(threadDataPath: string, threadId: string): string {
return join(dirname(threadDataPath), `${threadId}.cas`);
}
import { createCasStore, err, getGlobalCasDir, ok, type Result } from "@uncaged/workflow";
export async function cmdCasGet(
storageRoot: string,
threadId: string,
_threadId: string,
hash: string,
): Promise<Result<string, string>> {
const dataPath = await resolveThreadDataPath(storageRoot, threadId);
if (dataPath === null) {
return err(`thread not found: ${threadId}`);
}
const cas = createThreadCas(resolveCasDir(dataPath, threadId));
const cas = createCasStore(getGlobalCasDir(storageRoot));
const content = await cas.get(hash);
if (content === null) {
return err(`cas entry not found: ${hash}`);
@@ -27,41 +15,29 @@ export async function cmdCasGet(
export async function cmdCasPut(
storageRoot: string,
threadId: string,
_threadId: string,
content: string,
): Promise<Result<string, string>> {
const dataPath = await resolveThreadDataPath(storageRoot, threadId);
if (dataPath === null) {
return err(`thread not found: ${threadId}`);
}
const cas = createThreadCas(resolveCasDir(dataPath, threadId));
const cas = createCasStore(getGlobalCasDir(storageRoot));
const hash = await cas.put(content);
return ok(hash);
}
export async function cmdCasList(
storageRoot: string,
threadId: string,
_threadId: string,
): Promise<Result<string[], string>> {
const dataPath = await resolveThreadDataPath(storageRoot, threadId);
if (dataPath === null) {
return err(`thread not found: ${threadId}`);
}
const cas = createThreadCas(resolveCasDir(dataPath, threadId));
const cas = createCasStore(getGlobalCasDir(storageRoot));
const hashes = await cas.list();
return ok(hashes);
}
export async function cmdCasRm(
storageRoot: string,
threadId: string,
_threadId: string,
hash: string,
): Promise<Result<void, string>> {
const dataPath = await resolveThreadDataPath(storageRoot, threadId);
if (dataPath === null) {
return err(`thread not found: ${threadId}`);
}
const cas = createThreadCas(resolveCasDir(dataPath, threadId));
const cas = createCasStore(getGlobalCasDir(storageRoot));
await cas.delete(hash);
return ok(undefined);
}
+1 -3
View File
@@ -1,4 +1,4 @@
import { rm, unlink } from "node:fs/promises";
import { unlink } from "node:fs/promises";
import { dirname, join } from "node:path";
import { err, ok, type Result } from "@uncaged/workflow";
@@ -33,12 +33,10 @@ export async function cmdThreadRemove(
const dir = dirname(dataPath);
const infoPath = join(dir, `${threadId}.info.jsonl`);
const runningPath = join(dir, `${threadId}.running`);
const casPath = join(dir, `${threadId}.cas`);
await unlink(dataPath);
await unlink(infoPath).catch(() => {});
await unlink(runningPath).catch(() => {});
await rm(casPath, { recursive: true, force: true });
return ok(undefined);
}
+18 -12
View File
@@ -3,10 +3,16 @@ import { mkdtemp, rm } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { createThreadCas } from "../src/cas.js";
import { createCasStore, createThreadCas } from "../src/cas.js";
import { hashString } from "../src/hash.js";
describe("createThreadCas", () => {
describe("cas module exports", () => {
test("createThreadCas is a deprecated alias of createCasStore", () => {
expect(createThreadCas).toBe(createCasStore);
});
});
describe("createCasStore", () => {
let casDir: string;
beforeEach(async () => {
@@ -18,7 +24,7 @@ describe("createThreadCas", () => {
});
test("put returns consistent hash for same content", async () => {
const cas = createThreadCas(casDir);
const cas = createCasStore(casDir);
const h1 = await cas.put("hello world");
const h2 = await cas.put("hello world");
expect(h1).toBe(h2);
@@ -26,14 +32,14 @@ describe("createThreadCas", () => {
});
test("put returns hash matching hashString", async () => {
const cas = createThreadCas(casDir);
const cas = createCasStore(casDir);
const content = "some content to store";
const h = await cas.put(content);
expect(h).toBe(hashString(content));
});
test("get returns stored content", async () => {
const cas = createThreadCas(casDir);
const cas = createCasStore(casDir);
const content = "line1\nline2\nline3";
const h = await cas.put(content);
const retrieved = await cas.get(h);
@@ -41,13 +47,13 @@ describe("createThreadCas", () => {
});
test("get returns null for missing hash", async () => {
const cas = createThreadCas(casDir);
const cas = createCasStore(casDir);
const result = await cas.get("0000000000000");
expect(result).toBeNull();
});
test("delete removes entry", async () => {
const cas = createThreadCas(casDir);
const cas = createCasStore(casDir);
const h = await cas.put("to be deleted");
await cas.delete(h);
const result = await cas.get(h);
@@ -55,12 +61,12 @@ describe("createThreadCas", () => {
});
test("delete on missing hash does not throw", async () => {
const cas = createThreadCas(casDir);
const cas = createCasStore(casDir);
await cas.delete("0000000000000");
});
test("list returns all stored hashes", async () => {
const cas = createThreadCas(casDir);
const cas = createCasStore(casDir);
const h1 = await cas.put("aaa");
const h2 = await cas.put("bbb");
const h3 = await cas.put("ccc");
@@ -69,13 +75,13 @@ describe("createThreadCas", () => {
});
test("list returns empty array when cas dir does not exist", async () => {
const cas = createThreadCas(join(casDir, "nonexistent"));
const cas = createCasStore(join(casDir, "nonexistent"));
const hashes = await cas.list();
expect(hashes).toEqual([]);
});
test("put is idempotent — same content written twice causes no error", async () => {
const cas = createThreadCas(casDir);
const cas = createCasStore(casDir);
const h1 = await cas.put("idempotent");
const h2 = await cas.put("idempotent");
expect(h1).toBe(h2);
@@ -84,7 +90,7 @@ describe("createThreadCas", () => {
});
test("different content produces different hashes", async () => {
const cas = createThreadCas(casDir);
const cas = createCasStore(casDir);
const h1 = await cas.put("alpha");
const h2 = await cas.put("beta");
expect(h1).not.toBe(h2);
@@ -0,0 +1,14 @@
import { describe, expect, test } from "bun:test";
import { join } from "node:path";
import { getDefaultWorkflowStorageRoot, getGlobalCasDir } from "../src/storage-root.js";
describe("getGlobalCasDir", () => {
test("joins cas segment under explicit storage root", () => {
expect(getGlobalCasDir("/tmp/wf-root")).toBe(join("/tmp/wf-root", "cas"));
});
test("defaults to default workflow root when storage root is undefined", () => {
expect(getGlobalCasDir(undefined)).toBe(join(getDefaultWorkflowStorageRoot(), "cas"));
});
});
+4 -1
View File
@@ -10,7 +10,7 @@ export type CasStore = {
list(): Promise<string[]>;
};
export function createThreadCas(casDir: string): CasStore {
export function createCasStore(casDir: string): CasStore {
async function ensureDir(): Promise<void> {
await mkdir(casDir, { recursive: true });
}
@@ -68,3 +68,6 @@ export function createThreadCas(casDir: string): CasStore {
},
};
}
/** @deprecated Use {@link createCasStore} — CAS is global, not per-thread. */
export const createThreadCas = createCasStore;
+2 -2
View File
@@ -7,7 +7,7 @@ export {
} from "./base32.js";
export { buildDescriptor } from "./build-descriptor.js";
export { validateWorkflowBundle, type WorkflowBundleValidationInput } from "./bundle-validator.js";
export { type CasStore, createThreadCas } from "./cas.js";
export { type CasStore, createCasStore, createThreadCas } from "./cas.js";
export { createWorkflow } from "./create-workflow.js";
export {
type ExecuteThreadIo,
@@ -55,7 +55,7 @@ export {
writeWorkflowRegistry,
} from "./registry.js";
export { err, ok, type Result } from "./result.js";
export { getDefaultWorkflowStorageRoot } from "./storage-root.js";
export { getDefaultWorkflowStorageRoot, getGlobalCasDir } from "./storage-root.js";
export { createThreadPauseGate, type ThreadPauseGate } from "./thread-pause-gate.js";
export {
type AgentBinding,
+6
View File
@@ -5,3 +5,9 @@ import { join } from "node:path";
export function getDefaultWorkflowStorageRoot(): string {
return join(homedir(), ".uncaged", "workflow");
}
/** Global content-addressed store directory under the workflow storage root (`<root>/cas`). */
export function getGlobalCasDir(storageRoot: string | undefined): string {
const root = storageRoot ?? getDefaultWorkflowStorageRoot();
return join(root, "cas");
}