From 6488b7bbb49a14e946e2c324bc054d0ebb72debc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B0=8F=E6=A9=98?= Date: Thu, 7 May 2026 10:47:52 +0000 Subject: [PATCH] feat: CAS garbage collection - garbageCollectCas() mark-and-sweep: scan .data.jsonl refs, delete orphans - 'uncaged-workflow gc' CLI command - thread rm triggers GC automatically - 141 tests passing Fixes #32 --- .../cli-workflow/__tests__/gc-cli.test.ts | 144 ++++++++++++++++++ .../cli-workflow/__tests__/thread-cli.test.ts | 4 +- packages/cli-workflow/src/cli-dispatch.ts | 20 +++ packages/cli-workflow/src/cmd-gc.ts | 5 + packages/cli-workflow/src/cmd-thread.ts | 4 +- packages/workflow/src/gc.ts | 131 ++++++++++++++++ packages/workflow/src/index.ts | 1 + 7 files changed, 306 insertions(+), 3 deletions(-) create mode 100644 packages/cli-workflow/__tests__/gc-cli.test.ts create mode 100644 packages/cli-workflow/src/cmd-gc.ts create mode 100644 packages/workflow/src/gc.ts diff --git a/packages/cli-workflow/__tests__/gc-cli.test.ts b/packages/cli-workflow/__tests__/gc-cli.test.ts new file mode 100644 index 0000000..2a7362e --- /dev/null +++ b/packages/cli-workflow/__tests__/gc-cli.test.ts @@ -0,0 +1,144 @@ +import { afterEach, beforeEach, describe, expect, test } from "bun:test"; +import { spawnSync } from "node:child_process"; +import { mkdir, mkdtemp, rm, writeFile } from "node:fs/promises"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import { fileURLToPath } from "node:url"; +import { createCasStore, garbageCollectCas, getGlobalCasDir } from "@uncaged/workflow"; +import { cmdThreadRemove } from "../src/cmd-thread.js"; +import { pathExists } from "../src/fs-utils.js"; + +const cliEntryPath = fileURLToPath(new URL("../src/cli.ts", import.meta.url)); + +/** Minimal valid `.data.jsonl` with one role step referencing `activeHash` in `refs`. */ +function makeDataJsonl(threadId: string, bundleHash: string, activeHash: string): string { + return [ + JSON.stringify({ + name: "demo", + hash: bundleHash, + threadId, + parameters: { prompt: "hi", options: { maxRounds: 5 } }, + timestamp: 100, + }), + JSON.stringify({ + role: "planner", + content: "p", + meta: {}, + refs: [activeHash], + timestamp: 101, + }), + "", + ].join("\n"); +} + +describe("gc cli and garbageCollectCas", () => { + let prevEnv: string | undefined; + let storageRoot: string; + + beforeEach(async () => { + prevEnv = process.env.UNCAGED_WORKFLOW_STORAGE_ROOT; + storageRoot = await mkdtemp(join(tmpdir(), "uncaged-wf-gc-")); + process.env.UNCAGED_WORKFLOW_STORAGE_ROOT = storageRoot; + }); + + afterEach(async () => { + if (prevEnv === undefined) { + delete process.env.UNCAGED_WORKFLOW_STORAGE_ROOT; + } else { + process.env.UNCAGED_WORKFLOW_STORAGE_ROOT = prevEnv; + } + await rm(storageRoot, { recursive: true, force: true }); + }); + + test("garbageCollectCas keeps CAS entries referenced by thread refs", async () => { + const bundleHash = "C9NMV6V2TQT81"; + const threadId = "01AAA1111111111111111111"; + const logsDir = join(storageRoot, "logs", bundleHash); + await mkdir(logsDir, { recursive: true }); + + const cas = createCasStore(getGlobalCasDir(storageRoot)); + const activeHash = await cas.put("active-blob"); + const orphanHash = await cas.put("orphan-blob"); + + await writeFile( + join(logsDir, `${threadId}.data.jsonl`), + makeDataJsonl(threadId, bundleHash, activeHash), + "utf8", + ); + + const gc = await garbageCollectCas(storageRoot); + expect(gc.ok).toBe(true); + if (!gc.ok) { + return; + } + expect(gc.value.scannedThreads).toBe(1); + expect(gc.value.activeRefs).toBe(1); + expect(gc.value.deletedEntries).toBe(1); + expect(gc.value.deletedHashes).toEqual([orphanHash]); + + expect(await pathExists(join(getGlobalCasDir(storageRoot), `${activeHash}.txt`))).toBe(true); + expect(await pathExists(join(getGlobalCasDir(storageRoot), `${orphanHash}.txt`))).toBe(false); + }); + + test("garbageCollectCas deletes orphaned CAS when no threads reference them", async () => { + const cas = createCasStore(getGlobalCasDir(storageRoot)); + const orphanHash = await cas.put("lonely"); + + const gc = await garbageCollectCas(storageRoot); + expect(gc.ok).toBe(true); + if (!gc.ok) { + return; + } + expect(gc.value.scannedThreads).toBe(0); + expect(gc.value.activeRefs).toBe(0); + expect(gc.value.deletedEntries).toBe(1); + expect(gc.value.deletedHashes).toEqual([orphanHash]); + expect(await pathExists(join(getGlobalCasDir(storageRoot), `${orphanHash}.txt`))).toBe(false); + }); + + test("cli gc prints stats", async () => { + const bundleHash = "C9NMV6V2TQT81"; + const threadId = "01BBB2222222222222222222"; + const logsDir = join(storageRoot, "logs", bundleHash); + await mkdir(logsDir, { recursive: true }); + + const cas = createCasStore(getGlobalCasDir(storageRoot)); + const activeHash = await cas.put("keep-me"); + await cas.put("drop-me"); + + await writeFile( + join(logsDir, `${threadId}.data.jsonl`), + makeDataJsonl(threadId, bundleHash, activeHash), + "utf8", + ); + + const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot }; + const proc = spawnSync(process.execPath, [cliEntryPath, "gc"], { env, encoding: "utf8" }); + expect(proc.status).toBe(0); + expect(String(proc.stdout).trim()).toBe("scanned 1 threads, 1 active refs, deleted 1 entries"); + }); + + test("thread rm triggers gc so unreferenced CAS is removed", async () => { + const bundleHash = "C9NMV6V2TQT81"; + const threadId = "01CCC3333333333333333333"; + const logsDir = join(storageRoot, "logs", bundleHash); + await mkdir(logsDir, { recursive: true }); + + const cas = createCasStore(getGlobalCasDir(storageRoot)); + const activeHash = await cas.put("pinned-by-ref"); + await writeFile( + join(logsDir, `${threadId}.data.jsonl`), + makeDataJsonl(threadId, bundleHash, activeHash), + "utf8", + ); + + const orphanHash = await cas.put("orphan-after-rm"); + const orphanPath = join(getGlobalCasDir(storageRoot), `${orphanHash}.txt`); + + const removed = await cmdThreadRemove(storageRoot, threadId); + expect(removed.ok).toBe(true); + + expect(await pathExists(orphanPath)).toBe(false); + expect(await pathExists(join(getGlobalCasDir(storageRoot), `${activeHash}.txt`))).toBe(false); + }); +}); diff --git a/packages/cli-workflow/__tests__/thread-cli.test.ts b/packages/cli-workflow/__tests__/thread-cli.test.ts index 077d51f..f77a924 100644 --- a/packages/cli-workflow/__tests__/thread-cli.test.ts +++ b/packages/cli-workflow/__tests__/thread-cli.test.ts @@ -177,7 +177,7 @@ describe("cli thread commands", () => { expect(await pathExists(dataPath)).toBe(false); }); - test("thread rm does not delete global cas blobs for that thread id", async () => { + test("thread rm runs GC and removes CAS blobs not referenced by any remaining thread", async () => { const bundleDir = join(storageRoot, "src"); await mkdir(bundleDir, { recursive: true }); const bundlePath = join(bundleDir, "demo.esm.js"); @@ -223,7 +223,7 @@ describe("cli thread commands", () => { expect(removed.ok).toBe(true); const stillThere = await readTextFileIfExists(casBlob); - expect(stillThere).toBe("keep-after-thread-rm"); + expect(stillThere).toBeNull(); }); test("cli entrypoint dispatches threads / ps (spawn)", () => { diff --git a/packages/cli-workflow/src/cli-dispatch.ts b/packages/cli-workflow/src/cli-dispatch.ts index b88a54a..044d20f 100644 --- a/packages/cli-workflow/src/cli-dispatch.ts +++ b/packages/cli-workflow/src/cli-dispatch.ts @@ -2,6 +2,7 @@ import { printCliError, printCliLine, printCliWarn } from "./cli-output.js"; import { cmdAdd, formatAddSuccess, parseAddArgv } from "./cmd-add.js"; import { cmdCasGet, cmdCasList, cmdCasPut, cmdCasRm } from "./cmd-cas.js"; import { cmdFork, parseForkArgv } from "./cmd-fork.js"; +import { cmdGc } from "./cmd-gc.js"; import { cmdHistory } from "./cmd-history.js"; import { cmdKill } from "./cmd-kill.js"; import { cmdList, formatListLines } from "./cmd-list.js"; @@ -34,6 +35,7 @@ function usage(): string { " uncaged-workflow thread ", " uncaged-workflow thread rm ", " uncaged-workflow fork [--from-role ]", + " uncaged-workflow gc", " uncaged-workflow cas get ", " uncaged-workflow cas put ", " uncaged-workflow cas list ", @@ -266,6 +268,23 @@ async function dispatchThreadBranch(storageRoot: string, rest: string[]): Promis return dispatchThread(storageRoot, rest); } +async function dispatchGc(storageRoot: string, argv: string[]): Promise { + if (argv.length > 0) { + printCliError(`${usage()}\n\nerror: gc takes no arguments`); + return 1; + } + const result = await cmdGc(storageRoot); + if (!result.ok) { + printCliError(result.error); + return 1; + } + const stats = result.value; + printCliLine( + `scanned ${stats.scannedThreads} threads, ${stats.activeRefs} active refs, deleted ${stats.deletedEntries} entries`, + ); + return 0; +} + async function dispatchFork(storageRoot: string, argv: string[]): Promise { const parsed = parseForkArgv(argv); if (!parsed.ok) { @@ -387,6 +406,7 @@ const COMMAND_TABLE: Record = { threads: dispatchThreads, thread: dispatchThreadBranch, fork: dispatchFork, + gc: dispatchGc, cas: dispatchCas, }; diff --git a/packages/cli-workflow/src/cmd-gc.ts b/packages/cli-workflow/src/cmd-gc.ts new file mode 100644 index 0000000..cf1aab9 --- /dev/null +++ b/packages/cli-workflow/src/cmd-gc.ts @@ -0,0 +1,5 @@ +import { type GcResult, garbageCollectCas, type Result } from "@uncaged/workflow"; + +export async function cmdGc(storageRoot: string): Promise> { + return garbageCollectCas(storageRoot); +} diff --git a/packages/cli-workflow/src/cmd-thread.ts b/packages/cli-workflow/src/cmd-thread.ts index af5c6b1..ec01385 100644 --- a/packages/cli-workflow/src/cmd-thread.ts +++ b/packages/cli-workflow/src/cmd-thread.ts @@ -1,7 +1,7 @@ import { unlink } from "node:fs/promises"; import { dirname, join } from "node:path"; -import { err, ok, type Result } from "@uncaged/workflow"; +import { err, garbageCollectCas, ok, type Result } from "@uncaged/workflow"; import { readTextFileIfExists } from "./fs-utils.js"; import { resolveThreadDataPath } from "./thread-scan.js"; @@ -38,5 +38,7 @@ export async function cmdThreadRemove( await unlink(infoPath).catch(() => {}); await unlink(runningPath).catch(() => {}); + await garbageCollectCas(storageRoot); + return ok(undefined); } diff --git a/packages/workflow/src/gc.ts b/packages/workflow/src/gc.ts new file mode 100644 index 0000000..ef444a3 --- /dev/null +++ b/packages/workflow/src/gc.ts @@ -0,0 +1,131 @@ +import { readdir, readFile } from "node:fs/promises"; +import { join } from "node:path"; + +import { type CasStore, createCasStore } from "./cas.js"; +import { parseThreadDataJsonl } from "./fork-thread.js"; +import { err, ok, type Result } from "./result.js"; +import { getGlobalCasDir } from "./storage-root.js"; + +export type GcResult = { + scannedThreads: number; + activeRefs: number; + deletedEntries: number; + deletedHashes: string[]; +}; + +async function listThreadDataJsonlPaths(storageRoot: string): Promise> { + const logsRoot = join(storageRoot, "logs"); + const paths: string[] = []; + let hashes: string[]; + try { + hashes = await readdir(logsRoot); + } catch (e) { + const errObj = e as NodeJS.ErrnoException; + if (errObj.code === "ENOENT") { + return ok([]); + } + return err(`failed to read logs directory: ${String(e)}`); + } + + for (const hash of hashes) { + const dir = join(logsRoot, hash); + let entries: string[]; + try { + entries = await readdir(dir); + } catch { + continue; + } + for (const fileName of entries) { + if (fileName.endsWith(".data.jsonl")) { + paths.push(join(dir, fileName)); + } + } + } + + paths.sort(); + return ok(paths); +} + +async function collectActiveRefsFromDataPaths( + dataPaths: string[], +): Promise, string>> { + const activeRefs = new Set(); + for (const dataPath of dataPaths) { + let text: string; + try { + text = await readFile(dataPath, "utf8"); + } catch (e) { + return err(`failed to read ${dataPath}: ${String(e)}`); + } + const parsed = parseThreadDataJsonl(text); + if (!parsed.ok) { + return err(`${dataPath}: ${parsed.error}`); + } + for (const step of parsed.value.roleSteps) { + for (const ref of step.refs) { + activeRefs.add(ref); + } + } + } + return ok(activeRefs); +} + +async function deleteCasNotInSet( + cas: CasStore, + activeRefs: Set, +): Promise> { + let listed: string[]; + try { + listed = await cas.list(); + } catch (e) { + return err(`failed to list cas entries: ${String(e)}`); + } + + const deletedHashes: string[] = []; + for (const hash of listed) { + if (activeRefs.has(hash)) { + continue; + } + try { + await cas.delete(hash); + } catch (e) { + return err(`failed to delete cas ${hash}: ${String(e)}`); + } + deletedHashes.push(hash); + } + + deletedHashes.sort(); + return ok(deletedHashes); +} + +/** + * Mark-and-sweep CAS GC: collect `refs` from all thread `.data.jsonl` files under `storageRoot`, + * then delete CAS blobs not referenced by any surviving thread data. + */ +export async function garbageCollectCas(storageRoot: string): Promise> { + const pathsResult = await listThreadDataJsonlPaths(storageRoot); + if (!pathsResult.ok) { + return pathsResult; + } + const paths = pathsResult.value; + + const refsResult = await collectActiveRefsFromDataPaths(paths); + if (!refsResult.ok) { + return refsResult; + } + const activeRefs = refsResult.value; + + const cas = createCasStore(getGlobalCasDir(storageRoot)); + const deletedResult = await deleteCasNotInSet(cas, activeRefs); + if (!deletedResult.ok) { + return deletedResult; + } + const deletedHashes = deletedResult.value; + + return ok({ + scannedThreads: paths.length, + activeRefs: activeRefs.size, + deletedEntries: deletedHashes.length, + deletedHashes, + }); +} diff --git a/packages/workflow/src/index.ts b/packages/workflow/src/index.ts index 6fdfae4..9c039be 100644 --- a/packages/workflow/src/index.ts +++ b/packages/workflow/src/index.ts @@ -25,6 +25,7 @@ export { parseThreadDataJsonl, selectForkHistoricalSteps, } from "./fork-thread.js"; +export { type GcResult, garbageCollectCas } from "./gc.js"; export { stringifyWorkflowDescriptor } from "./generate-descriptor.js"; export { hashString, hashWorkflowBundleBytes } from "./hash.js"; export {