feat(workflow): add thread-scoped CAS (Content-Addressable Storage)

Phase 1 of #23:
- createThreadCas() core API: put/get/delete/list with XXH64 hashing
- hashString() utility for string → 13-char Crockford Base32
- CLI: uncaged-workflow cas get/put/list/rm subcommands
- thread rm now cleans up .cas/ directory
- 10 new tests for CAS operations

Refs #23
This commit is contained in:
2026-05-07 04:30:19 +00:00
parent 172e9b34cc
commit 4b44665c7e
8 changed files with 422 additions and 14 deletions
+95
View File
@@ -1,5 +1,6 @@
import { printCliError, printCliLine, printCliWarn } from "./cli-output.js";
import { cmdAdd, formatAddSuccess, parseAddArgv } from "./cmd-add.js";
import { cmdCasGet, cmdCasList, cmdCasPut, cmdCasRm } from "./cmd-cas.js";
import { cmdFork, parseForkArgv } from "./cmd-fork.js";
import { cmdHistory } from "./cmd-history.js";
import { cmdKill } from "./cmd-kill.js";
@@ -33,6 +34,10 @@ function usage(): string {
" uncaged-workflow thread <id>",
" uncaged-workflow thread rm <id>",
" uncaged-workflow fork <thread-id> [--from-role <role>]",
" uncaged-workflow cas get <thread-id> <hash>",
" uncaged-workflow cas put <thread-id> <content>",
" uncaged-workflow cas list <thread-id>",
" uncaged-workflow cas rm <thread-id> <hash>",
].join("\n");
}
@@ -276,6 +281,95 @@ async function dispatchFork(storageRoot: string, argv: string[]): Promise<number
return 0;
}
async function dispatchCasGet(storageRoot: string, rest: string[]): Promise<number> {
const threadId = rest[0];
const hash = rest[1];
if (threadId === undefined || hash === undefined || rest.length > 2) {
printCliError(`${usage()}\n\nerror: cas get requires <thread-id> <hash>`);
return 1;
}
const result = await cmdCasGet(storageRoot, threadId, hash);
if (!result.ok) {
printCliError(result.error);
return 1;
}
printCliLine(result.value);
return 0;
}
async function dispatchCasPut(storageRoot: string, rest: string[]): Promise<number> {
const threadId = rest[0];
const content = rest[1];
if (threadId === undefined || content === undefined || rest.length > 2) {
printCliError(`${usage()}\n\nerror: cas put requires <thread-id> <content>`);
return 1;
}
const result = await cmdCasPut(storageRoot, threadId, content);
if (!result.ok) {
printCliError(result.error);
return 1;
}
printCliLine(result.value);
return 0;
}
async function dispatchCasList(storageRoot: string, rest: string[]): Promise<number> {
const threadId = rest[0];
if (threadId === undefined || rest.length > 1) {
printCliError(`${usage()}\n\nerror: cas list requires <thread-id>`);
return 1;
}
const result = await cmdCasList(storageRoot, threadId);
if (!result.ok) {
printCliError(result.error);
return 1;
}
for (const hash of result.value) {
printCliLine(hash);
}
return 0;
}
async function dispatchCasRm(storageRoot: string, rest: string[]): Promise<number> {
const threadId = rest[0];
const hash = rest[1];
if (threadId === undefined || hash === undefined || rest.length > 2) {
printCliError(`${usage()}\n\nerror: cas rm requires <thread-id> <hash>`);
return 1;
}
const result = await cmdCasRm(storageRoot, threadId, hash);
if (!result.ok) {
printCliError(result.error);
return 1;
}
printCliLine(`removed cas entry ${hash}`);
return 0;
}
const CAS_SUBCOMMAND_TABLE: Record<
string,
(storageRoot: string, rest: string[]) => Promise<number>
> = {
get: dispatchCasGet,
put: dispatchCasPut,
list: dispatchCasList,
rm: dispatchCasRm,
};
async function dispatchCas(storageRoot: string, argv: string[]): Promise<number> {
const sub = argv[0];
if (sub === undefined) {
printCliError(`${usage()}\n\nerror: unknown cas subcommand: (none)`);
return 1;
}
const handler = CAS_SUBCOMMAND_TABLE[sub];
if (handler === undefined) {
printCliError(`${usage()}\n\nerror: unknown cas subcommand: ${sub}`);
return 1;
}
return handler(storageRoot, argv.slice(1));
}
type DispatchFn = (storageRoot: string, argv: string[]) => Promise<number>;
const COMMAND_TABLE: Record<string, DispatchFn> = {
@@ -293,6 +387,7 @@ const COMMAND_TABLE: Record<string, DispatchFn> = {
threads: dispatchThreads,
thread: dispatchThreadBranch,
fork: dispatchFork,
cas: dispatchCas,
};
export async function runCli(storageRoot: string, argv: string[]): Promise<number> {
+67
View File
@@ -0,0 +1,67 @@
import { dirname, join } from "node:path";
import { createThreadCas, err, ok, type Result } from "@uncaged/workflow";
import { resolveThreadDataPath } from "./thread-scan.js";
function resolveCasDir(threadDataPath: string, threadId: string): string {
return join(dirname(threadDataPath), `${threadId}.cas`);
}
export async function cmdCasGet(
storageRoot: string,
threadId: string,
hash: string,
): Promise<Result<string, string>> {
const dataPath = await resolveThreadDataPath(storageRoot, threadId);
if (dataPath === null) {
return err(`thread not found: ${threadId}`);
}
const cas = createThreadCas(resolveCasDir(dataPath, threadId));
const content = await cas.get(hash);
if (content === null) {
return err(`cas entry not found: ${hash}`);
}
return ok(content);
}
export async function cmdCasPut(
storageRoot: string,
threadId: string,
content: string,
): Promise<Result<string, string>> {
const dataPath = await resolveThreadDataPath(storageRoot, threadId);
if (dataPath === null) {
return err(`thread not found: ${threadId}`);
}
const cas = createThreadCas(resolveCasDir(dataPath, threadId));
const hash = await cas.put(content);
return ok(hash);
}
export async function cmdCasList(
storageRoot: string,
threadId: string,
): Promise<Result<string[], string>> {
const dataPath = await resolveThreadDataPath(storageRoot, threadId);
if (dataPath === null) {
return err(`thread not found: ${threadId}`);
}
const cas = createThreadCas(resolveCasDir(dataPath, threadId));
const hashes = await cas.list();
return ok(hashes);
}
export async function cmdCasRm(
storageRoot: string,
threadId: string,
hash: string,
): Promise<Result<void, string>> {
const dataPath = await resolveThreadDataPath(storageRoot, threadId);
if (dataPath === null) {
return err(`thread not found: ${threadId}`);
}
const cas = createThreadCas(resolveCasDir(dataPath, threadId));
await cas.delete(hash);
return ok(undefined);
}
+3 -1
View File
@@ -1,4 +1,4 @@
import { unlink } from "node:fs/promises";
import { rm, unlink } from "node:fs/promises";
import { dirname, join } from "node:path";
import { err, ok, type Result } from "@uncaged/workflow";
@@ -33,10 +33,12 @@ export async function cmdThreadRemove(
const dir = dirname(dataPath);
const infoPath = join(dir, `${threadId}.info.jsonl`);
const runningPath = join(dir, `${threadId}.running`);
const casPath = join(dir, `${threadId}.cas`);
await unlink(dataPath);
await unlink(infoPath).catch(() => {});
await unlink(runningPath).catch(() => {});
await rm(casPath, { recursive: true, force: true });
return ok(undefined);
}
@@ -16,11 +16,25 @@ import { createSolveIssueRun, solveIssueModerator } from "../src/index.js";
import type { SolveIssueMeta } from "../src/roles.js";
const DEFAULT_PHASES: PlannerMeta["phases"] = [
{ hash: "4KNMR2PX", title: "Do the work", name: "phase-a", description: "Do the work", acceptance: "Done" },
{
hash: "4KNMR2PX",
title: "Do the work",
name: "phase-a",
description: "Do the work",
acceptance: "Done",
},
];
const EXPECT_PLANNER_META: PlannerMeta = {
phases: [{ hash: "7BQST3VW", title: "placeholder phase", name: "phase-1", description: "placeholder", acceptance: "placeholder" }],
phases: [
{
hash: "7BQST3VW",
title: "placeholder phase",
name: "phase-1",
description: "placeholder",
acceptance: "placeholder",
},
],
};
const EXPECT_CODER_META: CoderMeta = {
@@ -179,8 +193,20 @@ describe("solveIssueModerator", () => {
test("multiple planner phases → coder until all complete, then reviewer", () => {
const phases: PlannerMeta["phases"] = [
{ hash: "AA000001", title: "first phase", name: "p1", description: "first", acceptance: "a1" },
{ hash: "AA000002", title: "second phase", name: "p2", description: "second", acceptance: "a2" },
{
hash: "AA000001",
title: "first phase",
name: "p1",
description: "first",
acceptance: "a1",
},
{
hash: "AA000002",
title: "second phase",
name: "p2",
description: "second",
acceptance: "a2",
},
];
expect(solveIssueModerator(makeCtx(20, [plannerStep(phases)]))).toBe("coder");
expect(solveIssueModerator(makeCtx(20, [plannerStep(phases), coderStep("p1")]))).toBe("coder");
@@ -191,10 +217,34 @@ describe("solveIssueModerator", () => {
test("one-shot coder reports only last phase name → reviewer (moderator treats as all phases done)", () => {
const phases: PlannerMeta["phases"] = [
{ hash: "BB000001", title: "setup branch", name: "setup-branch", description: "branch", acceptance: "branch exists" },
{ hash: "BB000002", title: "write tests", name: "write-tests", description: "tests", acceptance: "tests pass" },
{ hash: "BB000003", title: "verify", name: "verify", description: "verify", acceptance: "ok" },
{ hash: "BB000004", title: "commit and pr", name: "commit-and-pr", description: "pr", acceptance: "pr open" },
{
hash: "BB000001",
title: "setup branch",
name: "setup-branch",
description: "branch",
acceptance: "branch exists",
},
{
hash: "BB000002",
title: "write tests",
name: "write-tests",
description: "tests",
acceptance: "tests pass",
},
{
hash: "BB000003",
title: "verify",
name: "verify",
description: "verify",
acceptance: "ok",
},
{
hash: "BB000004",
title: "commit and pr",
name: "commit-and-pr",
description: "pr",
acceptance: "pr open",
},
];
expect(
solveIssueModerator(makeCtx(20, [plannerStep(phases), coderStep("commit-and-pr")])),
@@ -203,8 +253,20 @@ describe("solveIssueModerator", () => {
test("completedPhase sentinel when not a planned name → reviewer", () => {
const phases: PlannerMeta["phases"] = [
{ hash: "CC000001", title: "first phase", name: "p1", description: "first", acceptance: "a1" },
{ hash: "CC000002", title: "second phase", name: "p2", description: "second", acceptance: "a2" },
{
hash: "CC000001",
title: "first phase",
name: "p1",
description: "first",
acceptance: "a1",
},
{
hash: "CC000002",
title: "second phase",
name: "p2",
description: "second",
acceptance: "a2",
},
];
expect(solveIssueModerator(makeCtx(20, [plannerStep(phases), coderStep("all-done")]))).toBe(
"reviewer",
@@ -213,8 +275,20 @@ describe("solveIssueModerator", () => {
test("incomplete phases → END when max rounds exhausted", () => {
const phases: PlannerMeta["phases"] = [
{ hash: "DD000001", title: "first phase", name: "p1", description: "first", acceptance: "a1" },
{ hash: "DD000002", title: "second phase", name: "p2", description: "second", acceptance: "a2" },
{
hash: "DD000001",
title: "first phase",
name: "p1",
description: "first",
acceptance: "a1",
},
{
hash: "DD000002",
title: "second phase",
name: "p2",
description: "second",
acceptance: "a2",
},
];
const steps: ModeratorContext<SolveIssueMeta>["steps"] = [plannerStep(phases), coderStep("p1")];
expect(solveIssueModerator(makeCtx(3, steps))).toBe(END);
+92
View File
@@ -0,0 +1,92 @@
import { afterEach, beforeEach, describe, expect, test } from "bun:test";
import { mkdtemp, rm } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { createThreadCas } from "../src/cas.js";
import { hashString } from "../src/hash.js";
describe("createThreadCas", () => {
let casDir: string;
beforeEach(async () => {
casDir = await mkdtemp(join(tmpdir(), "cas-test-"));
});
afterEach(async () => {
await rm(casDir, { recursive: true, force: true });
});
test("put returns consistent hash for same content", async () => {
const cas = createThreadCas(casDir);
const h1 = await cas.put("hello world");
const h2 = await cas.put("hello world");
expect(h1).toBe(h2);
expect(h1).toHaveLength(13);
});
test("put returns hash matching hashString", async () => {
const cas = createThreadCas(casDir);
const content = "some content to store";
const h = await cas.put(content);
expect(h).toBe(hashString(content));
});
test("get returns stored content", async () => {
const cas = createThreadCas(casDir);
const content = "line1\nline2\nline3";
const h = await cas.put(content);
const retrieved = await cas.get(h);
expect(retrieved).toBe(content);
});
test("get returns null for missing hash", async () => {
const cas = createThreadCas(casDir);
const result = await cas.get("0000000000000");
expect(result).toBeNull();
});
test("delete removes entry", async () => {
const cas = createThreadCas(casDir);
const h = await cas.put("to be deleted");
await cas.delete(h);
const result = await cas.get(h);
expect(result).toBeNull();
});
test("delete on missing hash does not throw", async () => {
const cas = createThreadCas(casDir);
await cas.delete("0000000000000");
});
test("list returns all stored hashes", async () => {
const cas = createThreadCas(casDir);
const h1 = await cas.put("aaa");
const h2 = await cas.put("bbb");
const h3 = await cas.put("ccc");
const hashes = await cas.list();
expect(hashes.sort()).toEqual([h1, h2, h3].sort());
});
test("list returns empty array when cas dir does not exist", async () => {
const cas = createThreadCas(join(casDir, "nonexistent"));
const hashes = await cas.list();
expect(hashes).toEqual([]);
});
test("put is idempotent — same content written twice causes no error", async () => {
const cas = createThreadCas(casDir);
const h1 = await cas.put("idempotent");
const h2 = await cas.put("idempotent");
expect(h1).toBe(h2);
const content = await cas.get(h1);
expect(content).toBe("idempotent");
});
test("different content produces different hashes", async () => {
const cas = createThreadCas(casDir);
const h1 = await cas.put("alpha");
const h2 = await cas.put("beta");
expect(h1).not.toBe(h2);
});
});
+70
View File
@@ -0,0 +1,70 @@
import { mkdir, readdir, readFile, rename, unlink, writeFile } from "node:fs/promises";
import { join } from "node:path";
import { hashString } from "./hash.js";
export type CasStore = {
put(content: string): Promise<string>;
get(hash: string): Promise<string | null>;
delete(hash: string): Promise<void>;
list(): Promise<string[]>;
};
export function createThreadCas(casDir: string): CasStore {
async function ensureDir(): Promise<void> {
await mkdir(casDir, { recursive: true });
}
function filePath(hash: string): string {
return join(casDir, `${hash}.txt`);
}
return {
async put(content: string): Promise<string> {
const hash = hashString(content);
await ensureDir();
const target = filePath(hash);
const tmp = `${target}.tmp.${Date.now()}`;
await writeFile(tmp, content, "utf8");
await rename(tmp, target);
return hash;
},
async get(hash: string): Promise<string | null> {
try {
return await readFile(filePath(hash), "utf8");
} catch (e) {
const errObj = e as NodeJS.ErrnoException;
if (errObj.code === "ENOENT") {
return null;
}
throw e;
}
},
async delete(hash: string): Promise<void> {
try {
await unlink(filePath(hash));
} catch (e) {
const errObj = e as NodeJS.ErrnoException;
if (errObj.code === "ENOENT") {
return;
}
throw e;
}
},
async list(): Promise<string[]> {
try {
const entries = await readdir(casDir);
return entries.filter((name) => name.endsWith(".txt")).map((name) => name.slice(0, -4));
} catch (e) {
const errObj = e as NodeJS.ErrnoException;
if (errObj.code === "ENOENT") {
return [];
}
throw e;
}
},
};
}
+7
View File
@@ -15,3 +15,10 @@ export function hashWorkflowBundleBytes(data: Uint8Array): string {
const digest = XXH.h64(0).update(buf).digest();
return encodeUint64AsCrockford(digestToUint64(digest));
}
/** XXH64 (seed 0) over a UTF-8 string, encoded as 13-char Crockford Base32. */
export function hashString(content: string): string {
const buf = Buffer.from(content, "utf8");
const digest = XXH.h64(0).update(buf).digest();
return encodeUint64AsCrockford(digestToUint64(digest));
}
+2 -1
View File
@@ -7,6 +7,7 @@ export {
} from "./base32.js";
export { buildDescriptor } from "./build-descriptor.js";
export { validateWorkflowBundle, type WorkflowBundleValidationInput } from "./bundle-validator.js";
export { type CasStore, createThreadCas } from "./cas.js";
export { createWorkflow } from "./create-workflow.js";
export {
type ExecuteThreadIo,
@@ -25,7 +26,7 @@ export {
selectForkHistoricalSteps,
} from "./fork-thread.js";
export { stringifyWorkflowDescriptor } from "./generate-descriptor.js";
export { hashWorkflowBundleBytes } from "./hash.js";
export { hashString, hashWorkflowBundleBytes } from "./hash.js";
export {
type LlmError,
llmErrorToCause,