diff --git a/packages/cli-workflow/src/cli-dispatch.ts b/packages/cli-workflow/src/cli-dispatch.ts index 2032eac..b88a54a 100644 --- a/packages/cli-workflow/src/cli-dispatch.ts +++ b/packages/cli-workflow/src/cli-dispatch.ts @@ -1,5 +1,6 @@ import { printCliError, printCliLine, printCliWarn } from "./cli-output.js"; import { cmdAdd, formatAddSuccess, parseAddArgv } from "./cmd-add.js"; +import { cmdCasGet, cmdCasList, cmdCasPut, cmdCasRm } from "./cmd-cas.js"; import { cmdFork, parseForkArgv } from "./cmd-fork.js"; import { cmdHistory } from "./cmd-history.js"; import { cmdKill } from "./cmd-kill.js"; @@ -33,6 +34,10 @@ function usage(): string { " uncaged-workflow thread ", " uncaged-workflow thread rm ", " uncaged-workflow fork [--from-role ]", + " uncaged-workflow cas get ", + " uncaged-workflow cas put ", + " uncaged-workflow cas list ", + " uncaged-workflow cas rm ", ].join("\n"); } @@ -276,6 +281,95 @@ async function dispatchFork(storageRoot: string, argv: string[]): Promise { + const threadId = rest[0]; + const hash = rest[1]; + if (threadId === undefined || hash === undefined || rest.length > 2) { + printCliError(`${usage()}\n\nerror: cas get requires `); + return 1; + } + const result = await cmdCasGet(storageRoot, threadId, hash); + if (!result.ok) { + printCliError(result.error); + return 1; + } + printCliLine(result.value); + return 0; +} + +async function dispatchCasPut(storageRoot: string, rest: string[]): Promise { + const threadId = rest[0]; + const content = rest[1]; + if (threadId === undefined || content === undefined || rest.length > 2) { + printCliError(`${usage()}\n\nerror: cas put requires `); + return 1; + } + const result = await cmdCasPut(storageRoot, threadId, content); + if (!result.ok) { + printCliError(result.error); + return 1; + } + printCliLine(result.value); + return 0; +} + +async function dispatchCasList(storageRoot: string, rest: string[]): Promise { + const threadId = rest[0]; + if (threadId === undefined || rest.length > 1) { + printCliError(`${usage()}\n\nerror: cas list requires `); + return 1; + } + const result = await cmdCasList(storageRoot, threadId); + if (!result.ok) { + printCliError(result.error); + return 1; + } + for (const hash of result.value) { + printCliLine(hash); + } + return 0; +} + +async function dispatchCasRm(storageRoot: string, rest: string[]): Promise { + const threadId = rest[0]; + const hash = rest[1]; + if (threadId === undefined || hash === undefined || rest.length > 2) { + printCliError(`${usage()}\n\nerror: cas rm requires `); + return 1; + } + const result = await cmdCasRm(storageRoot, threadId, hash); + if (!result.ok) { + printCliError(result.error); + return 1; + } + printCliLine(`removed cas entry ${hash}`); + return 0; +} + +const CAS_SUBCOMMAND_TABLE: Record< + string, + (storageRoot: string, rest: string[]) => Promise +> = { + get: dispatchCasGet, + put: dispatchCasPut, + list: dispatchCasList, + rm: dispatchCasRm, +}; + +async function dispatchCas(storageRoot: string, argv: string[]): Promise { + const sub = argv[0]; + if (sub === undefined) { + printCliError(`${usage()}\n\nerror: unknown cas subcommand: (none)`); + return 1; + } + const handler = CAS_SUBCOMMAND_TABLE[sub]; + if (handler === undefined) { + printCliError(`${usage()}\n\nerror: unknown cas subcommand: ${sub}`); + return 1; + } + return handler(storageRoot, argv.slice(1)); +} + type DispatchFn = (storageRoot: string, argv: string[]) => Promise; const COMMAND_TABLE: Record = { @@ -293,6 +387,7 @@ const COMMAND_TABLE: Record = { threads: dispatchThreads, thread: dispatchThreadBranch, fork: dispatchFork, + cas: dispatchCas, }; export async function runCli(storageRoot: string, argv: string[]): Promise { diff --git a/packages/cli-workflow/src/cmd-cas.ts b/packages/cli-workflow/src/cmd-cas.ts new file mode 100644 index 0000000..b62a427 --- /dev/null +++ b/packages/cli-workflow/src/cmd-cas.ts @@ -0,0 +1,67 @@ +import { dirname, join } from "node:path"; + +import { createThreadCas, err, ok, type Result } from "@uncaged/workflow"; + +import { resolveThreadDataPath } from "./thread-scan.js"; + +function resolveCasDir(threadDataPath: string, threadId: string): string { + return join(dirname(threadDataPath), `${threadId}.cas`); +} + +export async function cmdCasGet( + storageRoot: string, + threadId: string, + hash: string, +): Promise> { + const dataPath = await resolveThreadDataPath(storageRoot, threadId); + if (dataPath === null) { + return err(`thread not found: ${threadId}`); + } + const cas = createThreadCas(resolveCasDir(dataPath, threadId)); + const content = await cas.get(hash); + if (content === null) { + return err(`cas entry not found: ${hash}`); + } + return ok(content); +} + +export async function cmdCasPut( + storageRoot: string, + threadId: string, + content: string, +): Promise> { + const dataPath = await resolveThreadDataPath(storageRoot, threadId); + if (dataPath === null) { + return err(`thread not found: ${threadId}`); + } + const cas = createThreadCas(resolveCasDir(dataPath, threadId)); + const hash = await cas.put(content); + return ok(hash); +} + +export async function cmdCasList( + storageRoot: string, + threadId: string, +): Promise> { + const dataPath = await resolveThreadDataPath(storageRoot, threadId); + if (dataPath === null) { + return err(`thread not found: ${threadId}`); + } + const cas = createThreadCas(resolveCasDir(dataPath, threadId)); + const hashes = await cas.list(); + return ok(hashes); +} + +export async function cmdCasRm( + storageRoot: string, + threadId: string, + hash: string, +): Promise> { + const dataPath = await resolveThreadDataPath(storageRoot, threadId); + if (dataPath === null) { + return err(`thread not found: ${threadId}`); + } + const cas = createThreadCas(resolveCasDir(dataPath, threadId)); + await cas.delete(hash); + return ok(undefined); +} diff --git a/packages/cli-workflow/src/cmd-thread.ts b/packages/cli-workflow/src/cmd-thread.ts index af5c6b1..1422d89 100644 --- a/packages/cli-workflow/src/cmd-thread.ts +++ b/packages/cli-workflow/src/cmd-thread.ts @@ -1,4 +1,4 @@ -import { unlink } from "node:fs/promises"; +import { rm, unlink } from "node:fs/promises"; import { dirname, join } from "node:path"; import { err, ok, type Result } from "@uncaged/workflow"; @@ -33,10 +33,12 @@ export async function cmdThreadRemove( const dir = dirname(dataPath); const infoPath = join(dir, `${threadId}.info.jsonl`); const runningPath = join(dir, `${threadId}.running`); + const casPath = join(dir, `${threadId}.cas`); await unlink(dataPath); await unlink(infoPath).catch(() => {}); await unlink(runningPath).catch(() => {}); + await rm(casPath, { recursive: true, force: true }); return ok(undefined); } diff --git a/packages/workflow-template-solve-issue/__tests__/solve-issue-template.test.ts b/packages/workflow-template-solve-issue/__tests__/solve-issue-template.test.ts index e1cd604..2dd12f9 100644 --- a/packages/workflow-template-solve-issue/__tests__/solve-issue-template.test.ts +++ b/packages/workflow-template-solve-issue/__tests__/solve-issue-template.test.ts @@ -16,11 +16,25 @@ import { createSolveIssueRun, solveIssueModerator } from "../src/index.js"; import type { SolveIssueMeta } from "../src/roles.js"; const DEFAULT_PHASES: PlannerMeta["phases"] = [ - { hash: "4KNMR2PX", title: "Do the work", name: "phase-a", description: "Do the work", acceptance: "Done" }, + { + hash: "4KNMR2PX", + title: "Do the work", + name: "phase-a", + description: "Do the work", + acceptance: "Done", + }, ]; const EXPECT_PLANNER_META: PlannerMeta = { - phases: [{ hash: "7BQST3VW", title: "placeholder phase", name: "phase-1", description: "placeholder", acceptance: "placeholder" }], + phases: [ + { + hash: "7BQST3VW", + title: "placeholder phase", + name: "phase-1", + description: "placeholder", + acceptance: "placeholder", + }, + ], }; const EXPECT_CODER_META: CoderMeta = { @@ -179,8 +193,20 @@ describe("solveIssueModerator", () => { test("multiple planner phases → coder until all complete, then reviewer", () => { const phases: PlannerMeta["phases"] = [ - { hash: "AA000001", title: "first phase", name: "p1", description: "first", acceptance: "a1" }, - { hash: "AA000002", title: "second phase", name: "p2", description: "second", acceptance: "a2" }, + { + hash: "AA000001", + title: "first phase", + name: "p1", + description: "first", + acceptance: "a1", + }, + { + hash: "AA000002", + title: "second phase", + name: "p2", + description: "second", + acceptance: "a2", + }, ]; expect(solveIssueModerator(makeCtx(20, [plannerStep(phases)]))).toBe("coder"); expect(solveIssueModerator(makeCtx(20, [plannerStep(phases), coderStep("p1")]))).toBe("coder"); @@ -191,10 +217,34 @@ describe("solveIssueModerator", () => { test("one-shot coder reports only last phase name → reviewer (moderator treats as all phases done)", () => { const phases: PlannerMeta["phases"] = [ - { hash: "BB000001", title: "setup branch", name: "setup-branch", description: "branch", acceptance: "branch exists" }, - { hash: "BB000002", title: "write tests", name: "write-tests", description: "tests", acceptance: "tests pass" }, - { hash: "BB000003", title: "verify", name: "verify", description: "verify", acceptance: "ok" }, - { hash: "BB000004", title: "commit and pr", name: "commit-and-pr", description: "pr", acceptance: "pr open" }, + { + hash: "BB000001", + title: "setup branch", + name: "setup-branch", + description: "branch", + acceptance: "branch exists", + }, + { + hash: "BB000002", + title: "write tests", + name: "write-tests", + description: "tests", + acceptance: "tests pass", + }, + { + hash: "BB000003", + title: "verify", + name: "verify", + description: "verify", + acceptance: "ok", + }, + { + hash: "BB000004", + title: "commit and pr", + name: "commit-and-pr", + description: "pr", + acceptance: "pr open", + }, ]; expect( solveIssueModerator(makeCtx(20, [plannerStep(phases), coderStep("commit-and-pr")])), @@ -203,8 +253,20 @@ describe("solveIssueModerator", () => { test("completedPhase sentinel when not a planned name → reviewer", () => { const phases: PlannerMeta["phases"] = [ - { hash: "CC000001", title: "first phase", name: "p1", description: "first", acceptance: "a1" }, - { hash: "CC000002", title: "second phase", name: "p2", description: "second", acceptance: "a2" }, + { + hash: "CC000001", + title: "first phase", + name: "p1", + description: "first", + acceptance: "a1", + }, + { + hash: "CC000002", + title: "second phase", + name: "p2", + description: "second", + acceptance: "a2", + }, ]; expect(solveIssueModerator(makeCtx(20, [plannerStep(phases), coderStep("all-done")]))).toBe( "reviewer", @@ -213,8 +275,20 @@ describe("solveIssueModerator", () => { test("incomplete phases → END when max rounds exhausted", () => { const phases: PlannerMeta["phases"] = [ - { hash: "DD000001", title: "first phase", name: "p1", description: "first", acceptance: "a1" }, - { hash: "DD000002", title: "second phase", name: "p2", description: "second", acceptance: "a2" }, + { + hash: "DD000001", + title: "first phase", + name: "p1", + description: "first", + acceptance: "a1", + }, + { + hash: "DD000002", + title: "second phase", + name: "p2", + description: "second", + acceptance: "a2", + }, ]; const steps: ModeratorContext["steps"] = [plannerStep(phases), coderStep("p1")]; expect(solveIssueModerator(makeCtx(3, steps))).toBe(END); diff --git a/packages/workflow/__tests__/cas.test.ts b/packages/workflow/__tests__/cas.test.ts new file mode 100644 index 0000000..679e242 --- /dev/null +++ b/packages/workflow/__tests__/cas.test.ts @@ -0,0 +1,92 @@ +import { afterEach, beforeEach, describe, expect, test } from "bun:test"; +import { mkdtemp, rm } from "node:fs/promises"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; + +import { createThreadCas } from "../src/cas.js"; +import { hashString } from "../src/hash.js"; + +describe("createThreadCas", () => { + let casDir: string; + + beforeEach(async () => { + casDir = await mkdtemp(join(tmpdir(), "cas-test-")); + }); + + afterEach(async () => { + await rm(casDir, { recursive: true, force: true }); + }); + + test("put returns consistent hash for same content", async () => { + const cas = createThreadCas(casDir); + const h1 = await cas.put("hello world"); + const h2 = await cas.put("hello world"); + expect(h1).toBe(h2); + expect(h1).toHaveLength(13); + }); + + test("put returns hash matching hashString", async () => { + const cas = createThreadCas(casDir); + const content = "some content to store"; + const h = await cas.put(content); + expect(h).toBe(hashString(content)); + }); + + test("get returns stored content", async () => { + const cas = createThreadCas(casDir); + const content = "line1\nline2\nline3"; + const h = await cas.put(content); + const retrieved = await cas.get(h); + expect(retrieved).toBe(content); + }); + + test("get returns null for missing hash", async () => { + const cas = createThreadCas(casDir); + const result = await cas.get("0000000000000"); + expect(result).toBeNull(); + }); + + test("delete removes entry", async () => { + const cas = createThreadCas(casDir); + const h = await cas.put("to be deleted"); + await cas.delete(h); + const result = await cas.get(h); + expect(result).toBeNull(); + }); + + test("delete on missing hash does not throw", async () => { + const cas = createThreadCas(casDir); + await cas.delete("0000000000000"); + }); + + test("list returns all stored hashes", async () => { + const cas = createThreadCas(casDir); + const h1 = await cas.put("aaa"); + const h2 = await cas.put("bbb"); + const h3 = await cas.put("ccc"); + const hashes = await cas.list(); + expect(hashes.sort()).toEqual([h1, h2, h3].sort()); + }); + + test("list returns empty array when cas dir does not exist", async () => { + const cas = createThreadCas(join(casDir, "nonexistent")); + const hashes = await cas.list(); + expect(hashes).toEqual([]); + }); + + test("put is idempotent — same content written twice causes no error", async () => { + const cas = createThreadCas(casDir); + const h1 = await cas.put("idempotent"); + const h2 = await cas.put("idempotent"); + expect(h1).toBe(h2); + const content = await cas.get(h1); + expect(content).toBe("idempotent"); + }); + + test("different content produces different hashes", async () => { + const cas = createThreadCas(casDir); + const h1 = await cas.put("alpha"); + const h2 = await cas.put("beta"); + expect(h1).not.toBe(h2); + }); +}); diff --git a/packages/workflow/src/cas.ts b/packages/workflow/src/cas.ts new file mode 100644 index 0000000..8e422ea --- /dev/null +++ b/packages/workflow/src/cas.ts @@ -0,0 +1,70 @@ +import { mkdir, readdir, readFile, rename, unlink, writeFile } from "node:fs/promises"; +import { join } from "node:path"; + +import { hashString } from "./hash.js"; + +export type CasStore = { + put(content: string): Promise; + get(hash: string): Promise; + delete(hash: string): Promise; + list(): Promise; +}; + +export function createThreadCas(casDir: string): CasStore { + async function ensureDir(): Promise { + await mkdir(casDir, { recursive: true }); + } + + function filePath(hash: string): string { + return join(casDir, `${hash}.txt`); + } + + return { + async put(content: string): Promise { + const hash = hashString(content); + await ensureDir(); + const target = filePath(hash); + const tmp = `${target}.tmp.${Date.now()}`; + await writeFile(tmp, content, "utf8"); + await rename(tmp, target); + return hash; + }, + + async get(hash: string): Promise { + try { + return await readFile(filePath(hash), "utf8"); + } catch (e) { + const errObj = e as NodeJS.ErrnoException; + if (errObj.code === "ENOENT") { + return null; + } + throw e; + } + }, + + async delete(hash: string): Promise { + try { + await unlink(filePath(hash)); + } catch (e) { + const errObj = e as NodeJS.ErrnoException; + if (errObj.code === "ENOENT") { + return; + } + throw e; + } + }, + + async list(): Promise { + try { + const entries = await readdir(casDir); + return entries.filter((name) => name.endsWith(".txt")).map((name) => name.slice(0, -4)); + } catch (e) { + const errObj = e as NodeJS.ErrnoException; + if (errObj.code === "ENOENT") { + return []; + } + throw e; + } + }, + }; +} diff --git a/packages/workflow/src/hash.ts b/packages/workflow/src/hash.ts index 7d2c51c..3f85984 100644 --- a/packages/workflow/src/hash.ts +++ b/packages/workflow/src/hash.ts @@ -15,3 +15,10 @@ export function hashWorkflowBundleBytes(data: Uint8Array): string { const digest = XXH.h64(0).update(buf).digest(); return encodeUint64AsCrockford(digestToUint64(digest)); } + +/** XXH64 (seed 0) over a UTF-8 string, encoded as 13-char Crockford Base32. */ +export function hashString(content: string): string { + const buf = Buffer.from(content, "utf8"); + const digest = XXH.h64(0).update(buf).digest(); + return encodeUint64AsCrockford(digestToUint64(digest)); +} diff --git a/packages/workflow/src/index.ts b/packages/workflow/src/index.ts index 47d4564..131de9c 100644 --- a/packages/workflow/src/index.ts +++ b/packages/workflow/src/index.ts @@ -7,6 +7,7 @@ export { } from "./base32.js"; export { buildDescriptor } from "./build-descriptor.js"; export { validateWorkflowBundle, type WorkflowBundleValidationInput } from "./bundle-validator.js"; +export { type CasStore, createThreadCas } from "./cas.js"; export { createWorkflow } from "./create-workflow.js"; export { type ExecuteThreadIo, @@ -25,7 +26,7 @@ export { selectForkHistoricalSteps, } from "./fork-thread.js"; export { stringifyWorkflowDescriptor } from "./generate-descriptor.js"; -export { hashWorkflowBundleBytes } from "./hash.js"; +export { hashString, hashWorkflowBundleBytes } from "./hash.js"; export { type LlmError, llmErrorToCause,