feat: CAS garbage collection

- garbageCollectCas() mark-and-sweep: scan .data.jsonl refs, delete orphans
- 'uncaged-workflow gc' CLI command
- thread rm triggers GC automatically
- 141 tests passing

Fixes #32
This commit is contained in:
2026-05-07 10:47:52 +00:00
parent 15d39c96a7
commit 6488b7bbb4
7 changed files with 306 additions and 3 deletions
@@ -0,0 +1,144 @@
import { afterEach, beforeEach, describe, expect, test } from "bun:test";
import { spawnSync } from "node:child_process";
import { mkdir, mkdtemp, rm, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { fileURLToPath } from "node:url";
import { createCasStore, garbageCollectCas, getGlobalCasDir } from "@uncaged/workflow";
import { cmdThreadRemove } from "../src/cmd-thread.js";
import { pathExists } from "../src/fs-utils.js";
const cliEntryPath = fileURLToPath(new URL("../src/cli.ts", import.meta.url));
/** Minimal valid `.data.jsonl` with one role step referencing `activeHash` in `refs`. */
function makeDataJsonl(threadId: string, bundleHash: string, activeHash: string): string {
return [
JSON.stringify({
name: "demo",
hash: bundleHash,
threadId,
parameters: { prompt: "hi", options: { maxRounds: 5 } },
timestamp: 100,
}),
JSON.stringify({
role: "planner",
content: "p",
meta: {},
refs: [activeHash],
timestamp: 101,
}),
"",
].join("\n");
}
describe("gc cli and garbageCollectCas", () => {
let prevEnv: string | undefined;
let storageRoot: string;
beforeEach(async () => {
prevEnv = process.env.UNCAGED_WORKFLOW_STORAGE_ROOT;
storageRoot = await mkdtemp(join(tmpdir(), "uncaged-wf-gc-"));
process.env.UNCAGED_WORKFLOW_STORAGE_ROOT = storageRoot;
});
afterEach(async () => {
if (prevEnv === undefined) {
delete process.env.UNCAGED_WORKFLOW_STORAGE_ROOT;
} else {
process.env.UNCAGED_WORKFLOW_STORAGE_ROOT = prevEnv;
}
await rm(storageRoot, { recursive: true, force: true });
});
test("garbageCollectCas keeps CAS entries referenced by thread refs", async () => {
const bundleHash = "C9NMV6V2TQT81";
const threadId = "01AAA1111111111111111111";
const logsDir = join(storageRoot, "logs", bundleHash);
await mkdir(logsDir, { recursive: true });
const cas = createCasStore(getGlobalCasDir(storageRoot));
const activeHash = await cas.put("active-blob");
const orphanHash = await cas.put("orphan-blob");
await writeFile(
join(logsDir, `${threadId}.data.jsonl`),
makeDataJsonl(threadId, bundleHash, activeHash),
"utf8",
);
const gc = await garbageCollectCas(storageRoot);
expect(gc.ok).toBe(true);
if (!gc.ok) {
return;
}
expect(gc.value.scannedThreads).toBe(1);
expect(gc.value.activeRefs).toBe(1);
expect(gc.value.deletedEntries).toBe(1);
expect(gc.value.deletedHashes).toEqual([orphanHash]);
expect(await pathExists(join(getGlobalCasDir(storageRoot), `${activeHash}.txt`))).toBe(true);
expect(await pathExists(join(getGlobalCasDir(storageRoot), `${orphanHash}.txt`))).toBe(false);
});
test("garbageCollectCas deletes orphaned CAS when no threads reference them", async () => {
const cas = createCasStore(getGlobalCasDir(storageRoot));
const orphanHash = await cas.put("lonely");
const gc = await garbageCollectCas(storageRoot);
expect(gc.ok).toBe(true);
if (!gc.ok) {
return;
}
expect(gc.value.scannedThreads).toBe(0);
expect(gc.value.activeRefs).toBe(0);
expect(gc.value.deletedEntries).toBe(1);
expect(gc.value.deletedHashes).toEqual([orphanHash]);
expect(await pathExists(join(getGlobalCasDir(storageRoot), `${orphanHash}.txt`))).toBe(false);
});
test("cli gc prints stats", async () => {
const bundleHash = "C9NMV6V2TQT81";
const threadId = "01BBB2222222222222222222";
const logsDir = join(storageRoot, "logs", bundleHash);
await mkdir(logsDir, { recursive: true });
const cas = createCasStore(getGlobalCasDir(storageRoot));
const activeHash = await cas.put("keep-me");
await cas.put("drop-me");
await writeFile(
join(logsDir, `${threadId}.data.jsonl`),
makeDataJsonl(threadId, bundleHash, activeHash),
"utf8",
);
const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot };
const proc = spawnSync(process.execPath, [cliEntryPath, "gc"], { env, encoding: "utf8" });
expect(proc.status).toBe(0);
expect(String(proc.stdout).trim()).toBe("scanned 1 threads, 1 active refs, deleted 1 entries");
});
test("thread rm triggers gc so unreferenced CAS is removed", async () => {
const bundleHash = "C9NMV6V2TQT81";
const threadId = "01CCC3333333333333333333";
const logsDir = join(storageRoot, "logs", bundleHash);
await mkdir(logsDir, { recursive: true });
const cas = createCasStore(getGlobalCasDir(storageRoot));
const activeHash = await cas.put("pinned-by-ref");
await writeFile(
join(logsDir, `${threadId}.data.jsonl`),
makeDataJsonl(threadId, bundleHash, activeHash),
"utf8",
);
const orphanHash = await cas.put("orphan-after-rm");
const orphanPath = join(getGlobalCasDir(storageRoot), `${orphanHash}.txt`);
const removed = await cmdThreadRemove(storageRoot, threadId);
expect(removed.ok).toBe(true);
expect(await pathExists(orphanPath)).toBe(false);
expect(await pathExists(join(getGlobalCasDir(storageRoot), `${activeHash}.txt`))).toBe(false);
});
});
@@ -177,7 +177,7 @@ describe("cli thread commands", () => {
expect(await pathExists(dataPath)).toBe(false);
});
test("thread rm does not delete global cas blobs for that thread id", async () => {
test("thread rm runs GC and removes CAS blobs not referenced by any remaining thread", async () => {
const bundleDir = join(storageRoot, "src");
await mkdir(bundleDir, { recursive: true });
const bundlePath = join(bundleDir, "demo.esm.js");
@@ -223,7 +223,7 @@ describe("cli thread commands", () => {
expect(removed.ok).toBe(true);
const stillThere = await readTextFileIfExists(casBlob);
expect(stillThere).toBe("keep-after-thread-rm");
expect(stillThere).toBeNull();
});
test("cli entrypoint dispatches threads / ps (spawn)", () => {
+20
View File
@@ -2,6 +2,7 @@ import { printCliError, printCliLine, printCliWarn } from "./cli-output.js";
import { cmdAdd, formatAddSuccess, parseAddArgv } from "./cmd-add.js";
import { cmdCasGet, cmdCasList, cmdCasPut, cmdCasRm } from "./cmd-cas.js";
import { cmdFork, parseForkArgv } from "./cmd-fork.js";
import { cmdGc } from "./cmd-gc.js";
import { cmdHistory } from "./cmd-history.js";
import { cmdKill } from "./cmd-kill.js";
import { cmdList, formatListLines } from "./cmd-list.js";
@@ -34,6 +35,7 @@ function usage(): string {
" uncaged-workflow thread <id>",
" uncaged-workflow thread rm <id>",
" uncaged-workflow fork <thread-id> [--from-role <role>]",
" uncaged-workflow gc",
" uncaged-workflow cas get <thread-id> <hash>",
" uncaged-workflow cas put <thread-id> <content>",
" uncaged-workflow cas list <thread-id>",
@@ -266,6 +268,23 @@ async function dispatchThreadBranch(storageRoot: string, rest: string[]): Promis
return dispatchThread(storageRoot, rest);
}
async function dispatchGc(storageRoot: string, argv: string[]): Promise<number> {
if (argv.length > 0) {
printCliError(`${usage()}\n\nerror: gc takes no arguments`);
return 1;
}
const result = await cmdGc(storageRoot);
if (!result.ok) {
printCliError(result.error);
return 1;
}
const stats = result.value;
printCliLine(
`scanned ${stats.scannedThreads} threads, ${stats.activeRefs} active refs, deleted ${stats.deletedEntries} entries`,
);
return 0;
}
async function dispatchFork(storageRoot: string, argv: string[]): Promise<number> {
const parsed = parseForkArgv(argv);
if (!parsed.ok) {
@@ -387,6 +406,7 @@ const COMMAND_TABLE: Record<string, DispatchFn> = {
threads: dispatchThreads,
thread: dispatchThreadBranch,
fork: dispatchFork,
gc: dispatchGc,
cas: dispatchCas,
};
+5
View File
@@ -0,0 +1,5 @@
import { type GcResult, garbageCollectCas, type Result } from "@uncaged/workflow";
export async function cmdGc(storageRoot: string): Promise<Result<GcResult, string>> {
return garbageCollectCas(storageRoot);
}
+3 -1
View File
@@ -1,7 +1,7 @@
import { unlink } from "node:fs/promises";
import { dirname, join } from "node:path";
import { err, ok, type Result } from "@uncaged/workflow";
import { err, garbageCollectCas, ok, type Result } from "@uncaged/workflow";
import { readTextFileIfExists } from "./fs-utils.js";
import { resolveThreadDataPath } from "./thread-scan.js";
@@ -38,5 +38,7 @@ export async function cmdThreadRemove(
await unlink(infoPath).catch(() => {});
await unlink(runningPath).catch(() => {});
await garbageCollectCas(storageRoot);
return ok(undefined);
}
+131
View File
@@ -0,0 +1,131 @@
import { readdir, readFile } from "node:fs/promises";
import { join } from "node:path";
import { type CasStore, createCasStore } from "./cas.js";
import { parseThreadDataJsonl } from "./fork-thread.js";
import { err, ok, type Result } from "./result.js";
import { getGlobalCasDir } from "./storage-root.js";
export type GcResult = {
scannedThreads: number;
activeRefs: number;
deletedEntries: number;
deletedHashes: string[];
};
async function listThreadDataJsonlPaths(storageRoot: string): Promise<Result<string[], string>> {
const logsRoot = join(storageRoot, "logs");
const paths: string[] = [];
let hashes: string[];
try {
hashes = await readdir(logsRoot);
} catch (e) {
const errObj = e as NodeJS.ErrnoException;
if (errObj.code === "ENOENT") {
return ok([]);
}
return err(`failed to read logs directory: ${String(e)}`);
}
for (const hash of hashes) {
const dir = join(logsRoot, hash);
let entries: string[];
try {
entries = await readdir(dir);
} catch {
continue;
}
for (const fileName of entries) {
if (fileName.endsWith(".data.jsonl")) {
paths.push(join(dir, fileName));
}
}
}
paths.sort();
return ok(paths);
}
async function collectActiveRefsFromDataPaths(
dataPaths: string[],
): Promise<Result<Set<string>, string>> {
const activeRefs = new Set<string>();
for (const dataPath of dataPaths) {
let text: string;
try {
text = await readFile(dataPath, "utf8");
} catch (e) {
return err(`failed to read ${dataPath}: ${String(e)}`);
}
const parsed = parseThreadDataJsonl(text);
if (!parsed.ok) {
return err(`${dataPath}: ${parsed.error}`);
}
for (const step of parsed.value.roleSteps) {
for (const ref of step.refs) {
activeRefs.add(ref);
}
}
}
return ok(activeRefs);
}
async function deleteCasNotInSet(
cas: CasStore,
activeRefs: Set<string>,
): Promise<Result<string[], string>> {
let listed: string[];
try {
listed = await cas.list();
} catch (e) {
return err(`failed to list cas entries: ${String(e)}`);
}
const deletedHashes: string[] = [];
for (const hash of listed) {
if (activeRefs.has(hash)) {
continue;
}
try {
await cas.delete(hash);
} catch (e) {
return err(`failed to delete cas ${hash}: ${String(e)}`);
}
deletedHashes.push(hash);
}
deletedHashes.sort();
return ok(deletedHashes);
}
/**
* Mark-and-sweep CAS GC: collect `refs` from all thread `.data.jsonl` files under `storageRoot`,
* then delete CAS blobs not referenced by any surviving thread data.
*/
export async function garbageCollectCas(storageRoot: string): Promise<Result<GcResult, string>> {
const pathsResult = await listThreadDataJsonlPaths(storageRoot);
if (!pathsResult.ok) {
return pathsResult;
}
const paths = pathsResult.value;
const refsResult = await collectActiveRefsFromDataPaths(paths);
if (!refsResult.ok) {
return refsResult;
}
const activeRefs = refsResult.value;
const cas = createCasStore(getGlobalCasDir(storageRoot));
const deletedResult = await deleteCasNotInSet(cas, activeRefs);
if (!deletedResult.ok) {
return deletedResult;
}
const deletedHashes = deletedResult.value;
return ok({
scannedThreads: paths.length,
activeRefs: activeRefs.size,
deletedEntries: deletedHashes.length,
deletedHashes,
});
}
+1
View File
@@ -25,6 +25,7 @@ export {
parseThreadDataJsonl,
selectForkHistoricalSteps,
} from "./fork-thread.js";
export { type GcResult, garbageCollectCas } from "./gc.js";
export { stringifyWorkflowDescriptor } from "./generate-descriptor.js";
export { hashString, hashWorkflowBundleBytes } from "./hash.js";
export {