Compare commits

..

5 Commits

Author SHA1 Message Date
xiaoju 30e4e99908 feat: add refs tracking to RoleStep
- RoleOutput gains refs: string[] for CAS reference tracking
- RoleDefinition gains extractRefs: ((meta) => string[]) | null
- planner: phases.map(p => p.hash), coder: [completedPhase]
- Engine persists refs, fork preserves refs
- Backward compat: missing refs normalized to []
- 137 tests passing

Fixes #31
2026-05-07 10:44:25 +00:00
xiaoju a3c70a5041 Merge pull request 'feat: migrate CAS to global storage' (#34) from feat/30-global-cas into main 2026-05-07 10:40:41 +00:00
xiaoju 12d58a8206 feat: migrate CAS to global storage
- Add getGlobalCasDir() to storage-root.ts
- cmd-cas.ts uses global CAS dir, threadId kept for CLI compat
- thread rm no longer deletes .cas/ directories
- Rename createThreadCas → createCasStore (deprecated alias kept)
- 134 tests passing

BREAKING: CAS moves from <thread>.cas/ to <storageRoot>/cas/

Fixes #30
2026-05-07 10:40:14 +00:00
xiaoju c096f4d94e docs: add workflow-as-agent implementation plan
Covers 4 phases:
1. Global CAS migration
2. RoleStep refs tracking
3. CAS garbage collection
4. workflowAsAgent(name) factory

Refs #25
2026-05-07 10:35:26 +00:00
xiaoju 500401d93c feat(workflow): add preparer role to solve-issue workflow (#29, closes #28) 2026-05-07 10:18:05 +00:00
28 changed files with 732 additions and 57 deletions
+315
View File
@@ -0,0 +1,315 @@
# Workflow-as-Agent Implementation Plan
> **For Hermes:** Use subagent-driven-development skill to implement this plan task-by-task.
**Goal:** Enable workflows to invoke other workflows as agents, backed by global CAS and refs tracking.
**Architecture:** Migrate CAS from thread-local to global (`~/.uncaged/workflow/cas/`), add `refs` to RoleStep for GC traceability, then build `workflowAsAgent(name)` factory that resolves workflow name → bundle via registry and spawns a child thread.
**Tech Stack:** TypeScript, Bun, Zod v4, monorepo with `packages/`
**Issue:** https://git.shazhou.work/uncaged/workflow/issues/25
---
## Phase 1: Global CAS Migration
Move CAS storage from `<threadDir>/<threadId>.cas/` to `~/.uncaged/workflow/cas/` (global, content-addressed, immutable). This is a **breaking change** — thread-local `.cas/` directories are abandoned.
### Task 1.1: Add `globalCasDir` helper to `storage-root.ts`
**Objective:** Provide a single function that returns the global CAS directory path.
**Files:**
- Modify: `packages/workflow/src/storage-root.ts`
- Test: `packages/workflow/__tests__/storage-root.test.ts`
**Implementation:**
```typescript
// storage-root.ts — add export
export function getGlobalCasDir(storageRoot?: string): string {
const root = storageRoot ?? getDefaultWorkflowStorageRoot();
return join(root, "cas");
}
```
Export from `packages/workflow/src/index.ts`.
### Task 1.2: Update `cmd-cas.ts` to use global CAS
**Objective:** CLI `cas get/put/list/rm` no longer needs threadId for storage location — CAS is global. But keep threadId in CLI for backward compat of planner/coder prompts (they pass threadId).
**Files:**
- Modify: `packages/cli-workflow/src/cmd-cas.ts`
**Changes:**
- `resolveCasDir` → use `getGlobalCasDir(storageRoot)` instead of deriving from thread data path
- `cmdCasPut` / `cmdCasGet` / `cmdCasList` / `cmdCasRm`: threadId is still accepted (prompts pass it) but storage goes to global dir
- Remove the `resolveThreadDataPath` dependency for CAS operations — thread doesn't need to exist to read CAS
```typescript
import { createThreadCas, getGlobalCasDir } from "@uncaged/workflow";
export async function cmdCasGet(
storageRoot: string,
_threadId: string, // kept for CLI compat, not used for path
hash: string,
): Promise<Result<string, string>> {
const cas = createThreadCas(getGlobalCasDir(storageRoot));
const content = await cas.get(hash);
if (content === null) {
return err(`cas entry not found: ${hash}`);
}
return ok(content);
}
// ... same pattern for put/list/rm
```
### Task 1.3: Update `cmd-thread.ts` — thread rm no longer deletes `.cas/`
**Objective:** Since CAS is global, `thread rm` should NOT delete CAS entries. CAS cleanup is GC's job.
**Files:**
- Modify: `packages/cli-workflow/src/cmd-thread.ts`
- Check: remove any `rmdir` / `unlink` of `<threadId>.cas/` directory
### Task 1.4: Rename `createThreadCas` → `createCasStore`
**Objective:** The name `createThreadCas` is misleading now. Rename to `createCasStore`.
**Files:**
- Modify: `packages/workflow/src/cas.ts` — rename function
- Modify: `packages/workflow/src/index.ts` — update export (keep `createThreadCas` as deprecated alias for one release)
- Modify: all consumers (`cmd-cas.ts`)
### Task 1.5: Update tests
**Objective:** All CAS-related tests use global dir instead of thread-local.
**Files:**
- Modify: `packages/cli-workflow/__tests__/commands.test.ts`
- Verify: `bun test` passes
### Task 1.6: Clean up old thread-local `.cas/` references
**Objective:** Remove dead code that creates/reads thread-local `.cas/` directories.
**Files:**
- Search all `*.ts` for `.cas` path construction patterns
- Remove orphaned helpers
---
## Phase 2: RoleStep `refs` Tracking
Add `refs: string[]` to persisted role steps so GC can trace which CAS entries are alive.
### Task 2.1: Add `refs` to `RoleOutput` and engine persistence
**Objective:** Every role step can declare which CAS hashes it produced or consumed.
**Files:**
- Modify: `packages/workflow/src/types.ts`
- Modify: `packages/workflow/src/engine.ts`
**Changes to `types.ts`:**
```typescript
export type RoleOutput = {
role: string;
content: string;
meta: Record<string, unknown>;
refs: string[]; // CAS hashes produced/consumed by this step
};
```
**Changes to `engine.ts`:**
- `appendDataLine` for role steps: include `refs` field (default `[]` if not provided)
### Task 2.2: Auto-populate refs from meta hashes
**Objective:** The engine should automatically extract CAS hashes from `meta` to populate `refs`, so roles don't need to manually track them.
**Strategy:** After meta extraction, walk the meta object and collect any string that looks like a CAS hash (Crockford Base32, 13 chars). This is a heuristic but works because CAS hashes are distinctive.
Alternative (simpler): Let each `RoleDefinition` optionally declare a `extractRefs(meta: M) => string[]` function. For planner, this returns `meta.phases.map(p => p.hash)`. For coder, `[meta.completedPhase]`.
**Recommended:** The explicit `extractRefs` approach — no magic, no false positives.
**Files:**
- Modify: `packages/workflow/src/types.ts` — add optional `extractRefs` to `RoleDefinition`
- Modify: `packages/workflow/src/create-workflow.ts` — call `extractRefs` after meta extraction, set on `RoleOutput.refs`
- Modify: `packages/workflow-role-planner/src/planner.ts` — implement `extractRefs`
- Modify: `packages/workflow-role-coder/src/coder.ts` — implement `extractRefs`
```typescript
// types.ts — RoleDefinition addition
export type RoleDefinition<Meta extends Record<string, unknown>> = {
description: string;
systemPrompt: string;
extractPrompt: string;
schema: z.ZodType<Meta>;
extractRefs?: (meta: Meta) => string[]; // CAS hashes to track
};
// planner.ts
extractRefs: (meta) => meta.phases.map(p => p.hash),
// coder.ts
extractRefs: (meta) => [meta.completedPhase],
```
### Task 2.3: Update fork logic to preserve refs
**Objective:** When forking a thread, `refs` from historical steps must be carried over.
**Files:**
- Modify: `packages/workflow/src/fork-thread.ts`
- Verify: `ForkHistoricalStep` / `PrefilledDiskStep` include `refs`
### Task 2.4: Tests for refs tracking
**Files:**
- Add: `packages/workflow/__tests__/refs-tracking.test.ts`
- Verify: refs appear in `.data.jsonl` output
---
## Phase 3: CAS Garbage Collection
### Task 3.1: Implement `gc.ts` in `@uncaged/workflow`
**Objective:** Mark-and-sweep GC — scan all thread `.data.jsonl` files, collect `refs`, delete orphaned CAS entries.
**Files:**
- Create: `packages/workflow/src/gc.ts`
- Export from: `packages/workflow/src/index.ts`
```typescript
export type GcResult = {
scannedThreads: number;
activeRefs: number;
deletedEntries: number;
deletedHashes: string[];
};
export async function garbageCollectCas(storageRoot: string): Promise<GcResult> {
// 1. Find all .data.jsonl files under storageRoot
// 2. Parse each, flatMap step.refs → Set<string>
// 3. List all CAS entries via createCasStore(globalCasDir).list()
// 4. Delete entries not in active set
// 5. Return stats
}
```
### Task 3.2: Add `uncaged-workflow gc` CLI command
**Files:**
- Create: `packages/cli-workflow/src/cmd-gc.ts`
- Modify: `packages/cli-workflow/src/cli-dispatch.ts` — add `gc` subcommand
### Task 3.3: Run GC on `thread rm`
**Files:**
- Modify: `packages/cli-workflow/src/cmd-thread.ts` — after deleting thread data, optionally run GC
### Task 3.4: Tests for GC
**Files:**
- Create: `packages/cli-workflow/__tests__/gc-cli.test.ts`
---
## Phase 4: `workflowAsAgent` Factory
### Task 4.1: Create `workflowAsAgent` in `@uncaged/workflow`
**Objective:** Factory function that takes a workflow name, resolves to bundle, returns an `AgentFn`.
**Files:**
- Create: `packages/workflow/src/workflow-as-agent.ts`
- Export from: `packages/workflow/src/index.ts`
```typescript
import type { AgentFn } from "./types.js";
export type WorkflowAsAgentOptions = {
storageRoot?: string;
};
export function workflowAsAgent(
workflowName: string,
options?: WorkflowAsAgentOptions,
): AgentFn {
return async (ctx) => {
const storageRoot = options?.storageRoot ?? getDefaultWorkflowStorageRoot();
// 1. Read registry → resolve name to bundle hash + path
const registry = await readWorkflowRegistry(storageRoot);
const entry = getRegisteredWorkflow(registry, workflowName);
if (entry === null) {
return `ERROR: workflow "${workflowName}" not found in registry`;
}
// 2. Load bundle
const bundlePath = join(storageRoot, "bundles", `${entry.hash}.esm.js`);
const bundleExports = await extractBundleExports(bundlePath);
// 3. Create child thread input from ctx.start.content (parent prompt)
const input: ThreadInput = {
prompt: ctx.start.content,
steps: [],
};
// 4. Generate child threadId
const childThreadId = generateUlid();
// 5. Execute — collect all yields, return final content
const io: ExecuteThreadIo = { ... };
const result = await executeThread(bundleExports.run, workflowName, input, ...);
// 6. Return summary as agent content
return result.summary;
};
}
```
### Task 4.2: System-level depth limit
**Objective:** Prevent infinite recursion. Track depth via thread metadata, enforce a global max (default 3, configurable in `workflow.yaml`).
**Files:**
- Modify: `packages/workflow/src/types.ts` — add `depth` to `WorkflowFnOptions`
- Modify: `packages/workflow/src/workflow-as-agent.ts` — increment depth, check limit
- Modify: registry or config types for `maxDepth` setting
### Task 4.3: Tests for workflowAsAgent
**Files:**
- Create: `packages/workflow/__tests__/workflow-as-agent.test.ts`
- Test: name resolution, depth limit, child thread execution
### Task 4.4: Integration test — nested workflow
**Objective:** Create a minimal test workflow that calls another workflow via `workflowAsAgent`.
**Files:**
- Create: `packages/workflow/__tests__/workflow-as-agent-integration.test.ts`
---
## Execution Order
```
Phase 1 (Global CAS) → Phase 2 (refs) → Phase 3 (GC) → Phase 4 (workflowAsAgent)
```
Each phase is independently mergeable. Phase 3 depends on Phase 2 (needs refs to know what's alive). Phase 4 depends on Phase 1 (global CAS for cross-thread sharing).
## Breaking Changes
- CAS storage location moves from `<thread>.cas/` to `~/.uncaged/workflow/cas/`
- `RoleOutput` gains required `refs: string[]` field
- Existing threads with thread-local CAS will lose access to old CAS data (acceptable — those are short-lived workflow artifacts)
- `createThreadCas` renamed to `createCasStore` (alias kept temporarily)
+1
View File
@@ -28,6 +28,7 @@ const greeter: RoleDefinition<Roles["greeter"]> = {
systemPrompt: "You greet the user briefly.",
extractPrompt: "Extract the greeting string produced for the user.",
schema: greeterMetaSchema,
extractRefs: null,
};
const extract = createExtract({
@@ -3,8 +3,9 @@ import { mkdir, mkdtemp, readFile, rm, unlink, writeFile } from "node:fs/promise
import { tmpdir } from "node:os";
import { join } from "node:path";
import { getRegisteredWorkflow, readWorkflowRegistry } from "@uncaged/workflow";
import { getGlobalCasDir, getRegisteredWorkflow, readWorkflowRegistry } from "@uncaged/workflow";
import { cmdAdd } from "../src/cmd-add.js";
import { cmdCasGet, cmdCasList, cmdCasPut, cmdCasRm } from "../src/cmd-cas.js";
import { cmdHistory } from "../src/cmd-history.js";
import { cmdList, formatListLines } from "../src/cmd-list.js";
import { cmdRemove } from "../src/cmd-remove.js";
@@ -371,6 +372,37 @@ export const run = async function* (input) {
expect(bad.ok).toBe(false);
});
test("cas put/get/list/rm use global cas dir (thread id not required for storage)", async () => {
const put = await cmdCasPut(storageRoot, "nonexistent-thread-id", "phase doc");
expect(put.ok).toBe(true);
if (!put.ok) {
return;
}
const hash = put.value;
const blobPath = join(getGlobalCasDir(storageRoot), `${hash}.txt`);
expect(await readFile(blobPath, "utf8")).toBe("phase doc");
const got = await cmdCasGet(storageRoot, "other-thread", hash);
expect(got.ok).toBe(true);
if (!got.ok) {
return;
}
expect(got.value).toBe("phase doc");
const listed = await cmdCasList(storageRoot, "another-thread");
expect(listed.ok).toBe(true);
if (!listed.ok) {
return;
}
expect(listed.value).toContain(hash);
const removed = await cmdCasRm(storageRoot, "rm-thread", hash);
expect(removed.ok).toBe(true);
const missing = await cmdCasGet(storageRoot, "after-rm", hash);
expect(missing.ok).toBe(false);
});
test("rollback rejects missing bundle file for target hash", async () => {
const bundleDir = join(storageRoot, "src");
await mkdir(bundleDir, { recursive: true });
@@ -4,7 +4,9 @@ import { mkdir, mkdtemp, readFile, rm, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import { dirname, join } from "node:path";
import { fileURLToPath } from "node:url";
import { getGlobalCasDir } from "@uncaged/workflow";
import { cmdAdd } from "../src/cmd-add.js";
import { cmdCasPut } from "../src/cmd-cas.js";
import { cmdKill } from "../src/cmd-kill.js";
import { cmdPause } from "../src/cmd-pause.js";
import { cmdPs } from "../src/cmd-ps.js";
@@ -12,7 +14,7 @@ import { cmdResume } from "../src/cmd-resume.js";
import { cmdRun } from "../src/cmd-run.js";
import { cmdThreadRemove, cmdThreadShow } from "../src/cmd-thread.js";
import { cmdThreads } from "../src/cmd-threads.js";
import { pathExists } from "../src/fs-utils.js";
import { pathExists, readTextFileIfExists } from "../src/fs-utils.js";
import { addCliArgs } from "./bundle-fixture.js";
const threadFixtureDescriptor = `export const descriptor = {
@@ -175,6 +177,55 @@ describe("cli thread commands", () => {
expect(await pathExists(dataPath)).toBe(false);
});
test("thread rm does not delete global cas blobs for that thread id", async () => {
const bundleDir = join(storageRoot, "src");
await mkdir(bundleDir, { recursive: true });
const bundlePath = join(bundleDir, "demo.esm.js");
await writeFile(bundlePath, fastBundleSource, "utf8");
const added = await cmdAdd(storageRoot, addCliArgs("solve-issue", bundlePath));
expect(added.ok).toBe(true);
if (!added.ok) {
return;
}
const ran = await cmdRun(storageRoot, "solve-issue", "hello", 5);
expect(ran.ok).toBe(true);
if (!ran.ok) {
return;
}
const threadId = ran.value.threadId;
let threads = await cmdThreads(storageRoot, []);
for (
let attempt = 0;
attempt < 50 && threads.ok && !threads.value.some((l) => l.includes(threadId));
attempt++
) {
await new Promise((r) => setTimeout(r, 20));
threads = await cmdThreads(storageRoot, []);
}
const dataPath = join(storageRoot, "logs", added.value.hash, `${threadId}.data.jsonl`);
const runningPath = join(dirname(dataPath), `${threadId}.running`);
await waitUntilRunningFileAbsent(runningPath, 120);
const put = await cmdCasPut(storageRoot, threadId, "keep-after-thread-rm");
expect(put.ok).toBe(true);
if (!put.ok) {
return;
}
const hash = put.value;
const casBlob = join(getGlobalCasDir(storageRoot), `${hash}.txt`);
const removed = await cmdThreadRemove(storageRoot, threadId);
expect(removed.ok).toBe(true);
const stillThere = await readTextFileIfExists(casBlob);
expect(stillThere).toBe("keep-after-thread-rm");
});
test("cli entrypoint dispatches threads / ps (spawn)", () => {
const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot };
const threads = spawnSync(process.execPath, [cliEntryPath, "threads"], {
+9 -33
View File
@@ -1,23 +1,11 @@
import { dirname, join } from "node:path";
import { createThreadCas, err, ok, type Result } from "@uncaged/workflow";
import { resolveThreadDataPath } from "./thread-scan.js";
function resolveCasDir(threadDataPath: string, threadId: string): string {
return join(dirname(threadDataPath), `${threadId}.cas`);
}
import { createCasStore, err, getGlobalCasDir, ok, type Result } from "@uncaged/workflow";
export async function cmdCasGet(
storageRoot: string,
threadId: string,
_threadId: string,
hash: string,
): Promise<Result<string, string>> {
const dataPath = await resolveThreadDataPath(storageRoot, threadId);
if (dataPath === null) {
return err(`thread not found: ${threadId}`);
}
const cas = createThreadCas(resolveCasDir(dataPath, threadId));
const cas = createCasStore(getGlobalCasDir(storageRoot));
const content = await cas.get(hash);
if (content === null) {
return err(`cas entry not found: ${hash}`);
@@ -27,41 +15,29 @@ export async function cmdCasGet(
export async function cmdCasPut(
storageRoot: string,
threadId: string,
_threadId: string,
content: string,
): Promise<Result<string, string>> {
const dataPath = await resolveThreadDataPath(storageRoot, threadId);
if (dataPath === null) {
return err(`thread not found: ${threadId}`);
}
const cas = createThreadCas(resolveCasDir(dataPath, threadId));
const cas = createCasStore(getGlobalCasDir(storageRoot));
const hash = await cas.put(content);
return ok(hash);
}
export async function cmdCasList(
storageRoot: string,
threadId: string,
_threadId: string,
): Promise<Result<string[], string>> {
const dataPath = await resolveThreadDataPath(storageRoot, threadId);
if (dataPath === null) {
return err(`thread not found: ${threadId}`);
}
const cas = createThreadCas(resolveCasDir(dataPath, threadId));
const cas = createCasStore(getGlobalCasDir(storageRoot));
const hashes = await cas.list();
return ok(hashes);
}
export async function cmdCasRm(
storageRoot: string,
threadId: string,
_threadId: string,
hash: string,
): Promise<Result<void, string>> {
const dataPath = await resolveThreadDataPath(storageRoot, threadId);
if (dataPath === null) {
return err(`thread not found: ${threadId}`);
}
const cas = createThreadCas(resolveCasDir(dataPath, threadId));
const cas = createCasStore(getGlobalCasDir(storageRoot));
await cas.delete(hash);
return ok(undefined);
}
+1 -3
View File
@@ -1,4 +1,4 @@
import { rm, unlink } from "node:fs/promises";
import { unlink } from "node:fs/promises";
import { dirname, join } from "node:path";
import { err, ok, type Result } from "@uncaged/workflow";
@@ -33,12 +33,10 @@ export async function cmdThreadRemove(
const dir = dirname(dataPath);
const infoPath = join(dir, `${threadId}.info.jsonl`);
const runningPath = join(dir, `${threadId}.running`);
const casPath = join(dir, `${threadId}.cas`);
await unlink(dataPath);
await unlink(infoPath).catch(() => {});
await unlink(runningPath).catch(() => {});
await rm(casPath, { recursive: true, force: true });
return ok(undefined);
}
@@ -38,4 +38,5 @@ export const coderRole: RoleDefinition<CoderMeta> = {
extractPrompt:
"Extract completedPhase: the planner phase hash finished this round (exact hash string from the plan). If multiple phases were finished in one round, use the last finished phase hash. Extract filesChanged and a summary of the work.",
schema: coderMetaSchema,
extractRefs: (meta) => [meta.completedPhase],
};
@@ -31,4 +31,5 @@ export const committerRole: RoleDefinition<CommitterMeta> = {
extractPrompt:
"Extract the commit result: committed (with branch and SHA), recoverable failure, or unrecoverable failure. Include error details and log references if applicable.",
schema: committerMetaSchema,
extractRefs: null,
};
@@ -49,4 +49,5 @@ export const plannerRole: RoleDefinition<PlannerMeta> = {
extractPrompt:
"Extract the implementation phases from the agent's output. Each phase has a hash (the CAS content-hash returned by the cas put command) and a title (one-line summary).",
schema: plannerMetaSchema,
extractRefs: (meta) => meta.phases.map((p) => p.hash),
};
@@ -47,4 +47,5 @@ export const preparerRole: RoleDefinition<PreparerMeta> = {
extractPrompt:
"Extract repoPath (absolute path), defaultBranch, conventions (summary string or null), and toolchain (packageManager, testCommand, lintCommand, buildCommand — each string or null).",
schema: preparerMetaSchema,
extractRefs: null,
};
@@ -21,4 +21,5 @@ export const reviewerRole: RoleDefinition<ReviewerMeta> = {
extractPrompt:
"Extract the review verdict: approved or rejected. If rejected, list the blocking issues.",
schema: reviewerMetaSchema,
extractRefs: null,
};
@@ -124,6 +124,7 @@ function preparerStep(): RoleStep<SolveIssueMeta> {
buildCommand: "bun run build",
},
},
refs: [],
timestamp: 0,
};
}
@@ -133,6 +134,7 @@ function plannerStep(phases: PlannerMeta["phases"] = DEFAULT_PHASES): RoleStep<S
role: "planner",
content: "plan",
meta: { phases },
refs: phases.map((p) => p.hash),
timestamp: 1,
};
}
@@ -142,6 +144,7 @@ function coderStep(completedPhase = "4KNMR2PX"): RoleStep<SolveIssueMeta> {
role: "coder",
content: "code",
meta: { completedPhase, filesChanged: ["a.ts"], summary: "fixed" },
refs: [completedPhase],
timestamp: 2,
};
}
@@ -153,6 +156,7 @@ function reviewerStep(approved: boolean): RoleStep<SolveIssueMeta> {
meta: approved
? { status: "approved" as const }
: { status: "rejected" as const, issues: ["needs fix"] },
refs: [],
timestamp: 3,
};
}
@@ -162,6 +166,7 @@ function committerStep(): RoleStep<SolveIssueMeta> {
role: "committer",
content: "commit",
meta: { status: "committed", branch: "feat/issue-1", commitSha: "abc1234" },
refs: [],
timestamp: 4,
};
}
@@ -37,6 +37,7 @@ describe("buildAgentPrompt", () => {
role: "coder",
content: "only step full body",
meta: { files: ["a.ts"] },
refs: [],
timestamp: 2,
},
],
@@ -61,12 +62,14 @@ describe("buildAgentPrompt", () => {
role: "planner",
content: "PLANNER_SECRET_FULL_TEXT",
meta: { plan: "short" },
refs: [],
timestamp: 2,
},
{
role: "coder",
content: "last step full content",
meta: { done: true },
refs: [],
timestamp: 3,
},
],
@@ -94,18 +97,21 @@ describe("buildAgentPrompt", () => {
role: "a",
content: "HIDDEN_A",
meta: { n: 1 },
refs: [],
timestamp: 2,
},
{
role: "b",
content: "HIDDEN_B_MIDDLE",
meta: { n: 2 },
refs: [],
timestamp: 3,
},
{
role: "c",
content: "VISIBLE_LAST",
meta: { n: 3 },
refs: [],
timestamp: 4,
},
],
@@ -22,6 +22,7 @@ describe("buildDescriptor", () => {
systemPrompt: "You are an analyst.",
extractPrompt: "Extract title and count from the analysis.",
schema,
extractRefs: null,
},
},
moderator: () => END,
+18 -12
View File
@@ -3,10 +3,16 @@ import { mkdtemp, rm } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { createThreadCas } from "../src/cas.js";
import { createCasStore, createThreadCas } from "../src/cas.js";
import { hashString } from "../src/hash.js";
describe("createThreadCas", () => {
describe("cas module exports", () => {
test("createThreadCas is a deprecated alias of createCasStore", () => {
expect(createThreadCas).toBe(createCasStore);
});
});
describe("createCasStore", () => {
let casDir: string;
beforeEach(async () => {
@@ -18,7 +24,7 @@ describe("createThreadCas", () => {
});
test("put returns consistent hash for same content", async () => {
const cas = createThreadCas(casDir);
const cas = createCasStore(casDir);
const h1 = await cas.put("hello world");
const h2 = await cas.put("hello world");
expect(h1).toBe(h2);
@@ -26,14 +32,14 @@ describe("createThreadCas", () => {
});
test("put returns hash matching hashString", async () => {
const cas = createThreadCas(casDir);
const cas = createCasStore(casDir);
const content = "some content to store";
const h = await cas.put(content);
expect(h).toBe(hashString(content));
});
test("get returns stored content", async () => {
const cas = createThreadCas(casDir);
const cas = createCasStore(casDir);
const content = "line1\nline2\nline3";
const h = await cas.put(content);
const retrieved = await cas.get(h);
@@ -41,13 +47,13 @@ describe("createThreadCas", () => {
});
test("get returns null for missing hash", async () => {
const cas = createThreadCas(casDir);
const cas = createCasStore(casDir);
const result = await cas.get("0000000000000");
expect(result).toBeNull();
});
test("delete removes entry", async () => {
const cas = createThreadCas(casDir);
const cas = createCasStore(casDir);
const h = await cas.put("to be deleted");
await cas.delete(h);
const result = await cas.get(h);
@@ -55,12 +61,12 @@ describe("createThreadCas", () => {
});
test("delete on missing hash does not throw", async () => {
const cas = createThreadCas(casDir);
const cas = createCasStore(casDir);
await cas.delete("0000000000000");
});
test("list returns all stored hashes", async () => {
const cas = createThreadCas(casDir);
const cas = createCasStore(casDir);
const h1 = await cas.put("aaa");
const h2 = await cas.put("bbb");
const h3 = await cas.put("ccc");
@@ -69,13 +75,13 @@ describe("createThreadCas", () => {
});
test("list returns empty array when cas dir does not exist", async () => {
const cas = createThreadCas(join(casDir, "nonexistent"));
const cas = createCasStore(join(casDir, "nonexistent"));
const hashes = await cas.list();
expect(hashes).toEqual([]);
});
test("put is idempotent — same content written twice causes no error", async () => {
const cas = createThreadCas(casDir);
const cas = createCasStore(casDir);
const h1 = await cas.put("idempotent");
const h2 = await cas.put("idempotent");
expect(h1).toBe(h2);
@@ -84,7 +90,7 @@ describe("createThreadCas", () => {
});
test("different content produces different hashes", async () => {
const cas = createThreadCas(casDir);
const cas = createCasStore(casDir);
const h1 = await cas.put("alpha");
const h2 = await cas.put("beta");
expect(h1).not.toBe(h2);
@@ -89,12 +89,14 @@ const demoWorkflow = createWorkflow<DemoMeta>(
systemPrompt: "You are a planner.",
extractPrompt: "Extract plan text and affected files list.",
schema: plannerMetaSchema,
extractRefs: null,
},
coder: {
description: "Demo coder",
systemPrompt: "You are a coder.",
extractPrompt: "Extract the code diff summary.",
schema: coderMetaSchema,
extractRefs: null,
},
},
moderator: (ctx) => {
@@ -182,10 +184,12 @@ describe("executeThread", () => {
expect(role1.role).toBe("planner");
expect(role1.content).toBe("plan-body");
expect(role1.meta).toEqual({ plan: "do-it", files: ["a.ts"] });
expect(role1.refs).toEqual([]);
expect(typeof role1.timestamp).toBe("number");
const role2 = JSON.parse(lines[2] ?? "{}") as Record<string, unknown>;
expect(role2.role).toBe("coder");
expect(role2.refs).toEqual([]);
const infoText = await readFile(infoPath, "utf8");
const infoLines = infoText
@@ -228,6 +232,7 @@ describe("executeThread", () => {
role: "planner",
content: "plan-body",
meta: { plan: "do-it", files: ["a.ts"] },
refs: ["CAS111AAAAAAA"],
},
],
},
@@ -241,6 +246,7 @@ describe("executeThread", () => {
role: "planner",
content: "plan-body",
meta: { plan: "do-it", files: ["a.ts"] },
refs: ["CAS111AAAAAAA"],
timestamp: histTs,
},
],
@@ -264,6 +270,7 @@ describe("executeThread", () => {
const role0 = JSON.parse(lines[1] ?? "{}") as Record<string, unknown>;
expect(role0.role).toBe("planner");
expect(role0.timestamp).toBe(histTs);
expect(role0.refs).toEqual(["CAS111AAAAAAA"]);
const role1 = JSON.parse(lines[2] ?? "{}") as Record<string, unknown>;
expect(role1.role).toBe("coder");
@@ -0,0 +1,191 @@
import { afterEach, describe, expect, test } from "bun:test";
import { mkdir, mkdtemp, readFile, rm } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import * as z from "zod/v4";
import { createWorkflow } from "../src/create-workflow.js";
import { executeThread } from "../src/engine.js";
import { createExtract } from "../src/extract-fn.js";
import { buildForkPlan, parseThreadDataJsonl } from "../src/fork-thread.js";
import { createLogger } from "../src/logger.js";
import { END } from "../src/types.js";
const phaseSchema = z.object({
hash: z.string(),
title: z.string(),
});
const plannerMetaSchema = z.object({
phases: z.array(phaseSchema),
});
type RefsDemoMeta = {
planner: z.infer<typeof plannerMetaSchema>;
};
function installMockChatCompletions(sequence: ReadonlyArray<Record<string, unknown>>): () => void {
const origFetch = globalThis.fetch;
let i = 0;
const mockFetch = async (
input: Parameters<typeof fetch>[0],
init?: RequestInit,
): Promise<Response> => {
const args = sequence[i] ?? sequence[sequence.length - 1];
if (args === undefined) {
throw new Error("installMockChatCompletions: empty sequence");
}
i += 1;
void input;
const body = init?.body ? (JSON.parse(String(init.body)) as Record<string, unknown>) : {};
const tools = body.tools;
const firstTool =
Array.isArray(tools) && tools.length > 0 && tools[0] !== null && typeof tools[0] === "object"
? (tools[0] as Record<string, unknown>)
: null;
const fn =
firstTool !== null ? (firstTool.function as Record<string, unknown> | undefined) : undefined;
const toolName = typeof fn?.name === "string" ? fn.name : "extract";
return new Response(
JSON.stringify({
choices: [
{
message: {
tool_calls: [
{
type: "function",
function: {
name: toolName,
arguments: JSON.stringify(args),
},
},
],
},
},
],
}),
{ status: 200, headers: { "Content-Type": "application/json" } },
);
};
globalThis.fetch = Object.assign(mockFetch, {
preconnect: origFetch.preconnect.bind(origFetch),
}) as typeof fetch;
return () => {
globalThis.fetch = origFetch;
};
}
const refsDemoExtract = createExtract({
baseUrl: "http://127.0.0.1:9",
apiKey: "test",
model: "test",
});
const refsDemoWorkflow = createWorkflow<RefsDemoMeta>(
{
roles: {
planner: {
description: "Planner with phase hashes",
systemPrompt: "Plan.",
extractPrompt: "Extract phases with CAS hashes.",
schema: plannerMetaSchema,
extractRefs: (meta) => meta.phases.map((p) => p.hash),
},
},
moderator: (ctx) => (ctx.steps.length === 0 ? "planner" : END),
},
{
agent: async () => "plan-output",
},
refsDemoExtract,
);
describe("RoleStep refs tracking", () => {
let restoreFetch: (() => void) | null = null;
afterEach(() => {
restoreFetch?.();
restoreFetch = null;
});
test("parseThreadDataJsonl reads refs and defaults missing refs to []", () => {
const text = `{"name":"demo","hash":"C9NMV6V2TQT81","threadId":"01AAA1111111111111111111","parameters":{"prompt":"hi","options":{"maxRounds":5}},"timestamp":100}
{"role":"planner","content":"p","meta":{},"refs":["H111AAAAAAAAA","H222AAAAAAAAA"],"timestamp":101}
{"role":"coder","content":"c","meta":{},"timestamp":102}
`;
const r = parseThreadDataJsonl(text);
expect(r.ok).toBe(true);
if (!r.ok) {
return;
}
expect(r.value.roleSteps[0]?.refs).toEqual(["H111AAAAAAAAA", "H222AAAAAAAAA"]);
expect(r.value.roleSteps[1]?.refs).toEqual([]);
});
test("executeThread persists refs from extractRefs on role yields", async () => {
restoreFetch = installMockChatCompletions([
{
phases: [
{ hash: "C9NMV6V2TQT81", title: "phase-a" },
{ hash: "C9NMV6V2TQT82", title: "phase-b" },
],
},
]);
const root = await mkdtemp(join(tmpdir(), "wf-refs-"));
try {
const threadId = "01KQXKW18CT8G75T53R8F4G7YG";
const hash = "C9NMV6V2TQT81";
const dataPath = join(root, "logs", hash, `${threadId}.data.jsonl`);
const infoPath = join(root, "logs", hash, `${threadId}.info.jsonl`);
await mkdir(join(root, "logs", hash), { recursive: true });
const logger = createLogger({ sink: { kind: "file", path: infoPath } });
const ac = new AbortController();
const result = await executeThread(
refsDemoWorkflow,
"refs-demo",
{ prompt: "task", steps: [] },
{
maxRounds: 5,
signal: ac.signal,
awaitAfterEachYield: async () => {},
forkSourceThreadId: null,
prefilledDiskSteps: null,
},
{ threadId, hash, dataJsonlPath: dataPath, infoJsonlPath: infoPath },
logger,
);
expect(result.returnCode).toBe(0);
const dataText = await readFile(dataPath, "utf8");
const lines = dataText
.trim()
.split("\n")
.filter((l) => l !== "");
expect(lines.length).toBe(2);
const role1 = JSON.parse(lines[1] ?? "{}") as Record<string, unknown>;
expect(role1.role).toBe("planner");
expect(role1.refs).toEqual(["C9NMV6V2TQT81", "C9NMV6V2TQT82"]);
} finally {
await rm(root, { recursive: true, force: true });
}
});
test("buildForkPlan carries refs on historical steps", () => {
const text = `{"name":"demo","hash":"C9NMV6V2TQT81","threadId":"01AAA1111111111111111111","parameters":{"prompt":"hi","options":{"maxRounds":5}},"timestamp":100}
{"role":"planner","content":"p","meta":{},"refs":["KEEPREFAAAAAA"],"timestamp":101}
{"role":"coder","content":"c","meta":{},"refs":["CODERHASHAAAA"],"timestamp":102}
`;
const plan = buildForkPlan(text, null);
expect(plan.ok).toBe(true);
if (!plan.ok) {
return;
}
expect(plan.value.historicalSteps.length).toBe(1);
expect(plan.value.historicalSteps[0]?.refs).toEqual(["KEEPREFAAAAAA"]);
});
});
@@ -0,0 +1,14 @@
import { describe, expect, test } from "bun:test";
import { join } from "node:path";
import { getDefaultWorkflowStorageRoot, getGlobalCasDir } from "../src/storage-root.js";
describe("getGlobalCasDir", () => {
test("joins cas segment under explicit storage root", () => {
expect(getGlobalCasDir("/tmp/wf-root")).toBe(join("/tmp/wf-root", "cas"));
});
test("defaults to default workflow root when storage root is undefined", () => {
expect(getGlobalCasDir(undefined)).toBe(join(getDefaultWorkflowStorageRoot(), "cas"));
});
});
@@ -19,13 +19,16 @@ describe("RFC-001 thread JSONL shapes", () => {
role: "planner",
content: "Plan: modify auth middleware...",
meta: { plan: "...", files: ["src/auth.ts"] },
refs: [] as string[],
timestamp: 1714963201000,
};
expect(Object.keys(startRecord).sort()).toEqual(
["hash", "name", "parameters", "threadId", "timestamp"].sort(),
);
expect(Object.keys(roleRecord).sort()).toEqual(["content", "meta", "role", "timestamp"].sort());
expect(Object.keys(roleRecord).sort()).toEqual(
["content", "meta", "refs", "role", "timestamp"].sort(),
);
});
test("documents the `.info.jsonl` debug record keys", () => {
+4 -1
View File
@@ -10,7 +10,7 @@ export type CasStore = {
list(): Promise<string[]>;
};
export function createThreadCas(casDir: string): CasStore {
export function createCasStore(casDir: string): CasStore {
async function ensureDir(): Promise<void> {
await mkdir(casDir, { recursive: true });
}
@@ -68,3 +68,6 @@ export function createThreadCas(casDir: string): CasStore {
},
};
}
/** @deprecated Use {@link createCasStore} — CAS is global, not per-thread. */
export const createThreadCas = createCasStore;
+20 -1
View File
@@ -5,6 +5,7 @@ import {
END,
type ExtractContext,
type ModeratorContext,
type RoleDefinition,
type RoleMeta,
type RoleOutput,
type RoleStep,
@@ -22,6 +23,17 @@ function isRoleNext<M extends RoleMeta>(
return next !== END;
}
function resolveExtractedRefs(
roleDef: RoleDefinition<Record<string, unknown>>,
meta: unknown,
): string[] {
const extractRefsFn = roleDef.extractRefs;
if (extractRefsFn === null || typeof extractRefsFn !== "function") {
return [];
}
return extractRefsFn(meta as Record<string, unknown>);
}
/**
* Binds pure role definitions + moderator to runtime agents and structured extraction.
* Assign with `export const run = createWorkflow(def, binding, extract)`.
@@ -48,6 +60,7 @@ export function createWorkflow<M extends RoleMeta>(
role: out.role,
content: out.content,
meta: out.meta,
refs: out.refs,
timestamp: baseTs + i,
})) as RoleStep<M>[];
@@ -96,15 +109,21 @@ export function createWorkflow<M extends RoleMeta>(
extractCtx as unknown as ExtractContext,
);
const refs = resolveExtractedRefs(
roleDef as unknown as RoleDefinition<Record<string, unknown>>,
meta,
);
const ts = Date.now();
const step = {
role: next,
content: raw,
meta,
refs,
timestamp: ts,
} as RoleStep<M>;
yield { role: step.role, content: step.content, meta: step.meta };
yield { role: step.role, content: step.content, meta: step.meta, refs: step.refs };
steps = [...steps, step];
}
+4
View File
@@ -2,6 +2,7 @@ import { appendFile, mkdir } from "node:fs/promises";
import { dirname } from "node:path";
import type { LogFn } from "./logger.js";
import { normalizeRefsField } from "./refs-field.js";
import type { ThreadInput, WorkflowFn, WorkflowFnOptions, WorkflowResult } from "./types.js";
export type ExecuteThreadIo = {
@@ -16,6 +17,7 @@ export type PrefilledDiskStep = {
role: string;
content: string;
meta: Record<string, unknown>;
refs: string[];
timestamp: number;
};
@@ -79,6 +81,7 @@ async function driveWorkflowGenerator(params: {
role: step.role,
content: step.content,
meta: step.meta,
refs: normalizeRefsField(step.refs),
timestamp: ts,
});
@@ -151,6 +154,7 @@ export async function executeThread(
role: row.role,
content: row.content,
meta: row.meta,
refs: normalizeRefsField(row.refs),
timestamp: row.timestamp,
});
}
+2
View File
@@ -1,3 +1,4 @@
import { normalizeRefsField } from "./refs-field.js";
import { err, ok, type Result } from "./result.js";
import type { RoleOutput } from "./types.js";
@@ -36,6 +37,7 @@ function parseRoleLine(
role,
content,
meta: meta as Record<string, unknown>,
refs: normalizeRefsField(obj.refs),
timestamp,
});
}
+2 -2
View File
@@ -7,7 +7,7 @@ export {
} from "./base32.js";
export { buildDescriptor } from "./build-descriptor.js";
export { validateWorkflowBundle, type WorkflowBundleValidationInput } from "./bundle-validator.js";
export { type CasStore, createThreadCas } from "./cas.js";
export { type CasStore, createCasStore, createThreadCas } from "./cas.js";
export { createWorkflow } from "./create-workflow.js";
export {
type ExecuteThreadIo,
@@ -55,7 +55,7 @@ export {
writeWorkflowRegistry,
} from "./registry.js";
export { err, ok, type Result } from "./result.js";
export { getDefaultWorkflowStorageRoot } from "./storage-root.js";
export { getDefaultWorkflowStorageRoot, getGlobalCasDir } from "./storage-root.js";
export { createThreadPauseGate, type ThreadPauseGate } from "./thread-pause-gate.js";
export {
type AgentBinding,
+13
View File
@@ -0,0 +1,13 @@
/** Normalize `refs` from persisted JSONL or IPC payloads (missing or invalid → []). */
export function normalizeRefsField(value: unknown): string[] {
if (!Array.isArray(value)) {
return [];
}
const out: string[] = [];
for (const x of value) {
if (typeof x === "string") {
out.push(x);
}
}
return out;
}
+6
View File
@@ -5,3 +5,9 @@ import { join } from "node:path";
export function getDefaultWorkflowStorageRoot(): string {
return join(homedir(), ".uncaged", "workflow");
}
/** Global content-addressed store directory under the workflow storage root (`<root>/cas`). */
export function getGlobalCasDir(storageRoot: string | undefined): string {
const root = storageRoot ?? getDefaultWorkflowStorageRoot();
return join(root, "cas");
}
+11 -1
View File
@@ -19,6 +19,8 @@ export type RoleOutput = {
role: string;
content: string;
meta: Record<string, unknown>;
/** CAS hashes produced or consumed by this step (for GC traceability). */
refs: string[];
};
/** What the workflow AsyncGenerator returns when done. */
@@ -55,7 +57,13 @@ export type StartStep = {
/** A completed role step in the thread. */
export type RoleStep<M extends RoleMeta> = {
[K in keyof M & string]: { role: K; meta: M[K]; content: string; timestamp: number };
[K in keyof M & string]: {
role: K;
meta: M[K];
content: string;
refs: string[];
timestamp: number;
};
}[keyof M & string];
/** Phase 1: Moderator decides next role. */
@@ -96,6 +104,8 @@ export type RoleDefinition<Meta extends Record<string, unknown>> = {
systemPrompt: string;
extractPrompt: string;
schema: z.ZodType<Meta>;
/** When non-null, produces CAS hashes to persist on this role's steps (see `RoleOutput.refs`). */
extractRefs: ((meta: Meta) => string[]) | null;
};
/**
+8 -1
View File
@@ -5,6 +5,7 @@ import { pathToFileURL } from "node:url";
import type { PrefilledDiskStep } from "./engine.js";
import { type ExecuteThreadIo, executeThread } from "./engine.js";
import { createLogger } from "./logger.js";
import { normalizeRefsField } from "./refs-field.js";
import { err, ok, type Result } from "./result.js";
import { createThreadPauseGate, type ThreadPauseGate } from "./thread-pause-gate.js";
import type { RoleOutput, WorkflowFn } from "./types.js";
@@ -55,7 +56,12 @@ function parseRoleOutputRecord(obj: Record<string, unknown>): RoleOutput | null
if (meta === null || typeof meta !== "object") {
return null;
}
return { role, content, meta: meta as Record<string, unknown> };
return {
role,
content,
meta: meta as Record<string, unknown>,
refs: normalizeRefsField(obj.refs),
};
}
function parseRunStepsPayload(rec: Record<string, unknown>): {
@@ -382,6 +388,7 @@ async function main(): Promise<void> {
role: step.role,
content: step.content,
meta: step.meta,
refs: normalizeRefsField(step.refs),
timestamp: typeof ts === "number" && ts > 0 ? ts : baseTs + i,
};
});