Compare commits

..

13 Commits

Author SHA1 Message Date
xiaoju 6196e0974a feat: thread root node + workflowAsAgent returns root hash
- Engine generates step Merkle nodes (type: step) after each role step
- Engine generates thread root Merkle node (type: thread) on completion
- WorkflowResult gains rootHash field
- WorkflowFn returns WorkflowCompletion (no rootHash), engine wraps with rootHash
- workflowAsAgent returns rootHash instead of summary
- Full DAG traversal: root → steps → content
- 151 tests passing

Fixes #42
2026-05-07 13:17:44 +00:00
xiaoju 410e9e6d9b Merge pull request 'feat: Merkle node format + content → CAS' (#45) from feat/41-merkle-content-cas into main 2026-05-07 13:14:16 +00:00
xiaoju 84de74721d feat: Merkle node format + content → CAS
- MerkleNode type: { type, payload, children } serialized as YAML
- RoleOutput.content → contentHash (CAS hash of Merkle content node)
- Engine stores content in global CAS before writing to .data.jsonl
- create-workflow puts content as Merkle node, merges contentHash into refs
- fork/parse adapted for contentHash format
- buildAgentPrompt now async, reads content from CAS
- Bundle validator allows @uncaged/workflow import
- 150 tests passing

BREAKING: .data.jsonl no longer contains inline content

Fixes #41
2026-05-07 13:14:01 +00:00
xiaoju 4403532f35 Merge pull request 'feat: workflowAsAgent factory' (#39) from feat/33-workflow-as-agent into main 2026-05-07 10:52:40 +00:00
xiaoju e95e76c145 feat: workflowAsAgent factory
- workflowAsAgent(name) resolves via registry → bundle → child thread
- System-level depth limit (max 3, constant)
- Returns summary string, errors as string (no throw)
- Integration test with nested workflow execution
- 146 tests passing

Fixes #33
2026-05-07 10:52:26 +00:00
xiaoju af69e773a0 Merge pull request 'feat: CAS garbage collection' (#38) from feat/32-cas-gc into main 2026-05-07 10:48:07 +00:00
xiaoju 6488b7bbb4 feat: CAS garbage collection
- garbageCollectCas() mark-and-sweep: scan .data.jsonl refs, delete orphans
- 'uncaged-workflow gc' CLI command
- thread rm triggers GC automatically
- 141 tests passing

Fixes #32
2026-05-07 10:47:52 +00:00
xiaoju 15d39c96a7 Merge pull request 'feat: add refs tracking to RoleStep' (#35) from feat/31-refs-tracking into main 2026-05-07 10:44:44 +00:00
xiaoju 30e4e99908 feat: add refs tracking to RoleStep
- RoleOutput gains refs: string[] for CAS reference tracking
- RoleDefinition gains extractRefs: ((meta) => string[]) | null
- planner: phases.map(p => p.hash), coder: [completedPhase]
- Engine persists refs, fork preserves refs
- Backward compat: missing refs normalized to []
- 137 tests passing

Fixes #31
2026-05-07 10:44:25 +00:00
xiaoju a3c70a5041 Merge pull request 'feat: migrate CAS to global storage' (#34) from feat/30-global-cas into main 2026-05-07 10:40:41 +00:00
xiaoju 12d58a8206 feat: migrate CAS to global storage
- Add getGlobalCasDir() to storage-root.ts
- cmd-cas.ts uses global CAS dir, threadId kept for CLI compat
- thread rm no longer deletes .cas/ directories
- Rename createThreadCas → createCasStore (deprecated alias kept)
- 134 tests passing

BREAKING: CAS moves from <thread>.cas/ to <storageRoot>/cas/

Fixes #30
2026-05-07 10:40:14 +00:00
xiaoju c096f4d94e docs: add workflow-as-agent implementation plan
Covers 4 phases:
1. Global CAS migration
2. RoleStep refs tracking
3. CAS garbage collection
4. workflowAsAgent(name) factory

Refs #25
2026-05-07 10:35:26 +00:00
xiaoju 500401d93c feat(workflow): add preparer role to solve-issue workflow (#29, closes #28) 2026-05-07 10:18:05 +00:00
53 changed files with 2359 additions and 211 deletions
+315
View File
@@ -0,0 +1,315 @@
# Workflow-as-Agent Implementation Plan
> **For Hermes:** Use subagent-driven-development skill to implement this plan task-by-task.
**Goal:** Enable workflows to invoke other workflows as agents, backed by global CAS and refs tracking.
**Architecture:** Migrate CAS from thread-local to global (`~/.uncaged/workflow/cas/`), add `refs` to RoleStep for GC traceability, then build `workflowAsAgent(name)` factory that resolves workflow name → bundle via registry and spawns a child thread.
**Tech Stack:** TypeScript, Bun, Zod v4, monorepo with `packages/`
**Issue:** https://git.shazhou.work/uncaged/workflow/issues/25
---
## Phase 1: Global CAS Migration
Move CAS storage from `<threadDir>/<threadId>.cas/` to `~/.uncaged/workflow/cas/` (global, content-addressed, immutable). This is a **breaking change** — thread-local `.cas/` directories are abandoned.
### Task 1.1: Add `globalCasDir` helper to `storage-root.ts`
**Objective:** Provide a single function that returns the global CAS directory path.
**Files:**
- Modify: `packages/workflow/src/storage-root.ts`
- Test: `packages/workflow/__tests__/storage-root.test.ts`
**Implementation:**
```typescript
// storage-root.ts — add export
export function getGlobalCasDir(storageRoot?: string): string {
const root = storageRoot ?? getDefaultWorkflowStorageRoot();
return join(root, "cas");
}
```
Export from `packages/workflow/src/index.ts`.
### Task 1.2: Update `cmd-cas.ts` to use global CAS
**Objective:** CLI `cas get/put/list/rm` no longer needs threadId for storage location — CAS is global. But keep threadId in CLI for backward compat of planner/coder prompts (they pass threadId).
**Files:**
- Modify: `packages/cli-workflow/src/cmd-cas.ts`
**Changes:**
- `resolveCasDir` → use `getGlobalCasDir(storageRoot)` instead of deriving from thread data path
- `cmdCasPut` / `cmdCasGet` / `cmdCasList` / `cmdCasRm`: threadId is still accepted (prompts pass it) but storage goes to global dir
- Remove the `resolveThreadDataPath` dependency for CAS operations — thread doesn't need to exist to read CAS
```typescript
import { createThreadCas, getGlobalCasDir } from "@uncaged/workflow";
export async function cmdCasGet(
storageRoot: string,
_threadId: string, // kept for CLI compat, not used for path
hash: string,
): Promise<Result<string, string>> {
const cas = createThreadCas(getGlobalCasDir(storageRoot));
const content = await cas.get(hash);
if (content === null) {
return err(`cas entry not found: ${hash}`);
}
return ok(content);
}
// ... same pattern for put/list/rm
```
### Task 1.3: Update `cmd-thread.ts` — thread rm no longer deletes `.cas/`
**Objective:** Since CAS is global, `thread rm` should NOT delete CAS entries. CAS cleanup is GC's job.
**Files:**
- Modify: `packages/cli-workflow/src/cmd-thread.ts`
- Check: remove any `rmdir` / `unlink` of `<threadId>.cas/` directory
### Task 1.4: Rename `createThreadCas` → `createCasStore`
**Objective:** The name `createThreadCas` is misleading now. Rename to `createCasStore`.
**Files:**
- Modify: `packages/workflow/src/cas.ts` — rename function
- Modify: `packages/workflow/src/index.ts` — update export (keep `createThreadCas` as deprecated alias for one release)
- Modify: all consumers (`cmd-cas.ts`)
### Task 1.5: Update tests
**Objective:** All CAS-related tests use global dir instead of thread-local.
**Files:**
- Modify: `packages/cli-workflow/__tests__/commands.test.ts`
- Verify: `bun test` passes
### Task 1.6: Clean up old thread-local `.cas/` references
**Objective:** Remove dead code that creates/reads thread-local `.cas/` directories.
**Files:**
- Search all `*.ts` for `.cas` path construction patterns
- Remove orphaned helpers
---
## Phase 2: RoleStep `refs` Tracking
Add `refs: string[]` to persisted role steps so GC can trace which CAS entries are alive.
### Task 2.1: Add `refs` to `RoleOutput` and engine persistence
**Objective:** Every role step can declare which CAS hashes it produced or consumed.
**Files:**
- Modify: `packages/workflow/src/types.ts`
- Modify: `packages/workflow/src/engine.ts`
**Changes to `types.ts`:**
```typescript
export type RoleOutput = {
role: string;
content: string;
meta: Record<string, unknown>;
refs: string[]; // CAS hashes produced/consumed by this step
};
```
**Changes to `engine.ts`:**
- `appendDataLine` for role steps: include `refs` field (default `[]` if not provided)
### Task 2.2: Auto-populate refs from meta hashes
**Objective:** The engine should automatically extract CAS hashes from `meta` to populate `refs`, so roles don't need to manually track them.
**Strategy:** After meta extraction, walk the meta object and collect any string that looks like a CAS hash (Crockford Base32, 13 chars). This is a heuristic but works because CAS hashes are distinctive.
Alternative (simpler): Let each `RoleDefinition` optionally declare a `extractRefs(meta: M) => string[]` function. For planner, this returns `meta.phases.map(p => p.hash)`. For coder, `[meta.completedPhase]`.
**Recommended:** The explicit `extractRefs` approach — no magic, no false positives.
**Files:**
- Modify: `packages/workflow/src/types.ts` — add optional `extractRefs` to `RoleDefinition`
- Modify: `packages/workflow/src/create-workflow.ts` — call `extractRefs` after meta extraction, set on `RoleOutput.refs`
- Modify: `packages/workflow-role-planner/src/planner.ts` — implement `extractRefs`
- Modify: `packages/workflow-role-coder/src/coder.ts` — implement `extractRefs`
```typescript
// types.ts — RoleDefinition addition
export type RoleDefinition<Meta extends Record<string, unknown>> = {
description: string;
systemPrompt: string;
extractPrompt: string;
schema: z.ZodType<Meta>;
extractRefs?: (meta: Meta) => string[]; // CAS hashes to track
};
// planner.ts
extractRefs: (meta) => meta.phases.map(p => p.hash),
// coder.ts
extractRefs: (meta) => [meta.completedPhase],
```
### Task 2.3: Update fork logic to preserve refs
**Objective:** When forking a thread, `refs` from historical steps must be carried over.
**Files:**
- Modify: `packages/workflow/src/fork-thread.ts`
- Verify: `ForkHistoricalStep` / `PrefilledDiskStep` include `refs`
### Task 2.4: Tests for refs tracking
**Files:**
- Add: `packages/workflow/__tests__/refs-tracking.test.ts`
- Verify: refs appear in `.data.jsonl` output
---
## Phase 3: CAS Garbage Collection
### Task 3.1: Implement `gc.ts` in `@uncaged/workflow`
**Objective:** Mark-and-sweep GC — scan all thread `.data.jsonl` files, collect `refs`, delete orphaned CAS entries.
**Files:**
- Create: `packages/workflow/src/gc.ts`
- Export from: `packages/workflow/src/index.ts`
```typescript
export type GcResult = {
scannedThreads: number;
activeRefs: number;
deletedEntries: number;
deletedHashes: string[];
};
export async function garbageCollectCas(storageRoot: string): Promise<GcResult> {
// 1. Find all .data.jsonl files under storageRoot
// 2. Parse each, flatMap step.refs → Set<string>
// 3. List all CAS entries via createCasStore(globalCasDir).list()
// 4. Delete entries not in active set
// 5. Return stats
}
```
### Task 3.2: Add `uncaged-workflow gc` CLI command
**Files:**
- Create: `packages/cli-workflow/src/cmd-gc.ts`
- Modify: `packages/cli-workflow/src/cli-dispatch.ts` — add `gc` subcommand
### Task 3.3: Run GC on `thread rm`
**Files:**
- Modify: `packages/cli-workflow/src/cmd-thread.ts` — after deleting thread data, optionally run GC
### Task 3.4: Tests for GC
**Files:**
- Create: `packages/cli-workflow/__tests__/gc-cli.test.ts`
---
## Phase 4: `workflowAsAgent` Factory
### Task 4.1: Create `workflowAsAgent` in `@uncaged/workflow`
**Objective:** Factory function that takes a workflow name, resolves to bundle, returns an `AgentFn`.
**Files:**
- Create: `packages/workflow/src/workflow-as-agent.ts`
- Export from: `packages/workflow/src/index.ts`
```typescript
import type { AgentFn } from "./types.js";
export type WorkflowAsAgentOptions = {
storageRoot?: string;
};
export function workflowAsAgent(
workflowName: string,
options?: WorkflowAsAgentOptions,
): AgentFn {
return async (ctx) => {
const storageRoot = options?.storageRoot ?? getDefaultWorkflowStorageRoot();
// 1. Read registry → resolve name to bundle hash + path
const registry = await readWorkflowRegistry(storageRoot);
const entry = getRegisteredWorkflow(registry, workflowName);
if (entry === null) {
return `ERROR: workflow "${workflowName}" not found in registry`;
}
// 2. Load bundle
const bundlePath = join(storageRoot, "bundles", `${entry.hash}.esm.js`);
const bundleExports = await extractBundleExports(bundlePath);
// 3. Create child thread input from ctx.start.content (parent prompt)
const input: ThreadInput = {
prompt: ctx.start.content,
steps: [],
};
// 4. Generate child threadId
const childThreadId = generateUlid();
// 5. Execute — collect all yields, return final content
const io: ExecuteThreadIo = { ... };
const result = await executeThread(bundleExports.run, workflowName, input, ...);
// 6. Return summary as agent content
return result.summary;
};
}
```
### Task 4.2: System-level depth limit
**Objective:** Prevent infinite recursion. Track depth via thread metadata, enforce a global max (default 3, configurable in `workflow.yaml`).
**Files:**
- Modify: `packages/workflow/src/types.ts` — add `depth` to `WorkflowFnOptions`
- Modify: `packages/workflow/src/workflow-as-agent.ts` — increment depth, check limit
- Modify: registry or config types for `maxDepth` setting
### Task 4.3: Tests for workflowAsAgent
**Files:**
- Create: `packages/workflow/__tests__/workflow-as-agent.test.ts`
- Test: name resolution, depth limit, child thread execution
### Task 4.4: Integration test — nested workflow
**Objective:** Create a minimal test workflow that calls another workflow via `workflowAsAgent`.
**Files:**
- Create: `packages/workflow/__tests__/workflow-as-agent-integration.test.ts`
---
## Execution Order
```
Phase 1 (Global CAS) → Phase 2 (refs) → Phase 3 (GC) → Phase 4 (workflowAsAgent)
```
Each phase is independently mergeable. Phase 3 depends on Phase 2 (needs refs to know what's alive). Phase 4 depends on Phase 1 (global CAS for cross-thread sharing).
## Breaking Changes
- CAS storage location moves from `<thread>.cas/` to `~/.uncaged/workflow/cas/`
- `RoleOutput` gains required `refs: string[]` field
- Existing threads with thread-local CAS will lose access to old CAS data (acceptable — those are short-lived workflow artifacts)
- `createThreadCas` renamed to `createCasStore` (alias kept temporarily)
+1
View File
@@ -28,6 +28,7 @@ const greeter: RoleDefinition<Roles["greeter"]> = {
systemPrompt: "You greet the user briefly.",
extractPrompt: "Extract the greeting string produced for the user.",
schema: greeterMetaSchema,
extractRefs: null,
};
const extract = createExtract({
@@ -3,8 +3,9 @@ import { mkdir, mkdtemp, readFile, rm, unlink, writeFile } from "node:fs/promise
import { tmpdir } from "node:os";
import { join } from "node:path";
import { getRegisteredWorkflow, readWorkflowRegistry } from "@uncaged/workflow";
import { getGlobalCasDir, getRegisteredWorkflow, readWorkflowRegistry } from "@uncaged/workflow";
import { cmdAdd } from "../src/cmd-add.js";
import { cmdCasGet, cmdCasList, cmdCasPut, cmdCasRm } from "../src/cmd-cas.js";
import { cmdHistory } from "../src/cmd-history.js";
import { cmdList, formatListLines } from "../src/cmd-list.js";
import { cmdRemove } from "../src/cmd-remove.js";
@@ -15,6 +16,9 @@ import { addCliArgs } from "./bundle-fixture.js";
const fixtureDescriptor = `export const descriptor = { description: "fixture", roles: {} };
`;
const wfPutImport = `import { putContentMerkleNode } from "@uncaged/workflow";
`;
describe("cli workflow commands", () => {
let prevEnv: string | undefined;
let storageRoot: string;
@@ -40,11 +44,13 @@ describe("cli workflow commands", () => {
const bundlePath = join(bundleDir, "demo.esm.js");
await writeFile(
bundlePath,
`${fixtureDescriptor}import fs from "node:fs";
`${fixtureDescriptor}${wfPutImport}import fs from "node:fs";
export const run = async function* (input) {
export const run = async function* (input, options) {
fs.existsSync(".");
yield { role: "noop", content: input.prompt, meta: { done: true } };
const cas = options.cas;
const h = await putContentMerkleNode(cas, input.prompt);
yield { role: "noop", contentHash: h, meta: { done: true }, refs: [h] };
return { returnCode: 0, summary: "done" };
}
`,
@@ -111,8 +117,8 @@ export const run = async function* (input) { return { returnCode: 0, summary: in
const bundlePath = join(storageRoot, "solo.esm.js");
await writeFile(
bundlePath,
`export const run = async function* (input) {
yield { role: "x", content: input.prompt, meta: {} };
`export const run = async function* () {
yield { role: "x", contentHash: "STUBHASH00000000000000001", meta: {}, refs: [] };
return { returnCode: 0, summary: "ok" };
}
`,
@@ -140,8 +146,11 @@ export const run = async function* (input) { return { returnCode: 0, summary: in
},
},
};
export const run = async function* (input) {
yield { role: "greeter", content: input.prompt, meta: { greeting: "hi" } };
${wfPutImport}
export const run = async function* (input, options) {
const cas = options.cas;
const h = await putContentMerkleNode(cas, input.prompt);
yield { role: "greeter", contentHash: h, meta: { greeting: "hi" }, refs: [h] };
return { returnCode: 0, summary: "ok" };
};
`,
@@ -179,8 +188,10 @@ export const run = async function* (input) {
const bundlePath = join(bundleDir, "demo.esm.js");
await writeFile(
bundlePath,
`${fixtureDescriptor}export const run = async function* (input) {
yield { role: "a", content: "x", meta: {} };
`${fixtureDescriptor}${wfPutImport}export const run = async function* (_input, options) {
const cas = options.cas;
const h = await putContentMerkleNode(cas, "x");
yield { role: "a", contentHash: h, meta: {}, refs: [h] };
return { returnCode: 0, summary: "x" };
}
`,
@@ -208,8 +219,10 @@ export const run = async function* (input) {
const dtsPath = join(bundleDir, "types.d.ts");
await writeFile(
bundlePath,
`${fixtureDescriptor}export const run = async function* (input) {
yield { role: "a", content: "x", meta: {} };
`${fixtureDescriptor}${wfPutImport}export const run = async function* (_input, options) {
const cas = options.cas;
const h = await putContentMerkleNode(cas, "x");
yield { role: "a", contentHash: h, meta: {}, refs: [h] };
return { returnCode: 0, summary: "x" };
}
`,
@@ -239,8 +252,10 @@ export const run = async function* (input) {
const bundlePath = join(bundleDir, "demo.esm.js");
await writeFile(
bundlePath,
`${fixtureDescriptor}export const run = async function* (input) {
yield { role: "a", content: "x", meta: {} };
`${fixtureDescriptor}${wfPutImport}export const run = async function* (_input, options) {
const cas = options.cas;
const h = await putContentMerkleNode(cas, "x");
yield { role: "a", contentHash: h, meta: {}, refs: [h] };
return { returnCode: 0, summary: "x" };
}
`,
@@ -260,13 +275,17 @@ export const run = async function* (input) {
const bundleDir = join(storageRoot, "src");
await mkdir(bundleDir, { recursive: true });
const bundlePath = join(bundleDir, "demo.esm.js");
const v1 = `${fixtureDescriptor}export const run = async function* (input) {
yield { role: "a", content: "v1", meta: {} };
const v1 = `${fixtureDescriptor}${wfPutImport}export const run = async function* (_input, options) {
const cas = options.cas;
const h = await putContentMerkleNode(cas, "v1");
yield { role: "a", contentHash: h, meta: {}, refs: [h] };
return { returnCode: 0, summary: "v1" };
}
`;
const v2 = `${fixtureDescriptor}export const run = async function* (input) {
yield { role: "a", content: "v2", meta: {} };
const v2 = `${fixtureDescriptor}${wfPutImport}export const run = async function* (_input, options) {
const cas = options.cas;
const h = await putContentMerkleNode(cas, "v2");
yield { role: "a", contentHash: h, meta: {}, refs: [h] };
return { returnCode: 0, summary: "v2" };
}
`;
@@ -298,13 +317,17 @@ export const run = async function* (input) {
const bundleDir = join(storageRoot, "src");
await mkdir(bundleDir, { recursive: true });
const bundlePath = join(bundleDir, "demo.esm.js");
const v1 = `${fixtureDescriptor}export const run = async function* (input) {
yield { role: "a", content: "v1", meta: {} };
const v1 = `${fixtureDescriptor}${wfPutImport}export const run = async function* (_input, options) {
const cas = options.cas;
const h = await putContentMerkleNode(cas, "v1");
yield { role: "a", contentHash: h, meta: {}, refs: [h] };
return { returnCode: 0, summary: "v1" };
}
`;
const v2 = `${fixtureDescriptor}export const run = async function* (input) {
yield { role: "a", content: "v2", meta: {} };
const v2 = `${fixtureDescriptor}${wfPutImport}export const run = async function* (_input, options) {
const cas = options.cas;
const h = await putContentMerkleNode(cas, "v2");
yield { role: "a", contentHash: h, meta: {}, refs: [h] };
return { returnCode: 0, summary: "v2" };
}
`;
@@ -346,8 +369,10 @@ export const run = async function* (input) {
const bundlePath = join(bundleDir, "demo.esm.js");
await writeFile(
bundlePath,
`${fixtureDescriptor}export const run = async function* (input) {
yield { role: "a", content: "x", meta: {} };
`${fixtureDescriptor}${wfPutImport}export const run = async function* (_input, options) {
const cas = options.cas;
const h = await putContentMerkleNode(cas, "x");
yield { role: "a", contentHash: h, meta: {}, refs: [h] };
return { returnCode: 0, summary: "x" };
}
`,
@@ -357,8 +382,10 @@ export const run = async function* (input) {
expect(add1.ok).toBe(true);
await writeFile(
bundlePath,
`${fixtureDescriptor}export const run = async function* (input) {
yield { role: "a", content: "y", meta: {} };
`${fixtureDescriptor}${wfPutImport}export const run = async function* (_input, options) {
const cas = options.cas;
const h = await putContentMerkleNode(cas, "y");
yield { role: "a", contentHash: h, meta: {}, refs: [h] };
return { returnCode: 0, summary: "y" };
}
`,
@@ -371,14 +398,47 @@ export const run = async function* (input) {
expect(bad.ok).toBe(false);
});
test("cas put/get/list/rm use global cas dir (thread id not required for storage)", async () => {
const put = await cmdCasPut(storageRoot, "nonexistent-thread-id", "phase doc");
expect(put.ok).toBe(true);
if (!put.ok) {
return;
}
const hash = put.value;
const blobPath = join(getGlobalCasDir(storageRoot), `${hash}.txt`);
expect(await readFile(blobPath, "utf8")).toBe("phase doc");
const got = await cmdCasGet(storageRoot, "other-thread", hash);
expect(got.ok).toBe(true);
if (!got.ok) {
return;
}
expect(got.value).toBe("phase doc");
const listed = await cmdCasList(storageRoot, "another-thread");
expect(listed.ok).toBe(true);
if (!listed.ok) {
return;
}
expect(listed.value).toContain(hash);
const removed = await cmdCasRm(storageRoot, "rm-thread", hash);
expect(removed.ok).toBe(true);
const missing = await cmdCasGet(storageRoot, "after-rm", hash);
expect(missing.ok).toBe(false);
});
test("rollback rejects missing bundle file for target hash", async () => {
const bundleDir = join(storageRoot, "src");
await mkdir(bundleDir, { recursive: true });
const bundlePath = join(bundleDir, "demo.esm.js");
await writeFile(
bundlePath,
`${fixtureDescriptor}export const run = async function* (input) {
yield { role: "a", content: "x", meta: {} };
`${fixtureDescriptor}${wfPutImport}export const run = async function* (_input, options) {
const cas = options.cas;
const h = await putContentMerkleNode(cas, "x");
yield { role: "a", contentHash: h, meta: {}, refs: [h] };
return { returnCode: 0, summary: "x" };
}
`,
@@ -392,8 +452,10 @@ export const run = async function* (input) {
const hash1 = add1.value.hash;
await writeFile(
bundlePath,
`${fixtureDescriptor}export const run = async function* (input) {
yield { role: "a", content: "y", meta: {} };
`${fixtureDescriptor}${wfPutImport}export const run = async function* (_input, options) {
const cas = options.cas;
const h = await putContentMerkleNode(cas, "y");
yield { role: "a", contentHash: h, meta: {}, refs: [h] };
return { returnCode: 0, summary: "y" };
}
`,
@@ -2,6 +2,7 @@ import { afterEach, beforeEach, describe, expect, test } from "bun:test";
import { mkdir, mkdtemp, readFile, rm, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { createCasStore, getContentMerklePayload, getGlobalCasDir } from "@uncaged/workflow";
import { cmdAdd } from "../src/cmd-add.js";
import { cmdFork } from "../src/cmd-fork.js";
import { cmdRun } from "../src/cmd-run.js";
@@ -9,7 +10,9 @@ import { pathExists } from "../src/fs-utils.js";
import { addCliArgs } from "./bundle-fixture.js";
/** Three-role workflow that respects `input.steps` for fork/resume. */
const threeRoleBundleSource = `export const descriptor = {
const threeRoleBundleSource = `import { putContentMerkleNode } from "@uncaged/workflow";
export const descriptor = {
description: "fork-cli",
roles: {
planner: { description: "planner", schema: {} },
@@ -17,20 +20,21 @@ const threeRoleBundleSource = `export const descriptor = {
reviewer: { description: "reviewer", schema: {} },
},
};
export const run = async function* (input) {
export const run = async function* (input, options) {
const cas = options.cas;
const has = (r) => input.steps.some((s) => s.role === r);
if (!has("planner")) {
yield { role: "planner", content: "p1", meta: { k: "planner" } };
const h = await putContentMerkleNode(cas, "p1");
yield { role: "planner", contentHash: h, meta: { k: "planner" }, refs: [h] };
}
if (!has("coder")) {
yield { role: "coder", content: "c1", meta: { k: "coder" } };
const h = await putContentMerkleNode(cas, "c1");
yield { role: "coder", contentHash: h, meta: { k: "coder" }, refs: [h] };
}
if (!has("reviewer")) {
yield {
role: "reviewer",
content: "rev-" + String(input.steps.length),
meta: { k: "reviewer" },
};
const body = "rev-" + String(input.steps.length);
const h = await putContentMerkleNode(cas, body);
yield { role: "reviewer", contentHash: h, meta: { k: "reviewer" }, refs: [h] };
}
return { returnCode: 0, summary: "done" };
};
@@ -132,7 +136,8 @@ describe("cli fork", () => {
const last = JSON.parse(lines[lines.length - 1] ?? "{}") as Record<string, unknown>;
expect(last.role).toBe("reviewer");
expect(last.content).toBe("rev-1");
const cas = createCasStore(getGlobalCasDir(storageRoot));
expect(await getContentMerklePayload(cas, String(last.contentHash))).toBe("rev-1");
});
test("fork without --from-role retries last role", async () => {
@@ -179,11 +184,12 @@ describe("cli fork", () => {
const replayCoder = JSON.parse(lines[2] ?? "{}") as Record<string, unknown>;
expect(replayCoder.role).toBe("coder");
expect(replayCoder.content).toBe("c1");
const cas = createCasStore(getGlobalCasDir(storageRoot));
expect(await getContentMerklePayload(cas, String(replayCoder.contentHash))).toBe("c1");
const last = JSON.parse(lines[lines.length - 1] ?? "{}") as Record<string, unknown>;
expect(last.role).toBe("reviewer");
expect(last.content).toBe("rev-2");
expect(await getContentMerklePayload(cas, String(last.contentHash))).toBe("rev-2");
});
test("fork rejects unknown role with available names", async () => {
@@ -0,0 +1,162 @@
import { afterEach, beforeEach, describe, expect, test } from "bun:test";
import { spawnSync } from "node:child_process";
import { mkdir, mkdtemp, rm, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { fileURLToPath } from "node:url";
import {
createCasStore,
garbageCollectCas,
getGlobalCasDir,
putContentMerkleNode,
} from "@uncaged/workflow";
import { cmdThreadRemove } from "../src/cmd-thread.js";
import { pathExists } from "../src/fs-utils.js";
const cliEntryPath = fileURLToPath(new URL("../src/cli.ts", import.meta.url));
async function writeDemoDataJsonl(params: {
path: string;
threadId: string;
bundleHash: string;
cas: ReturnType<typeof createCasStore>;
activeHash: string;
}): Promise<void> {
const bodyHash = await putContentMerkleNode(params.cas, "p");
const text = [
JSON.stringify({
name: "demo",
hash: params.bundleHash,
threadId: params.threadId,
parameters: { prompt: "hi", options: { maxRounds: 5 } },
timestamp: 100,
}),
JSON.stringify({
role: "planner",
contentHash: bodyHash,
meta: {},
refs: [params.activeHash, bodyHash],
timestamp: 101,
}),
"",
].join("\n");
await writeFile(params.path, text, "utf8");
}
describe("gc cli and garbageCollectCas", () => {
let prevEnv: string | undefined;
let storageRoot: string;
beforeEach(async () => {
prevEnv = process.env.UNCAGED_WORKFLOW_STORAGE_ROOT;
storageRoot = await mkdtemp(join(tmpdir(), "uncaged-wf-gc-"));
process.env.UNCAGED_WORKFLOW_STORAGE_ROOT = storageRoot;
});
afterEach(async () => {
if (prevEnv === undefined) {
delete process.env.UNCAGED_WORKFLOW_STORAGE_ROOT;
} else {
process.env.UNCAGED_WORKFLOW_STORAGE_ROOT = prevEnv;
}
await rm(storageRoot, { recursive: true, force: true });
});
test("garbageCollectCas keeps CAS entries referenced by thread refs", async () => {
const bundleHash = "C9NMV6V2TQT81";
const threadId = "01AAA1111111111111111111";
const logsDir = join(storageRoot, "logs", bundleHash);
await mkdir(logsDir, { recursive: true });
const cas = createCasStore(getGlobalCasDir(storageRoot));
const activeHash = await cas.put("active-blob");
const orphanHash = await cas.put("orphan-blob");
await writeDemoDataJsonl({
path: join(logsDir, `${threadId}.data.jsonl`),
threadId,
bundleHash,
cas,
activeHash,
});
const gc = await garbageCollectCas(storageRoot);
expect(gc.ok).toBe(true);
if (!gc.ok) {
return;
}
expect(gc.value.scannedThreads).toBe(1);
expect(gc.value.activeRefs).toBe(2);
expect(gc.value.deletedEntries).toBe(1);
expect(gc.value.deletedHashes).toEqual([orphanHash]);
expect(await pathExists(join(getGlobalCasDir(storageRoot), `${activeHash}.txt`))).toBe(true);
expect(await pathExists(join(getGlobalCasDir(storageRoot), `${orphanHash}.txt`))).toBe(false);
});
test("garbageCollectCas deletes orphaned CAS when no threads reference them", async () => {
const cas = createCasStore(getGlobalCasDir(storageRoot));
const orphanHash = await cas.put("lonely");
const gc = await garbageCollectCas(storageRoot);
expect(gc.ok).toBe(true);
if (!gc.ok) {
return;
}
expect(gc.value.scannedThreads).toBe(0);
expect(gc.value.activeRefs).toBe(0);
expect(gc.value.deletedEntries).toBe(1);
expect(gc.value.deletedHashes).toEqual([orphanHash]);
expect(await pathExists(join(getGlobalCasDir(storageRoot), `${orphanHash}.txt`))).toBe(false);
});
test("cli gc prints stats", async () => {
const bundleHash = "C9NMV6V2TQT81";
const threadId = "01BBB2222222222222222222";
const logsDir = join(storageRoot, "logs", bundleHash);
await mkdir(logsDir, { recursive: true });
const cas = createCasStore(getGlobalCasDir(storageRoot));
const activeHash = await cas.put("keep-me");
await cas.put("drop-me");
await writeDemoDataJsonl({
path: join(logsDir, `${threadId}.data.jsonl`),
threadId,
bundleHash,
cas,
activeHash,
});
const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot };
const proc = spawnSync(process.execPath, [cliEntryPath, "gc"], { env, encoding: "utf8" });
expect(proc.status).toBe(0);
expect(String(proc.stdout).trim()).toBe("scanned 1 threads, 2 active refs, deleted 1 entries");
});
test("thread rm triggers gc so unreferenced CAS is removed", async () => {
const bundleHash = "C9NMV6V2TQT81";
const threadId = "01CCC3333333333333333333";
const logsDir = join(storageRoot, "logs", bundleHash);
await mkdir(logsDir, { recursive: true });
const cas = createCasStore(getGlobalCasDir(storageRoot));
const activeHash = await cas.put("pinned-by-ref");
await writeDemoDataJsonl({
path: join(logsDir, `${threadId}.data.jsonl`),
threadId,
bundleHash,
cas,
activeHash,
});
const orphanHash = await cas.put("orphan-after-rm");
const orphanPath = join(getGlobalCasDir(storageRoot), `${orphanHash}.txt`);
const removed = await cmdThreadRemove(storageRoot, threadId);
expect(removed.ok).toBe(true);
expect(await pathExists(orphanPath)).toBe(false);
expect(await pathExists(join(getGlobalCasDir(storageRoot), `${activeHash}.txt`))).toBe(false);
});
});
@@ -4,7 +4,9 @@ import { mkdir, mkdtemp, readFile, rm, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import { dirname, join } from "node:path";
import { fileURLToPath } from "node:url";
import { getGlobalCasDir } from "@uncaged/workflow";
import { cmdAdd } from "../src/cmd-add.js";
import { cmdCasPut } from "../src/cmd-cas.js";
import { cmdKill } from "../src/cmd-kill.js";
import { cmdPause } from "../src/cmd-pause.js";
import { cmdPs } from "../src/cmd-ps.js";
@@ -12,9 +14,12 @@ import { cmdResume } from "../src/cmd-resume.js";
import { cmdRun } from "../src/cmd-run.js";
import { cmdThreadRemove, cmdThreadShow } from "../src/cmd-thread.js";
import { cmdThreads } from "../src/cmd-threads.js";
import { pathExists } from "../src/fs-utils.js";
import { pathExists, readTextFileIfExists } from "../src/fs-utils.js";
import { addCliArgs } from "./bundle-fixture.js";
const wfPutImport = `import { putContentMerkleNode } from "@uncaged/workflow";
`;
const threadFixtureDescriptor = `export const descriptor = {
description: "thread-cli",
roles: {
@@ -29,18 +34,26 @@ const threadFixtureDescriptor = `export const descriptor = {
`;
const fastBundleSource = `${threadFixtureDescriptor}
export const run = async function* (input) {
yield { role: "planner", content: "plan", meta: { plan: input.prompt } };
yield { role: "coder", content: "code", meta: { diff: "y" } };
${wfPutImport}
export const run = async function* (input, options) {
const cas = options.cas;
let h = await putContentMerkleNode(cas, "plan");
yield { role: "planner", contentHash: h, meta: { plan: input.prompt }, refs: [h] };
h = await putContentMerkleNode(cas, "code");
yield { role: "coder", contentHash: h, meta: { diff: "y" }, refs: [h] };
return { returnCode: 0, summary: "done" };
};
`;
const slowPlannerBundleSource = `${threadFixtureDescriptor}
export const run = async function* (input) {
${wfPutImport}
export const run = async function* (input, options) {
await new Promise((r) => setTimeout(r, 400));
yield { role: "planner", content: "plan", meta: { plan: input.prompt } };
yield { role: "coder", content: "code", meta: { diff: "y" } };
const cas = options.cas;
let h = await putContentMerkleNode(cas, "plan");
yield { role: "planner", contentHash: h, meta: { plan: input.prompt }, refs: [h] };
h = await putContentMerkleNode(cas, "code");
yield { role: "coder", contentHash: h, meta: { diff: "y" }, refs: [h] };
return { returnCode: 0, summary: "done" };
};
`;
@@ -48,27 +61,38 @@ export const run = async function* (input) {
const cliEntryPath = fileURLToPath(new URL("../src/cli.ts", import.meta.url));
const abortablePlannerBundleSource = `${threadFixtureDescriptor}
export const run = async function* (input) {
${wfPutImport}
export const run = async function* (input, options) {
await new Promise((r) => setTimeout(r, 600));
yield { role: "planner", content: "plan", meta: { plan: input.prompt } };
yield { role: "coder", content: "code", meta: { diff: "y" } };
const cas = options.cas;
let h = await putContentMerkleNode(cas, "plan");
yield { role: "planner", contentHash: h, meta: { plan: input.prompt }, refs: [h] };
h = await putContentMerkleNode(cas, "code");
yield { role: "coder", contentHash: h, meta: { diff: "y" }, refs: [h] };
return { returnCode: 0, summary: "done" };
};
`;
const pauseResumeBundleSource = `${threadFixtureDescriptor}
export const run = async function* (input) {
yield { role: "first", content: "f", meta: {} };
${wfPutImport}
export const run = async function* (_input, options) {
const cas = options.cas;
let h = await putContentMerkleNode(cas, "f");
yield { role: "first", contentHash: h, meta: {}, refs: [h] };
await new Promise((r) => setTimeout(r, 1500));
yield { role: "second", content: "s", meta: {} };
h = await putContentMerkleNode(cas, "s");
yield { role: "second", contentHash: h, meta: {}, refs: [h] };
return { returnCode: 0, summary: "done" };
};
`;
const delayedFirstYieldBundleSource = `${threadFixtureDescriptor}
export const run = async function* (input) {
${wfPutImport}
export const run = async function* (_input, options) {
await new Promise((r) => setTimeout(r, 900));
yield { role: "only", content: "x", meta: {} };
const cas = options.cas;
const h = await putContentMerkleNode(cas, "x");
yield { role: "only", contentHash: h, meta: {}, refs: [h] };
return { returnCode: 0, summary: "done" };
};
`;
@@ -175,6 +199,55 @@ describe("cli thread commands", () => {
expect(await pathExists(dataPath)).toBe(false);
});
test("thread rm runs GC and removes CAS blobs not referenced by any remaining thread", async () => {
const bundleDir = join(storageRoot, "src");
await mkdir(bundleDir, { recursive: true });
const bundlePath = join(bundleDir, "demo.esm.js");
await writeFile(bundlePath, fastBundleSource, "utf8");
const added = await cmdAdd(storageRoot, addCliArgs("solve-issue", bundlePath));
expect(added.ok).toBe(true);
if (!added.ok) {
return;
}
const ran = await cmdRun(storageRoot, "solve-issue", "hello", 5);
expect(ran.ok).toBe(true);
if (!ran.ok) {
return;
}
const threadId = ran.value.threadId;
let threads = await cmdThreads(storageRoot, []);
for (
let attempt = 0;
attempt < 50 && threads.ok && !threads.value.some((l) => l.includes(threadId));
attempt++
) {
await new Promise((r) => setTimeout(r, 20));
threads = await cmdThreads(storageRoot, []);
}
const dataPath = join(storageRoot, "logs", added.value.hash, `${threadId}.data.jsonl`);
const runningPath = join(dirname(dataPath), `${threadId}.running`);
await waitUntilRunningFileAbsent(runningPath, 120);
const put = await cmdCasPut(storageRoot, threadId, "keep-after-thread-rm");
expect(put.ok).toBe(true);
if (!put.ok) {
return;
}
const hash = put.value;
const casBlob = join(getGlobalCasDir(storageRoot), `${hash}.txt`);
const removed = await cmdThreadRemove(storageRoot, threadId);
expect(removed.ok).toBe(true);
const stillThere = await readTextFileIfExists(casBlob);
expect(stillThere).toBeNull();
});
test("cli entrypoint dispatches threads / ps (spawn)", () => {
const env = { ...process.env, UNCAGED_WORKFLOW_STORAGE_ROOT: storageRoot };
const threads = spawnSync(process.execPath, [cliEntryPath, "threads"], {
+20
View File
@@ -2,6 +2,7 @@ import { printCliError, printCliLine, printCliWarn } from "./cli-output.js";
import { cmdAdd, formatAddSuccess, parseAddArgv } from "./cmd-add.js";
import { cmdCasGet, cmdCasList, cmdCasPut, cmdCasRm } from "./cmd-cas.js";
import { cmdFork, parseForkArgv } from "./cmd-fork.js";
import { cmdGc } from "./cmd-gc.js";
import { cmdHistory } from "./cmd-history.js";
import { cmdKill } from "./cmd-kill.js";
import { cmdList, formatListLines } from "./cmd-list.js";
@@ -34,6 +35,7 @@ function usage(): string {
" uncaged-workflow thread <id>",
" uncaged-workflow thread rm <id>",
" uncaged-workflow fork <thread-id> [--from-role <role>]",
" uncaged-workflow gc",
" uncaged-workflow cas get <thread-id> <hash>",
" uncaged-workflow cas put <thread-id> <content>",
" uncaged-workflow cas list <thread-id>",
@@ -266,6 +268,23 @@ async function dispatchThreadBranch(storageRoot: string, rest: string[]): Promis
return dispatchThread(storageRoot, rest);
}
async function dispatchGc(storageRoot: string, argv: string[]): Promise<number> {
if (argv.length > 0) {
printCliError(`${usage()}\n\nerror: gc takes no arguments`);
return 1;
}
const result = await cmdGc(storageRoot);
if (!result.ok) {
printCliError(result.error);
return 1;
}
const stats = result.value;
printCliLine(
`scanned ${stats.scannedThreads} threads, ${stats.activeRefs} active refs, deleted ${stats.deletedEntries} entries`,
);
return 0;
}
async function dispatchFork(storageRoot: string, argv: string[]): Promise<number> {
const parsed = parseForkArgv(argv);
if (!parsed.ok) {
@@ -387,6 +406,7 @@ const COMMAND_TABLE: Record<string, DispatchFn> = {
threads: dispatchThreads,
thread: dispatchThreadBranch,
fork: dispatchFork,
gc: dispatchGc,
cas: dispatchCas,
};
+1 -1
View File
@@ -192,7 +192,7 @@ export async function cmdAdd(
return validated;
}
const extracted = await extractBundleExports(resolvedPath);
const extracted = await extractBundleExports(resolvedPath, { storageRoot });
if (!extracted.ok) {
return extracted;
}
+9 -33
View File
@@ -1,23 +1,11 @@
import { dirname, join } from "node:path";
import { createThreadCas, err, ok, type Result } from "@uncaged/workflow";
import { resolveThreadDataPath } from "./thread-scan.js";
function resolveCasDir(threadDataPath: string, threadId: string): string {
return join(dirname(threadDataPath), `${threadId}.cas`);
}
import { createCasStore, err, getGlobalCasDir, ok, type Result } from "@uncaged/workflow";
export async function cmdCasGet(
storageRoot: string,
threadId: string,
_threadId: string,
hash: string,
): Promise<Result<string, string>> {
const dataPath = await resolveThreadDataPath(storageRoot, threadId);
if (dataPath === null) {
return err(`thread not found: ${threadId}`);
}
const cas = createThreadCas(resolveCasDir(dataPath, threadId));
const cas = createCasStore(getGlobalCasDir(storageRoot));
const content = await cas.get(hash);
if (content === null) {
return err(`cas entry not found: ${hash}`);
@@ -27,41 +15,29 @@ export async function cmdCasGet(
export async function cmdCasPut(
storageRoot: string,
threadId: string,
_threadId: string,
content: string,
): Promise<Result<string, string>> {
const dataPath = await resolveThreadDataPath(storageRoot, threadId);
if (dataPath === null) {
return err(`thread not found: ${threadId}`);
}
const cas = createThreadCas(resolveCasDir(dataPath, threadId));
const cas = createCasStore(getGlobalCasDir(storageRoot));
const hash = await cas.put(content);
return ok(hash);
}
export async function cmdCasList(
storageRoot: string,
threadId: string,
_threadId: string,
): Promise<Result<string[], string>> {
const dataPath = await resolveThreadDataPath(storageRoot, threadId);
if (dataPath === null) {
return err(`thread not found: ${threadId}`);
}
const cas = createThreadCas(resolveCasDir(dataPath, threadId));
const cas = createCasStore(getGlobalCasDir(storageRoot));
const hashes = await cas.list();
return ok(hashes);
}
export async function cmdCasRm(
storageRoot: string,
threadId: string,
_threadId: string,
hash: string,
): Promise<Result<void, string>> {
const dataPath = await resolveThreadDataPath(storageRoot, threadId);
if (dataPath === null) {
return err(`thread not found: ${threadId}`);
}
const cas = createThreadCas(resolveCasDir(dataPath, threadId));
const cas = createCasStore(getGlobalCasDir(storageRoot));
await cas.delete(hash);
return ok(undefined);
}
+2 -1
View File
@@ -65,8 +65,9 @@ export async function cmdFork(
const newThreadId = generateUlid(Date.now());
const stepsOnWire = plan.value.historicalSteps.map((s) => ({
role: s.role,
content: s.content,
contentHash: s.contentHash,
meta: s.meta,
refs: s.refs,
timestamp: s.timestamp,
}));
+5
View File
@@ -0,0 +1,5 @@
import { type GcResult, garbageCollectCas, type Result } from "@uncaged/workflow";
export async function cmdGc(storageRoot: string): Promise<Result<GcResult, string>> {
return garbageCollectCas(storageRoot);
}
+1 -1
View File
@@ -46,7 +46,7 @@ export async function cmdRun(
threadId,
workflowName: name,
prompt,
options: { maxRounds },
options: { maxRounds, depth: 0 },
},
{ awaitResponseLine: false },
);
+4 -4
View File
@@ -1,7 +1,7 @@
import { rm, unlink } from "node:fs/promises";
import { unlink } from "node:fs/promises";
import { dirname, join } from "node:path";
import { err, ok, type Result } from "@uncaged/workflow";
import { err, garbageCollectCas, ok, type Result } from "@uncaged/workflow";
import { readTextFileIfExists } from "./fs-utils.js";
import { resolveThreadDataPath } from "./thread-scan.js";
@@ -33,12 +33,12 @@ export async function cmdThreadRemove(
const dir = dirname(dataPath);
const infoPath = join(dir, `${threadId}.info.jsonl`);
const runningPath = join(dir, `${threadId}.running`);
const casPath = join(dir, `${threadId}.cas`);
await unlink(dataPath);
await unlink(infoPath).catch(() => {});
await unlink(runningPath).catch(() => {});
await rm(casPath, { recursive: true, force: true });
await garbageCollectCas(storageRoot);
return ok(undefined);
}
+1 -1
View File
@@ -54,7 +54,7 @@ export function createCursorAgent(config: CursorAgentConfig): AgentFn {
"From the thread context, determine the absolute filesystem path where the project/repository is located.",
extractCtx,
);
const fullPrompt = buildAgentPrompt(ctx);
const fullPrompt = await buildAgentPrompt(ctx);
const args = [
"-p",
fullPrompt,
+1 -1
View File
@@ -35,7 +35,7 @@ export function createHermesAgent(config: HermesAgentConfig): AgentFn {
const timeoutMs = config.timeout;
return async (ctx) => {
const fullPrompt = buildAgentPrompt(ctx);
const fullPrompt = await buildAgentPrompt(ctx);
const args = [
"chat",
"-q",
@@ -1,8 +1,14 @@
import { describe, expect, test } from "bun:test";
import { START, type ThreadContext } from "@uncaged/workflow";
import { mkdtempSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { createCasStore, START, type ThreadContext } from "@uncaged/workflow";
import { createLlmAdapter } from "../src/create-llm-adapter.js";
const casDir = mkdtempSync(join(tmpdir(), "wf-llm-adapter-cas-"));
const testCas = createCasStore(casDir);
function makeCtx(userContent: string): ThreadContext {
return {
start: {
@@ -11,9 +17,11 @@ function makeCtx(userContent: string): ThreadContext {
meta: { maxRounds: 10 },
timestamp: 1,
},
depth: 0,
steps: [],
threadId: "01TEST000000000000000000TR",
currentRole: { name: "planner", systemPrompt: "system instructions" },
cas: testCas,
};
}
@@ -38,4 +38,5 @@ export const coderRole: RoleDefinition<CoderMeta> = {
extractPrompt:
"Extract completedPhase: the planner phase hash finished this round (exact hash string from the plan). If multiple phases were finished in one round, use the last finished phase hash. Extract filesChanged and a summary of the work.",
schema: coderMetaSchema,
extractRefs: (meta) => [meta.completedPhase],
};
@@ -31,4 +31,5 @@ export const committerRole: RoleDefinition<CommitterMeta> = {
extractPrompt:
"Extract the commit result: committed (with branch and SHA), recoverable failure, or unrecoverable failure. Include error details and log references if applicable.",
schema: committerMetaSchema,
extractRefs: null,
};
@@ -49,4 +49,5 @@ export const plannerRole: RoleDefinition<PlannerMeta> = {
extractPrompt:
"Extract the implementation phases from the agent's output. Each phase has a hash (the CAS content-hash returned by the cas put command) and a title (one-line summary).",
schema: plannerMetaSchema,
extractRefs: (meta) => meta.phases.map((p) => p.hash),
};
@@ -47,4 +47,5 @@ export const preparerRole: RoleDefinition<PreparerMeta> = {
extractPrompt:
"Extract repoPath (absolute path), defaultBranch, conventions (summary string or null), and toolchain (packageManager, testCommand, lintCommand, buildCommand — each string or null).",
schema: preparerMetaSchema,
extractRefs: null,
};
@@ -21,4 +21,5 @@ export const reviewerRole: RoleDefinition<ReviewerMeta> = {
extractPrompt:
"Extract the review verdict: approved or rejected. If rejected, list the blocking issues.",
schema: reviewerMetaSchema,
extractRefs: null,
};
@@ -1,5 +1,9 @@
import { afterEach, describe, expect, test } from "bun:test";
import { mkdtemp, rm } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import {
createCasStore,
createExtract,
END,
type ModeratorContext,
@@ -104,6 +108,7 @@ function makeCtx(
): ModeratorContext<SolveIssueMeta> {
return {
threadId: "01TEST000000000000000000TR",
depth: 0,
start: makeStart(maxRounds),
steps,
};
@@ -112,7 +117,7 @@ function makeCtx(
function preparerStep(): RoleStep<SolveIssueMeta> {
return {
role: "preparer",
content: "prepared",
contentHash: "STUBHASHPREPARER01",
meta: {
repoPath: "/home/user/repos/test",
defaultBranch: "main",
@@ -124,6 +129,7 @@ function preparerStep(): RoleStep<SolveIssueMeta> {
buildCommand: "bun run build",
},
},
refs: [],
timestamp: 0,
};
}
@@ -131,8 +137,9 @@ function preparerStep(): RoleStep<SolveIssueMeta> {
function plannerStep(phases: PlannerMeta["phases"] = DEFAULT_PHASES): RoleStep<SolveIssueMeta> {
return {
role: "planner",
content: "plan",
contentHash: "STUBHASHPLANNER001",
meta: { phases },
refs: phases.map((p) => p.hash),
timestamp: 1,
};
}
@@ -140,8 +147,9 @@ function plannerStep(phases: PlannerMeta["phases"] = DEFAULT_PHASES): RoleStep<S
function coderStep(completedPhase = "4KNMR2PX"): RoleStep<SolveIssueMeta> {
return {
role: "coder",
content: "code",
contentHash: "STUBHASHCODER00001",
meta: { completedPhase, filesChanged: ["a.ts"], summary: "fixed" },
refs: [completedPhase],
timestamp: 2,
};
}
@@ -149,10 +157,11 @@ function coderStep(completedPhase = "4KNMR2PX"): RoleStep<SolveIssueMeta> {
function reviewerStep(approved: boolean): RoleStep<SolveIssueMeta> {
return {
role: "reviewer",
content: "rev",
contentHash: "STUBHASHREVIEWER01",
meta: approved
? { status: "approved" as const }
: { status: "rejected" as const, issues: ["needs fix"] },
refs: [],
timestamp: 3,
};
}
@@ -160,8 +169,9 @@ function reviewerStep(approved: boolean): RoleStep<SolveIssueMeta> {
function committerStep(): RoleStep<SolveIssueMeta> {
return {
role: "committer",
content: "commit",
contentHash: "STUBHASHCOMMITTER1",
meta: { status: "committed", branch: "feat/issue-1", commitSha: "abc1234" },
refs: [],
timestamp: 4,
};
}
@@ -275,10 +285,15 @@ describe("solveIssueModerator", () => {
describe("createSolveIssueRun", () => {
let restoreFetch: (() => void) | null = null;
let casDir: string | undefined;
afterEach(() => {
afterEach(async () => {
restoreFetch?.();
restoreFetch = null;
if (casDir !== undefined) {
await rm(casDir, { recursive: true, force: true }).catch(() => {});
casDir = undefined;
}
});
test("structured extraction yields preparer then planner meta from mocked chat completions", async () => {
@@ -295,10 +310,13 @@ describe("createSolveIssueRun", () => {
};
restoreFetch = installMockChatCompletions([EXPECT_PREPARER_META, EXPECT_PLANNER_META]);
casDir = await mkdtemp(join(tmpdir(), "solve-issue-cas-"));
const cas = createCasStore(casDir);
const run = createSolveIssueRun({ agent: async () => "" }, stubExtract);
const gen = run(
{ prompt: "task", steps: [] },
{ threadId: "01TEST000000000000000000TR", maxRounds: 20 },
{ threadId: "01TEST000000000000000000TR", maxRounds: 20, depth: 0, cas },
);
const first = await gen.next();
expect(first.done).toBe(false);
@@ -330,6 +348,9 @@ describe("createSolveIssueRun", () => {
EXPECT_CODER_META,
]);
casDir = await mkdtemp(join(tmpdir(), "solve-issue-cas-"));
const cas = createCasStore(casDir);
const calls: string[] = [];
const run = createSolveIssueRun(
{
@@ -356,7 +377,7 @@ describe("createSolveIssueRun", () => {
);
const gen = run(
{ prompt: "task", steps: [] },
{ threadId: "01TEST000000000000000000TR", maxRounds: 20 },
{ threadId: "01TEST000000000000000000TR", maxRounds: 20, depth: 0, cas },
);
await gen.next();
expect(calls).toEqual(["preparer"]);
@@ -1,5 +1,8 @@
import { describe, expect, test } from "bun:test";
import { START, type ThreadContext } from "@uncaged/workflow";
import { afterEach, beforeEach, describe, expect, test } from "bun:test";
import { mkdtemp, rm } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { createCasStore, putContentMerkleNode, START, type ThreadContext } from "@uncaged/workflow";
import { buildAgentPrompt } from "../src/index.js";
@@ -13,35 +16,53 @@ function startTask(content: string): ThreadContext["start"] {
}
describe("buildAgentPrompt", () => {
test("includes system prompt and full task; omits tools when there are no steps", () => {
let casRoot: string;
beforeEach(async () => {
casRoot = await mkdtemp(join(tmpdir(), "wf-build-prompt-cas-"));
});
afterEach(async () => {
await rm(casRoot, { recursive: true, force: true });
});
test("includes system prompt and full task; omits tools when there are no steps", async () => {
const cas = createCasStore(casRoot);
const ctx: ThreadContext = {
start: startTask("fix the bug"),
depth: 0,
steps: [],
threadId: "01TEST000000000000000000TR",
currentRole: { name: START, systemPrompt: "You are an agent." },
cas,
};
const text = buildAgentPrompt(ctx);
const text = await buildAgentPrompt(ctx);
expect(text).toContain("You are an agent.");
expect(text).toContain("## Task");
expect(text).toContain("fix the bug");
expect(text).not.toContain("## Tools");
});
test("single step shows full content and meta, and includes tools", () => {
test("single step shows full content and meta, and includes tools", async () => {
const cas = createCasStore(casRoot);
const onlyHash = await putContentMerkleNode(cas, "only step full body");
const ctx: ThreadContext = {
start: startTask("user task"),
depth: 0,
threadId: "01TEST000000000000000000TR",
currentRole: { name: "coder", systemPrompt: "Be helpful." },
cas,
steps: [
{
role: "coder",
content: "only step full body",
contentHash: onlyHash,
meta: { files: ["a.ts"] },
refs: [onlyHash],
timestamp: 2,
},
],
};
const text = buildAgentPrompt(ctx);
const text = await buildAgentPrompt(ctx);
expect(text).toContain("## Task");
expect(text).toContain("user task");
expect(text).toContain("## Step: coder");
@@ -51,27 +72,34 @@ describe("buildAgentPrompt", () => {
expect(text).toContain("uncaged-workflow thread 01TEST000000000000000000TR");
});
test("two or more steps: previous steps are meta-only; latest step is full", () => {
test("two or more steps: previous steps are meta-only; latest step is full", async () => {
const cas = createCasStore(casRoot);
const plannerHash = await putContentMerkleNode(cas, "PLANNER_SECRET_FULL_TEXT");
const coderHash = await putContentMerkleNode(cas, "last step full content");
const ctx: ThreadContext = {
start: startTask("first message full: task content here"),
depth: 0,
threadId: "01TEST000000000000000000TR",
currentRole: { name: "coder", systemPrompt: "System." },
cas,
steps: [
{
role: "planner",
content: "PLANNER_SECRET_FULL_TEXT",
contentHash: plannerHash,
meta: { plan: "short" },
refs: [plannerHash],
timestamp: 2,
},
{
role: "coder",
content: "last step full content",
contentHash: coderHash,
meta: { done: true },
refs: [coderHash],
timestamp: 3,
},
],
};
const text = buildAgentPrompt(ctx);
const text = await buildAgentPrompt(ctx);
expect(text).toContain("first message full: task content here");
expect(text).toContain("## Previous Steps");
expect(text).toContain("### Step 1: planner");
@@ -84,33 +112,42 @@ describe("buildAgentPrompt", () => {
expect(text).toContain("uncaged-workflow thread 01TEST000000000000000000TR");
});
test("middle steps show meta summary only, not full content", () => {
test("middle steps show meta summary only, not full content", async () => {
const cas = createCasStore(casRoot);
const ha = await putContentMerkleNode(cas, "HIDDEN_A");
const hb = await putContentMerkleNode(cas, "HIDDEN_B_MIDDLE");
const hc = await putContentMerkleNode(cas, "VISIBLE_LAST");
const ctx: ThreadContext = {
start: startTask("start"),
depth: 0,
threadId: "01TEST000000000000000000TR",
currentRole: { name: "c", systemPrompt: "S" },
cas,
steps: [
{
role: "a",
content: "HIDDEN_A",
contentHash: ha,
meta: { n: 1 },
refs: [ha],
timestamp: 2,
},
{
role: "b",
content: "HIDDEN_B_MIDDLE",
contentHash: hb,
meta: { n: 2 },
refs: [hb],
timestamp: 3,
},
{
role: "c",
content: "VISIBLE_LAST",
contentHash: hc,
meta: { n: 3 },
refs: [hc],
timestamp: 4,
},
],
};
const text = buildAgentPrompt(ctx);
const text = await buildAgentPrompt(ctx);
expect(text).not.toContain("HIDDEN_A");
expect(text).not.toContain("HIDDEN_B_MIDDLE");
expect(text).toContain('Summary: {"n":1}');
@@ -1,7 +1,16 @@
import type { AgentContext } from "@uncaged/workflow";
import { getContentMerklePayload } from "@uncaged/workflow";
async function resolveStepText(ctx: AgentContext, contentHash: string): Promise<string> {
const text = await getContentMerklePayload(ctx.cas, contentHash);
if (text === null) {
throw new Error(`buildAgentPrompt: missing CAS blob for ${contentHash}`);
}
return text;
}
/** Builds the full agent prompt: system instructions plus summarized thread history. */
export function buildAgentPrompt(ctx: AgentContext): string {
export async function buildAgentPrompt(ctx: AgentContext): Promise<string> {
const lines: string[] = [];
lines.push(ctx.currentRole.systemPrompt);
lines.push("");
@@ -15,10 +24,11 @@ export function buildAgentPrompt(ctx: AgentContext): string {
if (steps.length === 1) {
const s = steps[0];
const body = await resolveStepText(ctx, s.contentHash);
lines.push("");
lines.push(`## Step: ${s.role}`);
lines.push("");
lines.push(s.content);
lines.push(body);
lines.push("");
lines.push(`Meta: ${JSON.stringify(s.meta)}`);
} else {
@@ -31,10 +41,11 @@ export function buildAgentPrompt(ctx: AgentContext): string {
lines.push(`Summary: ${JSON.stringify(s.meta)}`);
}
const last = steps[steps.length - 1];
const lastBody = await resolveStepText(ctx, last.contentHash);
lines.push("");
lines.push(`## Latest Step: ${last.role}`);
lines.push("");
lines.push(last.content);
lines.push(lastBody);
lines.push("");
lines.push(`Meta: ${JSON.stringify(last.meta)}`);
}
@@ -22,6 +22,7 @@ describe("buildDescriptor", () => {
systemPrompt: "You are an analyst.",
extractPrompt: "Extract title and count from the analysis.",
schema,
extractRefs: null,
},
},
moderator: () => END,
@@ -26,6 +26,19 @@ export const run = async function* (input) {
expect(r.ok).toBe(true);
});
test("allows static import of @uncaged/workflow", () => {
const source = `${minimalDescriptor}import { putContentMerkleNode } from "@uncaged/workflow";
export const run = async function* (_input, options) {
const cas = options.cas;
const h = await putContentMerkleNode(cas, "x");
return { returnCode: 0, summary: h };
};
`;
const r = validateWorkflowBundle({ filePath: "/tmp/w.esm.js", source });
expect(r.ok).toBe(true);
});
test("rejects wrong filename suffix", () => {
const r = validateWorkflowBundle({
filePath: "/tmp/w.js",
+18 -12
View File
@@ -3,10 +3,16 @@ import { mkdtemp, rm } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { createThreadCas } from "../src/cas.js";
import { createCasStore, createThreadCas } from "../src/cas.js";
import { hashString } from "../src/hash.js";
describe("createThreadCas", () => {
describe("cas module exports", () => {
test("createThreadCas is a deprecated alias of createCasStore", () => {
expect(createThreadCas).toBe(createCasStore);
});
});
describe("createCasStore", () => {
let casDir: string;
beforeEach(async () => {
@@ -18,7 +24,7 @@ describe("createThreadCas", () => {
});
test("put returns consistent hash for same content", async () => {
const cas = createThreadCas(casDir);
const cas = createCasStore(casDir);
const h1 = await cas.put("hello world");
const h2 = await cas.put("hello world");
expect(h1).toBe(h2);
@@ -26,14 +32,14 @@ describe("createThreadCas", () => {
});
test("put returns hash matching hashString", async () => {
const cas = createThreadCas(casDir);
const cas = createCasStore(casDir);
const content = "some content to store";
const h = await cas.put(content);
expect(h).toBe(hashString(content));
});
test("get returns stored content", async () => {
const cas = createThreadCas(casDir);
const cas = createCasStore(casDir);
const content = "line1\nline2\nline3";
const h = await cas.put(content);
const retrieved = await cas.get(h);
@@ -41,13 +47,13 @@ describe("createThreadCas", () => {
});
test("get returns null for missing hash", async () => {
const cas = createThreadCas(casDir);
const cas = createCasStore(casDir);
const result = await cas.get("0000000000000");
expect(result).toBeNull();
});
test("delete removes entry", async () => {
const cas = createThreadCas(casDir);
const cas = createCasStore(casDir);
const h = await cas.put("to be deleted");
await cas.delete(h);
const result = await cas.get(h);
@@ -55,12 +61,12 @@ describe("createThreadCas", () => {
});
test("delete on missing hash does not throw", async () => {
const cas = createThreadCas(casDir);
const cas = createCasStore(casDir);
await cas.delete("0000000000000");
});
test("list returns all stored hashes", async () => {
const cas = createThreadCas(casDir);
const cas = createCasStore(casDir);
const h1 = await cas.put("aaa");
const h2 = await cas.put("bbb");
const h3 = await cas.put("ccc");
@@ -69,13 +75,13 @@ describe("createThreadCas", () => {
});
test("list returns empty array when cas dir does not exist", async () => {
const cas = createThreadCas(join(casDir, "nonexistent"));
const cas = createCasStore(join(casDir, "nonexistent"));
const hashes = await cas.list();
expect(hashes).toEqual([]);
});
test("put is idempotent — same content written twice causes no error", async () => {
const cas = createThreadCas(casDir);
const cas = createCasStore(casDir);
const h1 = await cas.put("idempotent");
const h2 = await cas.put("idempotent");
expect(h1).toBe(h2);
@@ -84,7 +90,7 @@ describe("createThreadCas", () => {
});
test("different content produces different hashes", async () => {
const cas = createThreadCas(casDir);
const cas = createCasStore(casDir);
const h1 = await cas.put("alpha");
const h2 = await cas.put("beta");
expect(h1).not.toBe(h2);
+141 -8
View File
@@ -4,10 +4,17 @@ import { tmpdir } from "node:os";
import { join } from "node:path";
import * as z from "zod/v4";
import { createCasStore } from "../src/cas.js";
import { createWorkflow } from "../src/create-workflow.js";
import { executeThread } from "../src/engine.js";
import { createExtract } from "../src/extract-fn.js";
import { createLogger } from "../src/logger.js";
import {
createContentMerkleNode,
getContentMerklePayload,
parseMerkleNode,
serializeMerkleNode,
} from "../src/merkle.js";
import { END } from "../src/types.js";
const plannerMetaSchema = z.object({
@@ -89,12 +96,14 @@ const demoWorkflow = createWorkflow<DemoMeta>(
systemPrompt: "You are a planner.",
extractPrompt: "Extract plan text and affected files list.",
schema: plannerMetaSchema,
extractRefs: null,
},
coder: {
description: "Demo coder",
systemPrompt: "You are a coder.",
extractPrompt: "Extract the code diff summary.",
schema: coderMetaSchema,
extractRefs: null,
},
},
moderator: (ctx) => {
@@ -138,6 +147,7 @@ describe("executeThread", () => {
const dataPath = join(root, "logs", hash, `${threadId}.data.jsonl`);
const infoPath = join(root, "logs", hash, `${threadId}.info.jsonl`);
await mkdir(join(root, "logs", hash), { recursive: true });
const cas = createCasStore(join(root, "cas"));
const logger = createLogger({ sink: { kind: "file", path: infoPath } });
const ac = new AbortController();
@@ -148,16 +158,30 @@ describe("executeThread", () => {
{ prompt: "Fix the login redirect bug in #3", steps: [] },
{
maxRounds: 5,
depth: 0,
signal: ac.signal,
awaitAfterEachYield: async () => {},
forkSourceThreadId: null,
prefilledDiskSteps: null,
},
{ threadId, hash, dataJsonlPath: dataPath, infoJsonlPath: infoPath },
{ threadId, hash, dataJsonlPath: dataPath, infoJsonlPath: infoPath, cas },
logger,
);
expect(result.returnCode).toBe(0);
expect(typeof result.rootHash).toBe("string");
expect(result.rootHash.length).toBeGreaterThan(0);
const rootYaml = await cas.get(result.rootHash);
expect(rootYaml).not.toBeNull();
const rootNode = parseMerkleNode(rootYaml ?? "");
expect(rootNode.type).toBe("thread");
const rootPayload = rootNode.payload as Record<string, unknown>;
expect(rootPayload.workflow).toBe("demo-flow");
expect(rootPayload.threadId).toBe(threadId);
const rootResult = rootPayload.result as Record<string, unknown>;
expect(rootResult.returnCode).toBe(0);
expect(rootNode.children.length).toBe(2);
const dataText = await readFile(dataPath, "utf8");
const lines = dataText
@@ -176,16 +200,34 @@ describe("executeThread", () => {
expect(params.prompt).toBe("Fix the login redirect bug in #3");
const opts = params.options as Record<string, unknown>;
expect(opts.maxRounds).toBe(5);
expect(Object.keys(opts).sort()).toEqual(["maxRounds"]);
expect(opts.depth).toBe(0);
expect(Object.keys(opts).sort()).toEqual(["depth", "maxRounds"]);
const role1 = JSON.parse(lines[1] ?? "{}") as Record<string, unknown>;
expect(role1.role).toBe("planner");
expect(role1.content).toBe("plan-body");
expect(typeof role1.contentHash).toBe("string");
expect(await getContentMerklePayload(cas, String(role1.contentHash))).toBe("plan-body");
expect(role1.meta).toEqual({ plan: "do-it", files: ["a.ts"] });
expect(role1.refs).toEqual([role1.contentHash]);
expect(typeof role1.timestamp).toBe("number");
const role2 = JSON.parse(lines[2] ?? "{}") as Record<string, unknown>;
expect(role2.role).toBe("coder");
expect(role2.refs).toEqual([role2.contentHash]);
const step1Yaml = await cas.get(rootNode.children[0] ?? "");
const step2Yaml = await cas.get(rootNode.children[1] ?? "");
expect(step1Yaml).not.toBeNull();
expect(step2Yaml).not.toBeNull();
const step1Node = parseMerkleNode(step1Yaml ?? "");
const step2Node = parseMerkleNode(step2Yaml ?? "");
expect(step1Node.type).toBe("step");
expect(step2Node.type).toBe("step");
expect(step1Node.children).toEqual([String(role1.contentHash)]);
expect(step2Node.children).toEqual([String(role2.contentHash)]);
const step1Payload = step1Node.payload as Record<string, unknown>;
expect(step1Payload.role).toBe("planner");
expect(step1Payload.meta).toEqual({ plan: "do-it", files: ["a.ts"] });
const infoText = await readFile(infoPath, "utf8");
const infoLines = infoText
@@ -213,11 +255,14 @@ describe("executeThread", () => {
const dataPath = join(root, "logs", hash, `${threadId}.data.jsonl`);
const infoPath = join(root, "logs", hash, `${threadId}.info.jsonl`);
await mkdir(join(root, "logs", hash), { recursive: true });
const cas = createCasStore(join(root, "cas"));
const plannerHash = await cas.put(serializeMerkleNode(createContentMerkleNode("plan-body")));
const logger = createLogger({ sink: { kind: "file", path: infoPath } });
const ac = new AbortController();
const histTs = 9_000_000;
const mergedPlannerRefs = ["CAS111AAAAAAA", plannerHash];
const result = await executeThread(
demoWorkflow,
"demo-flow",
@@ -226,30 +271,38 @@ describe("executeThread", () => {
steps: [
{
role: "planner",
content: "plan-body",
contentHash: plannerHash,
meta: { plan: "do-it", files: ["a.ts"] },
refs: mergedPlannerRefs,
},
],
},
{
maxRounds: 5,
depth: 0,
signal: ac.signal,
awaitAfterEachYield: async () => {},
forkSourceThreadId: "01SRC1111111111111111111",
prefilledDiskSteps: [
{
role: "planner",
content: "plan-body",
contentHash: plannerHash,
meta: { plan: "do-it", files: ["a.ts"] },
refs: mergedPlannerRefs,
timestamp: histTs,
},
],
},
{ threadId, hash, dataJsonlPath: dataPath, infoJsonlPath: infoPath },
{ threadId, hash, dataJsonlPath: dataPath, infoJsonlPath: infoPath, cas },
logger,
);
expect(result.returnCode).toBe(0);
expect(typeof result.rootHash).toBe("string");
const rootYaml = await cas.get(result.rootHash);
const rootNode = parseMerkleNode(rootYaml ?? "");
expect(rootNode.children.length).toBe(2);
const dataText = await readFile(dataPath, "utf8");
const lines = dataText
@@ -264,10 +317,11 @@ describe("executeThread", () => {
const role0 = JSON.parse(lines[1] ?? "{}") as Record<string, unknown>;
expect(role0.role).toBe("planner");
expect(role0.timestamp).toBe(histTs);
expect(role0.refs).toEqual(mergedPlannerRefs);
const role1 = JSON.parse(lines[2] ?? "{}") as Record<string, unknown>;
expect(role1.role).toBe("coder");
expect(role1.content).toBe("code-body");
expect(await getContentMerklePayload(cas, String(role1.contentHash))).toBe("code-body");
} finally {
await rm(root, { recursive: true, force: true });
}
@@ -281,6 +335,7 @@ describe("executeThread", () => {
const dataPath = join(root, "logs", hash, `${threadId}.data.jsonl`);
const infoPath = join(root, "logs", hash, `${threadId}.info.jsonl`);
await mkdir(join(root, "logs", hash), { recursive: true });
const cas = createCasStore(join(root, "cas"));
const logger = createLogger({ sink: { kind: "file", path: infoPath } });
const ac = new AbortController();
@@ -291,16 +346,23 @@ describe("executeThread", () => {
{ prompt: "hello", steps: [] },
{
maxRounds: 0,
depth: 0,
signal: ac.signal,
awaitAfterEachYield: async () => {},
forkSourceThreadId: null,
prefilledDiskSteps: null,
},
{ threadId, hash, dataJsonlPath: dataPath, infoJsonlPath: infoPath },
{ threadId, hash, dataJsonlPath: dataPath, infoJsonlPath: infoPath, cas },
logger,
);
expect(result.returnCode).toBe(0);
expect(typeof result.rootHash).toBe("string");
const rootYaml = await cas.get(result.rootHash);
const rootNode = parseMerkleNode(rootYaml ?? "");
expect(rootNode.type).toBe("thread");
expect(rootNode.children.length).toBe(0);
const dataText = await readFile(dataPath, "utf8");
const lines = dataText
@@ -312,4 +374,75 @@ describe("executeThread", () => {
await rm(root, { recursive: true, force: true });
}
});
test("Merkle DAG: root → step nodes → content for full thread traversal", async () => {
restoreFetch = installMockChatCompletions([
{ plan: "do-it", files: ["a.ts"] },
{ diff: "+ok" },
]);
const root = await mkdtemp(join(tmpdir(), "wf-engine-dag-"));
try {
const threadId = "01KQXKW18CT8G75T53R8F4G7YG";
const hash = "C9NMV6V2TQT81";
const dataPath = join(root, "logs", hash, `${threadId}.data.jsonl`);
const infoPath = join(root, "logs", hash, `${threadId}.info.jsonl`);
await mkdir(join(root, "logs", hash), { recursive: true });
const cas = createCasStore(join(root, "cas"));
const logger = createLogger({ sink: { kind: "file", path: infoPath } });
const ac = new AbortController();
const result = await executeThread(
demoWorkflow,
"demo-flow",
{ prompt: "DAG test", steps: [] },
{
maxRounds: 5,
depth: 0,
signal: ac.signal,
awaitAfterEachYield: async () => {},
forkSourceThreadId: null,
prefilledDiskSteps: null,
},
{ threadId, hash, dataJsonlPath: dataPath, infoJsonlPath: infoPath, cas },
logger,
);
const dataText = await readFile(dataPath, "utf8");
const lines = dataText
.trim()
.split("\n")
.filter((l) => l !== "");
expect(lines.length).toBe(3);
const rolePlanner = JSON.parse(lines[1] ?? "{}") as Record<string, unknown>;
const roleCoder = JSON.parse(lines[2] ?? "{}") as Record<string, unknown>;
const threadYaml = await cas.get(result.rootHash);
expect(threadYaml).not.toBeNull();
const threadNode = parseMerkleNode(threadYaml ?? "");
expect(threadNode.type).toBe("thread");
const bodies: string[] = [];
for (const stepHash of threadNode.children) {
const stepYaml = await cas.get(stepHash);
expect(stepYaml).not.toBeNull();
const stepNode = parseMerkleNode(stepYaml ?? "");
expect(stepNode.type).toBe("step");
expect(stepNode.children.length).toBe(1);
const contentHash = stepNode.children[0];
expect(contentHash).toBeDefined();
const body = await getContentMerklePayload(cas, contentHash ?? "");
expect(body).not.toBeNull();
bodies.push(body ?? "");
}
expect(bodies.sort()).toEqual(["code-body", "plan-body"].sort());
expect(rolePlanner.role).toBe("planner");
expect(roleCoder.role).toBe("coder");
} finally {
await rm(root, { recursive: true, force: true });
}
});
});
@@ -7,9 +7,9 @@ import {
} from "../src/fork-thread.js";
const sampleDataJsonl = `{"name":"demo","hash":"C9NMV6V2TQT81","threadId":"01AAA1111111111111111111","parameters":{"prompt":"hi","options":{"maxRounds":5}},"timestamp":100}
{"role":"planner","content":"p","meta":{},"timestamp":101}
{"role":"coder","content":"c","meta":{},"timestamp":102}
{"role":"reviewer","content":"r","meta":{},"timestamp":103}
{"role":"planner","contentHash":"HP0000000000000000000001","meta":{},"refs":[],"timestamp":101}
{"role":"coder","contentHash":"HP0000000000000000000002","meta":{},"refs":[],"timestamp":102}
{"role":"reviewer","contentHash":"HP0000000000000000000003","meta":{},"refs":[],"timestamp":103}
`;
describe("fork-thread", () => {
@@ -24,6 +24,7 @@ describe("fork-thread", () => {
expect(r.value.start.threadId).toBe("01AAA1111111111111111111");
expect(r.value.start.prompt).toBe("hi");
expect(r.value.start.maxRounds).toBe(5);
expect(r.value.start.depth).toBe(0);
expect(r.value.roleSteps.length).toBe(3);
expect(r.value.roleSteps[0]?.role).toBe("planner");
});
@@ -83,6 +84,24 @@ describe("fork-thread", () => {
expect(r.value.workflowName).toBe("demo");
expect(r.value.historicalSteps.length).toBe(1);
expect(r.value.historicalSteps[0]?.timestamp).toBe(101);
expect(r.value.runOptions).toEqual({ maxRounds: 5 });
expect(r.value.runOptions).toEqual({ maxRounds: 5, depth: 0 });
});
test("parseThreadDataJsonl reads explicit depth from start record", () => {
const text = `{"name":"demo","hash":"H","threadId":"01ZZZZZZZZZZZZZZZZZZZZZZ","parameters":{"prompt":"p","options":{"maxRounds":3,"depth":2}},"timestamp":1}
{"role":"planner","contentHash":"HP0000000000000000000099","meta":{},"refs":[],"timestamp":2}
`;
const r = parseThreadDataJsonl(text);
expect(r.ok).toBe(true);
if (!r.ok) {
return;
}
expect(r.value.start.depth).toBe(2);
const plan = buildForkPlan(text, null);
expect(plan.ok).toBe(true);
if (!plan.ok) {
return;
}
expect(plan.value.runOptions).toEqual({ maxRounds: 3, depth: 2 });
});
});
@@ -0,0 +1,29 @@
import { describe, expect, test } from "bun:test";
import { createContentMerkleNode, parseMerkleNode, serializeMerkleNode } from "../src/merkle.js";
describe("merkle", () => {
test("content node roundtrips through YAML", () => {
const node = createContentMerkleNode("hello\nworld");
const yaml = serializeMerkleNode(node);
const back = parseMerkleNode(yaml);
expect(back).toEqual(node);
});
test("step node with object payload roundtrips", () => {
const node = {
type: "step" as const,
payload: { role: "planner", foo: 1 },
children: ["ABC123", "DEF456"],
};
const yaml = serializeMerkleNode(node);
const back = parseMerkleNode(yaml);
expect(back.type).toBe("step");
expect(back.payload).toEqual({ role: "planner", foo: 1 });
expect(back.children).toEqual(["ABC123", "DEF456"]);
});
test("parse rejects invalid YAML root", () => {
expect(() => parseMerkleNode("[]")).toThrow();
});
});
@@ -0,0 +1,201 @@
import { afterEach, describe, expect, test } from "bun:test";
import { mkdir, mkdtemp, readFile, rm } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import * as z from "zod/v4";
import { createCasStore } from "../src/cas.js";
import { createWorkflow } from "../src/create-workflow.js";
import { executeThread } from "../src/engine.js";
import { createExtract } from "../src/extract-fn.js";
import { buildForkPlan, parseThreadDataJsonl } from "../src/fork-thread.js";
import { createLogger } from "../src/logger.js";
import { END } from "../src/types.js";
const phaseSchema = z.object({
hash: z.string(),
title: z.string(),
});
const plannerMetaSchema = z.object({
phases: z.array(phaseSchema),
});
type RefsDemoMeta = {
planner: z.infer<typeof plannerMetaSchema>;
};
function installMockChatCompletions(sequence: ReadonlyArray<Record<string, unknown>>): () => void {
const origFetch = globalThis.fetch;
let i = 0;
const mockFetch = async (
input: Parameters<typeof fetch>[0],
init?: RequestInit,
): Promise<Response> => {
const args = sequence[i] ?? sequence[sequence.length - 1];
if (args === undefined) {
throw new Error("installMockChatCompletions: empty sequence");
}
i += 1;
void input;
const body = init?.body ? (JSON.parse(String(init.body)) as Record<string, unknown>) : {};
const tools = body.tools;
const firstTool =
Array.isArray(tools) && tools.length > 0 && tools[0] !== null && typeof tools[0] === "object"
? (tools[0] as Record<string, unknown>)
: null;
const fn =
firstTool !== null ? (firstTool.function as Record<string, unknown> | undefined) : undefined;
const toolName = typeof fn?.name === "string" ? fn.name : "extract";
return new Response(
JSON.stringify({
choices: [
{
message: {
tool_calls: [
{
type: "function",
function: {
name: toolName,
arguments: JSON.stringify(args),
},
},
],
},
},
],
}),
{ status: 200, headers: { "Content-Type": "application/json" } },
);
};
globalThis.fetch = Object.assign(mockFetch, {
preconnect: origFetch.preconnect.bind(origFetch),
}) as typeof fetch;
return () => {
globalThis.fetch = origFetch;
};
}
const refsDemoExtract = createExtract({
baseUrl: "http://127.0.0.1:9",
apiKey: "test",
model: "test",
});
const refsDemoWorkflow = createWorkflow<RefsDemoMeta>(
{
roles: {
planner: {
description: "Planner with phase hashes",
systemPrompt: "Plan.",
extractPrompt: "Extract phases with CAS hashes.",
schema: plannerMetaSchema,
extractRefs: (meta) => meta.phases.map((p) => p.hash),
},
},
moderator: (ctx) => (ctx.steps.length === 0 ? "planner" : END),
},
{
agent: async () => "plan-output",
},
refsDemoExtract,
);
describe("RoleStep refs tracking", () => {
let restoreFetch: (() => void) | null = null;
afterEach(() => {
restoreFetch?.();
restoreFetch = null;
});
test("parseThreadDataJsonl reads refs and defaults missing refs to []", () => {
const text = `{"name":"demo","hash":"C9NMV6V2TQT81","threadId":"01AAA1111111111111111111","parameters":{"prompt":"hi","options":{"maxRounds":5}},"timestamp":100}
{"role":"planner","contentHash":"HPAYLOAD111111","meta":{},"refs":["H111AAAAAAAAA","H222AAAAAAAAA"],"timestamp":101}
{"role":"coder","contentHash":"HPAYLOAD222222","meta":{},"timestamp":102}
`;
const r = parseThreadDataJsonl(text);
expect(r.ok).toBe(true);
if (!r.ok) {
return;
}
expect(r.value.roleSteps[0]?.refs).toEqual(["H111AAAAAAAAA", "H222AAAAAAAAA"]);
expect(r.value.roleSteps[1]?.refs).toEqual([]);
});
test("executeThread persists refs from extractRefs on role yields", async () => {
restoreFetch = installMockChatCompletions([
{
phases: [
{ hash: "C9NMV6V2TQT81", title: "phase-a" },
{ hash: "C9NMV6V2TQT82", title: "phase-b" },
],
},
]);
const root = await mkdtemp(join(tmpdir(), "wf-refs-"));
try {
const threadId = "01KQXKW18CT8G75T53R8F4G7YG";
const hash = "C9NMV6V2TQT81";
const dataPath = join(root, "logs", hash, `${threadId}.data.jsonl`);
const infoPath = join(root, "logs", hash, `${threadId}.info.jsonl`);
await mkdir(join(root, "logs", hash), { recursive: true });
const cas = createCasStore(join(root, "cas"));
const logger = createLogger({ sink: { kind: "file", path: infoPath } });
const ac = new AbortController();
const result = await executeThread(
refsDemoWorkflow,
"refs-demo",
{ prompt: "task", steps: [] },
{
maxRounds: 5,
depth: 0,
signal: ac.signal,
awaitAfterEachYield: async () => {},
forkSourceThreadId: null,
prefilledDiskSteps: null,
},
{ threadId, hash, dataJsonlPath: dataPath, infoJsonlPath: infoPath, cas },
logger,
);
expect(result.returnCode).toBe(0);
expect(typeof result.rootHash).toBe("string");
expect(result.rootHash.length).toBeGreaterThan(0);
const dataText = await readFile(dataPath, "utf8");
const lines = dataText
.trim()
.split("\n")
.filter((l) => l !== "");
expect(lines.length).toBe(2);
const role1 = JSON.parse(lines[1] ?? "{}") as Record<string, unknown>;
expect(role1.role).toBe("planner");
const refs = role1.refs as string[];
expect(refs).toContain("C9NMV6V2TQT81");
expect(refs).toContain("C9NMV6V2TQT82");
expect(typeof role1.contentHash).toBe("string");
expect(refs).toContain(String(role1.contentHash));
expect(refs.length).toBe(3);
} finally {
await rm(root, { recursive: true, force: true });
}
});
test("buildForkPlan carries refs on historical steps", () => {
const text = `{"name":"demo","hash":"C9NMV6V2TQT81","threadId":"01AAA1111111111111111111","parameters":{"prompt":"hi","options":{"maxRounds":5}},"timestamp":100}
{"role":"planner","contentHash":"HP111111111111","meta":{},"refs":["KEEPREFAAAAAA"],"timestamp":101}
{"role":"coder","contentHash":"HP222222222222","meta":{},"refs":["CODERHASHAAAA"],"timestamp":102}
`;
const plan = buildForkPlan(text, null);
expect(plan.ok).toBe(true);
if (!plan.ok) {
return;
}
expect(plan.value.historicalSteps.length).toBe(1);
expect(plan.value.historicalSteps[0]?.refs).toEqual(["KEEPREFAAAAAA"]);
});
});
@@ -0,0 +1,14 @@
import { describe, expect, test } from "bun:test";
import { join } from "node:path";
import { getDefaultWorkflowStorageRoot, getGlobalCasDir } from "../src/storage-root.js";
describe("getGlobalCasDir", () => {
test("joins cas segment under explicit storage root", () => {
expect(getGlobalCasDir("/tmp/wf-root")).toBe(join("/tmp/wf-root", "cas"));
});
test("defaults to default workflow root when storage root is undefined", () => {
expect(getGlobalCasDir(undefined)).toBe(join(getDefaultWorkflowStorageRoot(), "cas"));
});
});
@@ -10,6 +10,7 @@ describe("RFC-001 thread JSONL shapes", () => {
prompt: "Fix the login redirect bug in #3",
options: {
maxRounds: 5,
depth: 0,
},
},
timestamp: 1714963200000,
@@ -17,15 +18,18 @@ describe("RFC-001 thread JSONL shapes", () => {
const roleRecord = {
role: "planner",
content: "Plan: modify auth middleware...",
contentHash: "CPHASH000000000000000001",
meta: { plan: "...", files: ["src/auth.ts"] },
refs: [] as string[],
timestamp: 1714963201000,
};
expect(Object.keys(startRecord).sort()).toEqual(
["hash", "name", "parameters", "threadId", "timestamp"].sort(),
);
expect(Object.keys(roleRecord).sort()).toEqual(["content", "meta", "role", "timestamp"].sort());
expect(Object.keys(roleRecord).sort()).toEqual(
["contentHash", "meta", "refs", "role", "timestamp"].sort(),
);
});
test("documents the `.info.jsonl` debug record keys", () => {
+20 -7
View File
@@ -5,22 +5,29 @@ import { createConnection } from "node:net";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { createCasStore } from "../src/cas.js";
import { createContentMerkleNode, serializeMerkleNode } from "../src/merkle.js";
import { getWorkerHostScriptPath } from "../src/worker-entry-path.js";
const bundleSource = `export const descriptor = {
const bundleSource = `import { putContentMerkleNode } from "@uncaged/workflow";
export const descriptor = {
description: "worker-test",
roles: {
planner: { description: "planner", schema: {} },
coder: { description: "coder", schema: {} },
},
};
export const run = async function* (input) {
export const run = async function* (input, options) {
const cas = options.cas;
const has = (r) => input.steps.some((s) => s.role === r);
if (!has("planner")) {
yield { role: "planner", content: "p", meta: { plan: input.prompt } };
const h = await putContentMerkleNode(cas, "p");
yield { role: "planner", contentHash: h, meta: { plan: input.prompt }, refs: [h] };
}
if (!has("coder")) {
yield { role: "coder", content: "c", meta: { diff: "y" } };
const h = await putContentMerkleNode(cas, "c");
yield { role: "coder", contentHash: h, meta: { diff: "y" }, refs: [h] };
}
return { returnCode: 0, summary: "completed: moderator returned END" };
};
@@ -102,7 +109,7 @@ describe("worker process", () => {
threadId,
workflowName: "demo-flow",
prompt: "hello",
options: { maxRounds: 5 },
options: { maxRounds: 5, depth: 0 },
});
const exitCode: number = await new Promise((resolve) => {
@@ -143,6 +150,11 @@ describe("worker process", () => {
const port = await readReadyPort(child);
const cas = createCasStore(join(root, "cas"));
const plannerReplayHash = await cas.put(
serializeMerkleNode(createContentMerkleNode("p-old")),
);
const threadId = "01KQXKW18CT8G75T53R8F4G7YG";
const srcId = "01SRCMMMMMMMMMMMMMMMMMMMM";
await sendJson(port, {
@@ -150,12 +162,13 @@ describe("worker process", () => {
threadId,
workflowName: "demo-flow",
prompt: "hello",
options: { maxRounds: 5 },
options: { maxRounds: 5, depth: 0 },
steps: [
{
role: "planner",
content: "p-old",
contentHash: plannerReplayHash,
meta: { plan: "z" },
refs: [plannerReplayHash],
timestamp: 555,
},
],
@@ -0,0 +1,223 @@
import { afterEach, describe, expect, test } from "bun:test";
import { mkdir, mkdtemp, readdir, readFile, rm, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import * as z from "zod/v4";
import { createCasStore } from "../src/cas.js";
import { createWorkflow } from "../src/create-workflow.js";
import { executeThread } from "../src/engine.js";
import { createExtract } from "../src/extract-fn.js";
import { hashWorkflowBundleBytes } from "../src/hash.js";
import { createLogger } from "../src/logger.js";
import { getContentMerklePayload, parseMerkleNode } from "../src/merkle.js";
import {
readWorkflowRegistry,
registerWorkflowVersion,
writeWorkflowRegistry,
} from "../src/registry.js";
import { END } from "../src/types.js";
import { workflowAsAgent } from "../src/workflow-as-agent.js";
const callerMetaSchema = z.object({ done: z.literal(true) });
type ParentMeta = {
caller: z.infer<typeof callerMetaSchema>;
};
function installMockChatCompletions(sequence: ReadonlyArray<Record<string, unknown>>): () => void {
const origFetch = globalThis.fetch;
let i = 0;
const mockFetch = async (
input: Parameters<typeof fetch>[0],
init?: RequestInit,
): Promise<Response> => {
const args = sequence[i] ?? sequence[sequence.length - 1];
if (args === undefined) {
throw new Error("installMockChatCompletions: empty sequence");
}
i += 1;
void input;
const body = init?.body ? (JSON.parse(String(init.body)) as Record<string, unknown>) : {};
const tools = body.tools;
const firstTool =
Array.isArray(tools) && tools.length > 0 && tools[0] !== null && typeof tools[0] === "object"
? (tools[0] as Record<string, unknown>)
: null;
const fn =
firstTool !== null ? (firstTool.function as Record<string, unknown> | undefined) : undefined;
const toolName = typeof fn?.name === "string" ? fn.name : "extract";
return new Response(
JSON.stringify({
choices: [
{
message: {
tool_calls: [
{
type: "function",
function: {
name: toolName,
arguments: JSON.stringify(args),
},
},
],
},
},
],
}),
{ status: 200, headers: { "Content-Type": "application/json" } },
);
};
globalThis.fetch = Object.assign(mockFetch, {
preconnect: origFetch.preconnect.bind(origFetch),
}) as typeof fetch;
return () => {
globalThis.fetch = origFetch;
};
}
const parentExtract = createExtract({
baseUrl: "http://127.0.0.1:9",
apiKey: "test",
model: "test",
});
const childBundleSource = `import { putContentMerkleNode } from "@uncaged/workflow";
export const descriptor = {
description: "child-integration",
roles: {
agent: {
description: "agent",
schema: { type: "object", properties: {}, additionalProperties: true },
},
},
};
export async function* run(input, options) {
const cas = options.cas;
const h = await putContentMerkleNode(cas, "child-body");
yield { role: "agent", contentHash: h, meta: {}, refs: [h] };
return { returnCode: 0, summary: "child-done:" + input.prompt };
}
`;
async function installChildWorkflow(storageRoot: string): Promise<{ hash: string }> {
const bytes = new TextEncoder().encode(childBundleSource);
const hash = hashWorkflowBundleBytes(bytes);
await mkdir(join(storageRoot, "bundles"), { recursive: true });
await writeFile(join(storageRoot, "bundles", `${hash}.esm.js`), childBundleSource, "utf8");
const reg = await readWorkflowRegistry(storageRoot);
if (!reg.ok) {
throw reg.error;
}
const next = registerWorkflowVersion(reg.value, "child-wf", hash, Date.now());
const wr = await writeWorkflowRegistry(storageRoot, next);
if (!wr.ok) {
throw wr.error;
}
return { hash };
}
describe("workflowAsAgent integration", () => {
let restoreFetch: (() => void) | null = null;
afterEach(() => {
restoreFetch?.();
restoreFetch = null;
});
test("createWorkflow parent invokes nested workflow via workflowAsAgent", async () => {
restoreFetch = installMockChatCompletions([{ done: true }]);
const root = await mkdtemp(join(tmpdir(), "wf-waa-int-"));
try {
const { hash: childHash } = await installChildWorkflow(root);
const parentWorkflow = createWorkflow<ParentMeta>(
{
roles: {
caller: {
description: "delegates to child workflow",
systemPrompt: "system",
extractPrompt: "extract done flag",
schema: callerMetaSchema,
extractRefs: null,
},
},
moderator: (ctx) => (ctx.steps.length === 0 ? "caller" : END),
},
{ agent: workflowAsAgent("child-wf", { storageRoot: root }) },
parentExtract,
);
const threadId = "01KQXKW18CT8G75T53R8F4G7YG";
const parentHash = "C9NMV6V2TQT81";
const dataPath = join(root, "logs", parentHash, `${threadId}.data.jsonl`);
const infoPath = join(root, "logs", parentHash, `${threadId}.info.jsonl`);
await mkdir(join(root, "logs", parentHash), { recursive: true });
const cas = createCasStore(join(root, "cas"));
const logger = createLogger({ sink: { kind: "file", path: infoPath } });
const ac = new AbortController();
const result = await executeThread(
parentWorkflow,
"parent-wf",
{ prompt: "from-parent", steps: [] },
{
maxRounds: 5,
depth: 0,
signal: ac.signal,
awaitAfterEachYield: async () => {},
forkSourceThreadId: null,
prefilledDiskSteps: null,
},
{ threadId, hash: parentHash, dataJsonlPath: dataPath, infoJsonlPath: infoPath, cas },
logger,
);
expect(result.returnCode).toBe(0);
expect(typeof result.rootHash).toBe("string");
const parentText = await readFile(dataPath, "utf8");
const parentLines = parentText
.trim()
.split("\n")
.filter((l) => l !== "");
expect(parentLines.length).toBe(2);
const callerLine = JSON.parse(parentLines[1] ?? "{}") as Record<string, unknown>;
expect(callerLine.role).toBe("caller");
const childRootHash = await getContentMerklePayload(cas, String(callerLine.contentHash));
expect(childRootHash).not.toBeNull();
const childThreadYaml = await cas.get(childRootHash ?? "");
expect(childThreadYaml).not.toBeNull();
const childThreadNode = parseMerkleNode(childThreadYaml ?? "");
expect(childThreadNode.type).toBe("thread");
const childPayload = childThreadNode.payload as Record<string, unknown>;
expect(childPayload.workflow).toBe("child-wf");
const childResult = childPayload.result as Record<string, unknown>;
expect(childResult.summary).toBe("child-done:from-parent");
const childDir = join(root, "logs", childHash);
const childFiles = await readdir(childDir);
const childDataName = childFiles.find((n) => n.endsWith(".data.jsonl"));
expect(childDataName).toBeDefined();
const childText = await readFile(join(childDir, childDataName ?? ""), "utf8");
const childStart = JSON.parse(
childText
.trim()
.split("\n")
.filter((l) => l !== "")[0] ?? "{}",
) as Record<string, unknown>;
expect(childStart.forkFrom).toEqual({ threadId });
const childOpts = (childStart.parameters as Record<string, unknown>).options as Record<
string,
unknown
>;
expect(childOpts.depth).toBe(1);
} finally {
await rm(root, { recursive: true, force: true });
}
});
});
@@ -0,0 +1,128 @@
import { describe, expect, test } from "bun:test";
import { mkdir, mkdtemp, rm, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { createCasStore } from "../src/cas.js";
import { hashWorkflowBundleBytes } from "../src/hash.js";
import { parseMerkleNode } from "../src/merkle.js";
import {
readWorkflowRegistry,
registerWorkflowVersion,
writeWorkflowRegistry,
} from "../src/registry.js";
import { type AgentContext, START } from "../src/types.js";
import { workflowAsAgent } from "../src/workflow-as-agent.js";
function makeAgentCtx(params: {
storageRoot: string;
depth: number;
prompt: string;
maxRounds: number;
}): AgentContext {
const ts = Date.now();
return {
threadId: "01PARENT000000000000000001AA",
depth: params.depth,
start: {
role: START,
content: params.prompt,
meta: { maxRounds: params.maxRounds },
timestamp: ts,
},
steps: [],
currentRole: {
name: "caller",
systemPrompt: "caller",
},
cas: createCasStore(join(params.storageRoot, "agent-ctx-cas")),
};
}
const childBundleSource = `import { putContentMerkleNode } from "@uncaged/workflow";
export const descriptor = {
description: "child-test",
roles: {
agent: {
description: "agent",
schema: { type: "object", properties: {}, additionalProperties: true },
},
},
};
export async function* run(input, options) {
const cas = options.cas;
const h = await putContentMerkleNode(cas, "child-body");
yield { role: "agent", contentHash: h, meta: {}, refs: [h] };
return { returnCode: 0, summary: "child-done:" + input.prompt };
}
`;
async function installChildWorkflow(storageRoot: string): Promise<{ hash: string }> {
const bytes = new TextEncoder().encode(childBundleSource);
const hash = hashWorkflowBundleBytes(bytes);
await mkdir(join(storageRoot, "bundles"), { recursive: true });
await writeFile(join(storageRoot, "bundles", `${hash}.esm.js`), childBundleSource, "utf8");
const reg = await readWorkflowRegistry(storageRoot);
if (!reg.ok) {
throw reg.error;
}
const next = registerWorkflowVersion(reg.value, "child-wf", hash, Date.now());
const wr = await writeWorkflowRegistry(storageRoot, next);
if (!wr.ok) {
throw wr.error;
}
return { hash };
}
describe("workflowAsAgent", () => {
test("returns error when workflow name is not registered", async () => {
const root = await mkdtemp(join(tmpdir(), "wf-waa-missing-"));
try {
const agent = workflowAsAgent("missing-wf", { storageRoot: root });
const out = await agent(
makeAgentCtx({ storageRoot: root, depth: 0, prompt: "x", maxRounds: 5 }),
);
expect(out).toContain("not found in registry");
expect(out).toContain("missing-wf");
} finally {
await rm(root, { recursive: true, force: true });
}
});
test("runs registered workflow and returns child thread root CAS hash", async () => {
const root = await mkdtemp(join(tmpdir(), "wf-waa-ok-"));
try {
await installChildWorkflow(root);
const agent = workflowAsAgent("child-wf", { storageRoot: root });
const out = await agent(
makeAgentCtx({ storageRoot: root, depth: 0, prompt: "hello-parent", maxRounds: 5 }),
);
const cas = createCasStore(join(root, "cas"));
const threadYaml = await cas.get(out);
expect(threadYaml).not.toBeNull();
const node = parseMerkleNode(threadYaml ?? "");
expect(node.type).toBe("thread");
const payload = node.payload as Record<string, unknown>;
expect(payload.workflow).toBe("child-wf");
const resultObj = payload.result as Record<string, unknown>;
expect(resultObj.summary).toBe("child-done:hello-parent");
expect(node.children.length).toBe(1);
} finally {
await rm(root, { recursive: true, force: true });
}
});
test("enforces depth limit (returns error string, does not throw)", async () => {
const root = await mkdtemp(join(tmpdir(), "wf-waa-depth-"));
try {
const agent = workflowAsAgent("child-wf", { storageRoot: root });
const out = await agent(
makeAgentCtx({ storageRoot: root, depth: 3, prompt: "x", maxRounds: 5 }),
);
expect(out).toContain("depth limit");
} finally {
await rm(root, { recursive: true, force: true });
}
});
});
@@ -0,0 +1,8 @@
import { pathToFileURL } from "node:url";
/**
* Dynamic-import a workflow bundle path (see {@link extractBundleExports} — symlink must exist first).
*/
export async function importWorkflowBundleModule(bundlePath: string): Promise<unknown> {
return import(pathToFileURL(bundlePath).href);
}
+7 -4
View File
@@ -41,9 +41,12 @@ function isAllowedImportSpecifier(spec: string): boolean {
if (spec.length === 0) {
return false;
}
if (spec.startsWith(".") || spec.startsWith("/")) {
if (spec.startsWith(".") || spec.startsWith("/") || spec.startsWith("file:")) {
return false;
}
if (spec === "@uncaged/workflow") {
return true;
}
return isBuiltin(spec);
}
@@ -297,7 +300,7 @@ function validateImportDeclaration(node: ImportDeclaration): string | null {
return "only static string import specifiers are allowed";
}
if (!isAllowedImportSpecifier(spec)) {
return `disallowed import specifier "${spec}" (only Node built-ins are allowed)`;
return `disallowed import specifier "${spec}" (only Node built-ins and "@uncaged/workflow" are allowed)`;
}
return null;
}
@@ -312,7 +315,7 @@ function validateExportSource(
return staticMessage;
}
if (!isAllowedImportSpecifier(spec)) {
return `${disallowedPrefix} "${spec}" (only Node built-ins are allowed)`;
return `${disallowedPrefix} "${spec}" (only Node built-ins and "@uncaged/workflow" are allowed)`;
}
return null;
}
@@ -365,7 +368,7 @@ function bundleConstraintViolationForNode(node: Node): string | null {
/**
* Validate RFC-001 bundle rules: single-file ESM shape, named exports `run` + `descriptor`,
* no default export, no dynamic `import()`, static imports restricted to Node builtins.
* no default export, no dynamic `import()`, static imports restricted to Node builtins plus `@uncaged/workflow`.
*/
export function validateWorkflowBundle(input: WorkflowBundleValidationInput): Result<void, string> {
if (!endsWithEsmJs(input.filePath)) {
+4 -1
View File
@@ -10,7 +10,7 @@ export type CasStore = {
list(): Promise<string[]>;
};
export function createThreadCas(casDir: string): CasStore {
export function createCasStore(casDir: string): CasStore {
async function ensureDir(): Promise<void> {
await mkdir(casDir, { recursive: true });
}
@@ -68,3 +68,6 @@ export function createThreadCas(casDir: string): CasStore {
},
};
}
/** @deprecated Use {@link createCasStore} — CAS is global, not per-thread. */
export const createThreadCas = createCasStore;
+35 -5
View File
@@ -1,19 +1,22 @@
import type { ExtractFn } from "./extract-fn.js";
import { putContentMerkleNode } from "./merkle.js";
import { mergeRefsWithContentHash } from "./refs-field.js";
import {
type AgentBinding,
type AgentContext,
END,
type ExtractContext,
type ModeratorContext,
type RoleDefinition,
type RoleMeta,
type RoleOutput,
type RoleStep,
START,
type ThreadInput,
type WorkflowCompletion,
type WorkflowDefinition,
type WorkflowFn,
type WorkflowFnOptions,
type WorkflowResult,
} from "./types.js";
function isRoleNext<M extends RoleMeta>(
@@ -22,6 +25,17 @@ function isRoleNext<M extends RoleMeta>(
return next !== END;
}
function resolveExtractedRefs(
roleDef: RoleDefinition<Record<string, unknown>>,
meta: unknown,
): string[] {
const extractRefsFn = roleDef.extractRefs;
if (extractRefsFn === null || typeof extractRefsFn !== "function") {
return [];
}
return extractRefsFn(meta as Record<string, unknown>);
}
/**
* Binds pure role definitions + moderator to runtime agents and structured extraction.
* Assign with `export const run = createWorkflow(def, binding, extract)`.
@@ -34,7 +48,7 @@ export function createWorkflow<M extends RoleMeta>(
return async function* workflowLoop(
input: ThreadInput,
options: WorkflowFnOptions,
): AsyncGenerator<RoleOutput, WorkflowResult> {
): AsyncGenerator<RoleOutput, WorkflowCompletion> {
const nowMs = Date.now();
const start: ModeratorContext<M>["start"] = {
role: START,
@@ -46,8 +60,9 @@ export function createWorkflow<M extends RoleMeta>(
const baseTs = Date.now();
let steps: RoleStep<M>[] = input.steps.map((out, i) => ({
role: out.role,
content: out.content,
contentHash: out.contentHash,
meta: out.meta,
refs: out.refs,
timestamp: baseTs + i,
})) as RoleStep<M>[];
@@ -61,6 +76,7 @@ export function createWorkflow<M extends RoleMeta>(
const modCtx: ModeratorContext<M> = {
threadId: options.threadId,
depth: options.depth,
start,
steps,
};
@@ -79,6 +95,7 @@ export function createWorkflow<M extends RoleMeta>(
const agentCtx: AgentContext<M> = {
...modCtx,
currentRole: { name: next, systemPrompt: roleDef.systemPrompt },
cas: options.cas,
};
const agent = binding.overrides?.[next] ?? binding.agent;
@@ -96,15 +113,28 @@ export function createWorkflow<M extends RoleMeta>(
extractCtx as unknown as ExtractContext,
);
const contentHash = await putContentMerkleNode(options.cas, raw);
const refs = mergeRefsWithContentHash(
resolveExtractedRefs(roleDef as unknown as RoleDefinition<Record<string, unknown>>, meta),
contentHash,
);
const ts = Date.now();
const step = {
role: next,
content: raw,
contentHash,
meta,
refs,
timestamp: ts,
} as RoleStep<M>;
yield { role: step.role, content: step.content, meta: step.meta };
yield {
role: step.role,
contentHash: step.contentHash,
meta: step.meta,
refs: step.refs,
};
steps = [...steps, step];
}
+135 -16
View File
@@ -1,26 +1,39 @@
import { appendFile, mkdir } from "node:fs/promises";
import { dirname } from "node:path";
import type { CasStore } from "./cas.js";
import type { LogFn } from "./logger.js";
import type { ThreadInput, WorkflowFn, WorkflowFnOptions, WorkflowResult } from "./types.js";
import { getContentMerklePayload, putStepMerkleNode, putThreadMerkleNode } from "./merkle.js";
import { normalizeRefsField } from "./refs-field.js";
import type {
ThreadInput,
WorkflowCompletion,
WorkflowFn,
WorkflowFnOptions,
WorkflowResult,
} from "./types.js";
export type ExecuteThreadIo = {
threadId: string;
hash: string;
dataJsonlPath: string;
infoJsonlPath: string;
cas: CasStore;
};
/** One persisted role line in `.data.jsonl` (engine adds these for fork replay before running the generator). */
export type PrefilledDiskStep = {
role: string;
content: string;
contentHash: string;
meta: Record<string, unknown>;
refs: string[];
timestamp: number;
};
export type ExecuteThreadOptions = {
maxRounds: number;
/** Passed to the bundle as `WorkflowFnOptions.depth`. */
depth: number;
signal: AbortSignal;
/** Invoked after each successful yield (and outer-loop checks); used for pause/resume. */
awaitAfterEachYield: () => Promise<void>;
@@ -38,50 +51,123 @@ async function appendDataLine(path: string, record: unknown): Promise<void> {
await appendFile(path, line, "utf8");
}
async function finalizeThreadResult(params: {
cas: CasStore;
workflowName: string;
threadId: string;
stepMerkleHashes: readonly string[];
completion: WorkflowCompletion;
}): Promise<WorkflowResult> {
const rootHash = await putThreadMerkleNode(
params.cas,
{
workflow: params.workflowName,
threadId: params.threadId,
result: {
returnCode: params.completion.returnCode,
summary: params.completion.summary,
},
},
params.stepMerkleHashes,
);
return {
returnCode: params.completion.returnCode,
summary: params.completion.summary,
rootHash,
};
}
async function driveWorkflowGenerator(params: {
fn: WorkflowFn;
workflowName: string;
input: ThreadInput;
bundleOptions: WorkflowFnOptions;
executeOptions: ExecuteThreadOptions;
dataJsonlPath: string;
threadId: string;
logger: LogFn;
cas: CasStore;
stepMerkleHashes: string[];
}): Promise<WorkflowResult> {
const { fn, input, bundleOptions, executeOptions, dataJsonlPath, threadId, logger } = params;
const {
fn,
workflowName,
input,
bundleOptions,
executeOptions,
dataJsonlPath,
threadId,
logger,
cas,
stepMerkleHashes,
} = params;
const gen = fn(input, bundleOptions);
let written = 0;
while (true) {
if (executeOptions.signal.aborted) {
logger("V8JX4NP2", `thread ${threadId} aborted`);
return { returnCode: 130, summary: "thread aborted" };
return await finalizeThreadResult({
cas,
workflowName,
threadId,
stepMerkleHashes,
completion: { returnCode: 130, summary: "thread aborted" },
});
}
if (written >= executeOptions.maxRounds) {
logger("R3CW7YBQ", `thread ${threadId} stopped at maxRounds=${executeOptions.maxRounds}`);
return {
returnCode: 0,
summary: `completed: reached maxRounds (${executeOptions.maxRounds})`,
};
return await finalizeThreadResult({
cas,
workflowName,
threadId,
stepMerkleHashes,
completion: {
returnCode: 0,
summary: `completed: reached maxRounds (${executeOptions.maxRounds})`,
},
});
}
const iterResult = await gen.next();
if (iterResult.done) {
logger("F3HN8QKP", `thread ${threadId} generator finished`);
return iterResult.value;
const completion = iterResult.value;
return await finalizeThreadResult({
cas,
workflowName,
threadId,
stepMerkleHashes,
completion,
});
}
written++;
const step = iterResult.value;
const resolved = await getContentMerklePayload(cas, step.contentHash);
if (resolved === null) {
throw new Error(
`role step ${step.role}: CAS blob missing for contentHash ${step.contentHash}`,
);
}
const ts = Date.now();
await appendDataLine(dataJsonlPath, {
role: step.role,
content: step.content,
contentHash: step.contentHash,
meta: step.meta,
refs: normalizeRefsField(step.refs),
timestamp: ts,
});
const stepNodeHash = await putStepMerkleNode(
cas,
{ role: step.role, meta: step.meta },
step.contentHash,
);
stepMerkleHashes.push(stepNodeHash);
logger("N7BW4YHQ", `thread ${threadId} wrote role ${step.role}`);
await Promise.race([
@@ -97,7 +183,13 @@ async function driveWorkflowGenerator(params: {
if (executeOptions.signal.aborted) {
logger("V8JX4NP4", `thread ${threadId} aborted`);
return { returnCode: 130, summary: "thread aborted" };
return await finalizeThreadResult({
cas,
workflowName,
threadId,
stepMerkleHashes,
completion: { returnCode: 130, summary: "thread aborted" },
});
}
}
}
@@ -133,6 +225,7 @@ export async function executeThread(
prompt: input.prompt,
options: {
maxRounds: options.maxRounds,
depth: options.depth,
},
},
timestamp: nowMs,
@@ -145,37 +238,63 @@ export async function executeThread(
logger("T9HQ2KHM", `thread ${io.threadId} started for workflow ${workflowName}`);
const stepMerkleHashes: string[] = [];
if (prefilled !== null) {
for (const row of prefilled) {
const prefilledPayload = await getContentMerklePayload(io.cas, row.contentHash);
if (prefilledPayload === null) {
throw new Error(
`prefilled step ${row.role}: CAS blob missing for contentHash ${row.contentHash}`,
);
}
await appendDataLine(io.dataJsonlPath, {
role: row.role,
content: row.content,
contentHash: row.contentHash,
meta: row.meta,
refs: normalizeRefsField(row.refs),
timestamp: row.timestamp,
});
const stepNodeHash = await putStepMerkleNode(
io.cas,
{ role: row.role, meta: row.meta },
row.contentHash,
);
stepMerkleHashes.push(stepNodeHash);
}
}
if (options.maxRounds <= 0) {
logger("R3CW7YBQ", `thread ${io.threadId} stopped at maxRounds=${options.maxRounds}`);
return {
returnCode: 0,
summary: `completed: reached maxRounds (${options.maxRounds})`,
};
return await finalizeThreadResult({
cas: io.cas,
workflowName,
threadId: io.threadId,
stepMerkleHashes,
completion: {
returnCode: 0,
summary: `completed: reached maxRounds (${options.maxRounds})`,
},
});
}
const bundleOptions: WorkflowFnOptions = {
threadId: io.threadId,
maxRounds: options.maxRounds,
depth: options.depth,
cas: io.cas,
};
return await driveWorkflowGenerator({
fn,
workflowName,
input,
bundleOptions,
executeOptions: options,
dataJsonlPath: io.dataJsonlPath,
threadId: io.threadId,
logger,
cas: io.cas,
stepMerkleHashes,
});
}
@@ -0,0 +1,36 @@
import { mkdir, readlink, symlink, unlink } from "node:fs/promises";
import path from "node:path";
import { fileURLToPath } from "node:url";
/** This module lives in `@uncaged/workflow/src`; parent dir is the package root. */
function installedWorkflowPackageDir(): string {
return fileURLToPath(new URL("..", import.meta.url));
}
/**
* Ensures `<storageRoot>/node_modules/@uncaged/workflow` points at the installed `@uncaged/workflow`
* package so workflow bundles loaded from `<storageRoot>/bundles/*.esm.js` can resolve `import "@uncaged/workflow"`.
*/
export async function ensureUncagedWorkflowSymlink(storageRoot: string): Promise<void> {
const target = installedWorkflowPackageDir();
const linkDir = path.join(storageRoot, "node_modules", "@uncaged");
const linkPath = path.join(linkDir, "workflow");
await mkdir(linkDir, { recursive: true });
try {
const existing = await readlink(linkPath);
const normalizedExisting = path.resolve(linkDir, existing);
if (normalizedExisting === target) {
return;
}
await unlink(linkPath);
} catch (e) {
const errObj = e as NodeJS.ErrnoException;
if (errObj.code !== "ENOENT" && errObj.code !== "EINVAL") {
throw e;
}
}
const linkType = process.platform === "win32" ? "junction" : "dir";
await symlink(target, linkPath, linkType);
}
@@ -1,5 +1,5 @@
import { pathToFileURL } from "node:url";
import { importWorkflowBundleModule } from "./bundle-import-env.js";
import { ensureUncagedWorkflowSymlink } from "./ensure-uncaged-workflow-symlink.js";
import { err, ok, type Result } from "./result.js";
import type { WorkflowFn } from "./types.js";
import type { WorkflowDescriptor } from "./workflow-descriptor.js";
@@ -10,14 +10,23 @@ export type ExtractedBundleExports = {
descriptor: WorkflowDescriptor;
};
export type ExtractBundleExportsOptions = {
/** When set, ensures `node_modules/@uncaged/workflow` exists under this root before import. */
storageRoot: string | null;
};
/** Load a workflow `.esm.js` bundle and read its named exports (`run`, `descriptor`). */
export async function extractBundleExports(
bundlePath: string,
options: ExtractBundleExportsOptions = { storageRoot: null },
): Promise<Result<ExtractedBundleExports, string>> {
let modUnknown: unknown;
try {
if (options.storageRoot !== null) {
await ensureUncagedWorkflowSymlink(options.storageRoot);
}
// Dynamic import required: user bundle path resolved at runtime
modUnknown = await import(pathToFileURL(bundlePath).href);
modUnknown = await importWorkflowBundleModule(bundlePath);
} catch (e) {
const message = e instanceof Error ? e.message : String(e);
return err(`failed to import bundle: ${message}`);
+6 -1
View File
@@ -1,6 +1,7 @@
import type * as z from "zod/v4";
import { llmExtractWithRetry } from "./llm-extract.js";
import { getContentMerklePayload } from "./merkle.js";
import type { ExtractContext, LlmProvider } from "./types.js";
export type ExtractFn = <T extends Record<string, unknown>>(
@@ -29,8 +30,12 @@ export function createExtract(provider: LlmProvider): ExtractFn {
if (ctx.steps.length > 0) {
lines.push("## Thread History");
for (const step of ctx.steps) {
const body = await getContentMerklePayload(ctx.cas, step.contentHash);
if (body === null) {
throw new Error(`extract: missing CAS blob for step ${step.role}: ${step.contentHash}`);
}
lines.push(`### ${step.role}`);
lines.push(step.content);
lines.push(body);
lines.push(`Meta: ${JSON.stringify(step.meta)}`);
lines.push("");
}
+14 -6
View File
@@ -1,3 +1,4 @@
import { normalizeRefsField } from "./refs-field.js";
import { err, ok, type Result } from "./result.js";
import type { RoleOutput } from "./types.js";
@@ -10,6 +11,7 @@ export type ParsedThreadStartRecord = {
threadId: string;
prompt: string;
maxRounds: number;
depth: number;
};
function parseRoleLine(
@@ -17,14 +19,14 @@ function parseRoleLine(
lineIndex: number,
): Result<ForkHistoricalStep, string> {
const role = obj.role;
const content = obj.content;
const contentHash = obj.contentHash;
const meta = obj.meta;
const timestamp = obj.timestamp;
if (typeof role !== "string") {
return err(`invalid role record at line ${lineIndex}: missing role`);
}
if (typeof content !== "string") {
return err(`invalid role record at line ${lineIndex}: missing content`);
if (typeof contentHash !== "string") {
return err(`invalid role record at line ${lineIndex}: missing contentHash`);
}
if (meta === null || typeof meta !== "object") {
return err(`invalid role record at line ${lineIndex}: missing meta`);
@@ -34,8 +36,9 @@ function parseRoleLine(
}
return ok({
role,
content,
contentHash,
meta: meta as Record<string, unknown>,
refs: normalizeRefsField(obj.refs),
timestamp,
});
}
@@ -76,12 +79,17 @@ function parseStartRecordLine(firstLine: string): Result<ParsedThreadStartRecord
return err("start record missing parameters.options.maxRounds");
}
const depthRaw = optRec.depth;
const depth =
typeof depthRaw === "number" && Number.isFinite(depthRaw) ? Math.trunc(depthRaw) : 0;
return ok({
workflowName: name,
hash,
threadId,
prompt,
maxRounds,
depth,
});
}
@@ -194,7 +202,7 @@ export type ForkPlan = {
hash: string;
sourceThreadId: string;
prompt: string;
runOptions: { maxRounds: number };
runOptions: { maxRounds: number; depth: number };
historicalSteps: ForkHistoricalStep[];
};
@@ -219,7 +227,7 @@ export function buildForkPlan(
hash: start.hash,
sourceThreadId: start.threadId,
prompt: start.prompt,
runOptions: { maxRounds: start.maxRounds },
runOptions: { maxRounds: start.maxRounds, depth: start.depth },
historicalSteps: selected.value,
});
}
+131
View File
@@ -0,0 +1,131 @@
import { readdir, readFile } from "node:fs/promises";
import { join } from "node:path";
import { type CasStore, createCasStore } from "./cas.js";
import { parseThreadDataJsonl } from "./fork-thread.js";
import { err, ok, type Result } from "./result.js";
import { getGlobalCasDir } from "./storage-root.js";
export type GcResult = {
scannedThreads: number;
activeRefs: number;
deletedEntries: number;
deletedHashes: string[];
};
async function listThreadDataJsonlPaths(storageRoot: string): Promise<Result<string[], string>> {
const logsRoot = join(storageRoot, "logs");
const paths: string[] = [];
let hashes: string[];
try {
hashes = await readdir(logsRoot);
} catch (e) {
const errObj = e as NodeJS.ErrnoException;
if (errObj.code === "ENOENT") {
return ok([]);
}
return err(`failed to read logs directory: ${String(e)}`);
}
for (const hash of hashes) {
const dir = join(logsRoot, hash);
let entries: string[];
try {
entries = await readdir(dir);
} catch {
continue;
}
for (const fileName of entries) {
if (fileName.endsWith(".data.jsonl")) {
paths.push(join(dir, fileName));
}
}
}
paths.sort();
return ok(paths);
}
async function collectActiveRefsFromDataPaths(
dataPaths: string[],
): Promise<Result<Set<string>, string>> {
const activeRefs = new Set<string>();
for (const dataPath of dataPaths) {
let text: string;
try {
text = await readFile(dataPath, "utf8");
} catch (e) {
return err(`failed to read ${dataPath}: ${String(e)}`);
}
const parsed = parseThreadDataJsonl(text);
if (!parsed.ok) {
return err(`${dataPath}: ${parsed.error}`);
}
for (const step of parsed.value.roleSteps) {
for (const ref of step.refs) {
activeRefs.add(ref);
}
}
}
return ok(activeRefs);
}
async function deleteCasNotInSet(
cas: CasStore,
activeRefs: Set<string>,
): Promise<Result<string[], string>> {
let listed: string[];
try {
listed = await cas.list();
} catch (e) {
return err(`failed to list cas entries: ${String(e)}`);
}
const deletedHashes: string[] = [];
for (const hash of listed) {
if (activeRefs.has(hash)) {
continue;
}
try {
await cas.delete(hash);
} catch (e) {
return err(`failed to delete cas ${hash}: ${String(e)}`);
}
deletedHashes.push(hash);
}
deletedHashes.sort();
return ok(deletedHashes);
}
/**
* Mark-and-sweep CAS GC: collect `refs` from all thread `.data.jsonl` files under `storageRoot`,
* then delete CAS blobs not referenced by any surviving thread data.
*/
export async function garbageCollectCas(storageRoot: string): Promise<Result<GcResult, string>> {
const pathsResult = await listThreadDataJsonlPaths(storageRoot);
if (!pathsResult.ok) {
return pathsResult;
}
const paths = pathsResult.value;
const refsResult = await collectActiveRefsFromDataPaths(paths);
if (!refsResult.ok) {
return refsResult;
}
const activeRefs = refsResult.value;
const cas = createCasStore(getGlobalCasDir(storageRoot));
const deletedResult = await deleteCasNotInSet(cas, activeRefs);
if (!deletedResult.ok) {
return deletedResult;
}
const deletedHashes = deletedResult.value;
return ok({
scannedThreads: paths.length,
activeRefs: activeRefs.size,
deletedEntries: deletedHashes.length,
deletedHashes,
});
}
+18 -2
View File
@@ -7,7 +7,7 @@ export {
} from "./base32.js";
export { buildDescriptor } from "./build-descriptor.js";
export { validateWorkflowBundle, type WorkflowBundleValidationInput } from "./bundle-validator.js";
export { type CasStore, createThreadCas } from "./cas.js";
export { type CasStore, createCasStore, createThreadCas } from "./cas.js";
export { createWorkflow } from "./create-workflow.js";
export {
type ExecuteThreadIo,
@@ -25,6 +25,7 @@ export {
parseThreadDataJsonl,
selectForkHistoricalSteps,
} from "./fork-thread.js";
export { type GcResult, garbageCollectCas } from "./gc.js";
export { stringifyWorkflowDescriptor } from "./generate-descriptor.js";
export { hashString, hashWorkflowBundleBytes } from "./hash.js";
export {
@@ -39,6 +40,19 @@ export {
type LogFn,
type LoggerSink,
} from "./logger.js";
export {
createContentMerkleNode,
getContentMerklePayload,
type MerkleNode,
type MerkleNodeType,
parseMerkleNode,
putContentMerkleNode,
putStepMerkleNode,
putThreadMerkleNode,
type StepMerklePayload,
serializeMerkleNode,
type ThreadMerklePayload,
} from "./merkle.js";
export {
getRegisteredWorkflow,
listRegisteredWorkflowNames,
@@ -55,7 +69,7 @@ export {
writeWorkflowRegistry,
} from "./registry.js";
export { err, ok, type Result } from "./result.js";
export { getDefaultWorkflowStorageRoot } from "./storage-root.js";
export { getDefaultWorkflowStorageRoot, getGlobalCasDir } from "./storage-root.js";
export { createThreadPauseGate, type ThreadPauseGate } from "./thread-pause-gate.js";
export {
type AgentBinding,
@@ -74,6 +88,7 @@ export {
type StartStep,
type ThreadContext,
type ThreadInput,
type WorkflowCompletion,
type WorkflowDefinition,
type WorkflowFn,
type WorkflowFnOptions,
@@ -81,6 +96,7 @@ export {
} from "./types.js";
export { generateUlid } from "./ulid.js";
export { getWorkerHostScriptPath } from "./worker-entry-path.js";
export { type WorkflowAsAgentOptions, workflowAsAgent } from "./workflow-as-agent.js";
export {
validateWorkflowDescriptor,
type WorkflowDescriptor,
+122
View File
@@ -0,0 +1,122 @@
import { parse, stringify } from "yaml";
import type { CasStore } from "./cas.js";
export type MerkleNodeType = "content" | "step" | "thread";
export type MerkleNode = {
type: MerkleNodeType;
payload: string | Record<string, unknown>;
children: string[];
};
export function serializeMerkleNode(node: MerkleNode): string {
return stringify(
{ type: node.type, payload: node.payload, children: node.children },
{ indent: 2 },
);
}
export function parseMerkleNode(yamlText: string): MerkleNode {
const raw = parse(yamlText) as unknown;
if (raw === null || typeof raw !== "object") {
throw new Error("merkle: YAML root must be an object");
}
const rec = raw as Record<string, unknown>;
const type = rec.type;
const payload = rec.payload;
const children = rec.children;
if (type !== "content" && type !== "step" && type !== "thread") {
throw new Error("merkle: invalid or missing type");
}
if (typeof payload !== "string" && (payload === null || typeof payload !== "object")) {
throw new Error("merkle: payload must be a string or object");
}
if (!Array.isArray(children)) {
throw new Error("merkle: children must be an array");
}
const childHashes: string[] = [];
for (const c of children) {
if (typeof c !== "string") {
throw new Error("merkle: child hash must be a string");
}
childHashes.push(c);
}
return {
type,
payload: typeof payload === "string" ? payload : (payload as Record<string, unknown>),
children: childHashes,
};
}
export function createContentMerkleNode(payload: string): MerkleNode {
return { type: "content", payload, children: [] };
}
export type StepMerklePayload = {
role: string;
meta: Record<string, unknown>;
};
export type ThreadMerklePayload = {
workflow: string;
threadId: string;
result: {
returnCode: number;
summary: string;
};
};
/** Serializes a step Merkle node (role + meta + content child) and stores it in CAS. */
export async function putStepMerkleNode(
store: CasStore,
payload: StepMerklePayload,
contentHash: string,
): Promise<string> {
const node: MerkleNode = {
type: "step",
payload: { role: payload.role, meta: payload.meta },
children: [contentHash],
};
return store.put(serializeMerkleNode(node));
}
/** Serializes the thread root Merkle node and stores it in CAS. */
export async function putThreadMerkleNode(
store: CasStore,
payload: ThreadMerklePayload,
stepHashes: readonly string[],
): Promise<string> {
const node: MerkleNode = {
type: "thread",
payload: {
workflow: payload.workflow,
threadId: payload.threadId,
result: payload.result,
},
children: [...stepHashes],
};
return store.put(serializeMerkleNode(node));
}
/** Serializes a content Merkle node and stores it in CAS; returns its hash. */
export async function putContentMerkleNode(store: CasStore, content: string): Promise<string> {
const yamlText = serializeMerkleNode(createContentMerkleNode(content));
return store.put(yamlText);
}
/** Loads a CAS blob and returns the payload string for a `content` Merkle node. */
export async function getContentMerklePayload(
store: CasStore,
hash: string,
): Promise<string | null> {
const yamlText = await store.get(hash);
if (yamlText === null) {
return null;
}
const node = parseMerkleNode(yamlText);
if (node.type !== "content" || typeof node.payload !== "string") {
return null;
}
return node.payload;
}
+22
View File
@@ -0,0 +1,22 @@
/** Append `contentHash` to `refs` when not already present (dedupe by first occurrence order). */
export function mergeRefsWithContentHash(refs: string[], contentHash: string): string[] {
const out = [...refs];
if (!out.includes(contentHash)) {
out.push(contentHash);
}
return out;
}
/** Normalize `refs` from persisted JSONL or IPC payloads (missing or invalid → []). */
export function normalizeRefsField(value: unknown): string[] {
if (!Array.isArray(value)) {
return [];
}
const out: string[] = [];
for (const x of value) {
if (typeof x === "string") {
out.push(x);
}
}
return out;
}
+6
View File
@@ -5,3 +5,9 @@ import { join } from "node:path";
export function getDefaultWorkflowStorageRoot(): string {
return join(homedir(), ".uncaged", "workflow");
}
/** Global content-addressed store directory under the workflow storage root (`<root>/cas`). */
export function getGlobalCasDir(storageRoot: string | undefined): string {
const root = storageRoot ?? getDefaultWorkflowStorageRoot();
return join(root, "cas");
}
+30 -5
View File
@@ -1,5 +1,7 @@
import type * as z from "zod/v4";
import type { CasStore } from "./cas.js";
/** Sentinel values for automaton control flow. */
export const START = "__start__" as const;
export const END = "__end__" as const;
@@ -17,16 +19,24 @@ export type LlmProvider = {
/** What each generator yield produces — one role's output (engine adds `timestamp` when persisting). */
export type RoleOutput = {
role: string;
content: string;
/** CAS hash of the serialized Merkle content node for this step's body text. */
contentHash: string;
meta: Record<string, unknown>;
/** CAS hashes produced or consumed by this step (for GC traceability). */
refs: string[];
};
/** What the workflow AsyncGenerator returns when done. */
export type WorkflowResult = {
/** Generator completion value from a workflow bundle (`run` export). Root hash is added by the engine. */
export type WorkflowCompletion = {
returnCode: number;
summary: string;
};
/** Final thread outcome from {@link executeThread}, including Merkle thread root CAS hash. */
export type WorkflowResult = WorkflowCompletion & {
rootHash: string;
};
/** Input to a workflow — prompt plus optional historical steps for fork/resume. */
export type ThreadInput = {
prompt: string;
@@ -37,13 +47,17 @@ export type ThreadInput = {
export type WorkflowFnOptions = {
threadId: string;
maxRounds: number;
/** Nesting depth for workflow-as-agent chains; root threads use `0`. */
depth: number;
/** Global CAS store for Merkle content blobs (role step bodies). */
cas: CasStore;
};
/** Bundle contract — named export `run` is a function returning an AsyncGenerator. */
export type WorkflowFn = (
input: ThreadInput,
options: WorkflowFnOptions,
) => AsyncGenerator<RoleOutput, WorkflowResult>;
) => AsyncGenerator<RoleOutput, WorkflowCompletion>;
/** Engine start frame: initial prompt + thread identity. */
export type StartStep = {
@@ -55,12 +69,20 @@ export type StartStep = {
/** A completed role step in the thread. */
export type RoleStep<M extends RoleMeta> = {
[K in keyof M & string]: { role: K; meta: M[K]; content: string; timestamp: number };
[K in keyof M & string]: {
role: K;
meta: M[K];
contentHash: string;
refs: string[];
timestamp: number;
};
}[keyof M & string];
/** Phase 1: Moderator decides next role. */
export type ModeratorContext<M extends RoleMeta = RoleMeta> = {
threadId: string;
/** Same as `WorkflowFnOptions.depth` for the active thread. */
depth: number;
start: StartStep;
steps: RoleStep<M>[];
};
@@ -71,6 +93,7 @@ export type AgentContext<M extends RoleMeta = RoleMeta> = ModeratorContext<M> &
name: string;
systemPrompt: string;
};
cas: CasStore;
};
/** Phase 3: Extractor runs — has agent output; the extraction instruction is a separate argument to the extract function. */
@@ -96,6 +119,8 @@ export type RoleDefinition<Meta extends Record<string, unknown>> = {
systemPrompt: string;
extractPrompt: string;
schema: z.ZodType<Meta>;
/** When non-null, produces CAS hashes to persist on this role's steps (see `RoleOutput.refs`). */
extractRefs: ((meta: Meta) => string[]) | null;
};
/**
+25 -8
View File
@@ -1,11 +1,15 @@
import { mkdir, unlink, writeFile } from "node:fs/promises";
import { createServer, type Socket } from "node:net";
import { dirname, join } from "node:path";
import { pathToFileURL } from "node:url";
import { importWorkflowBundleModule } from "./bundle-import-env.js";
import { createCasStore } from "./cas.js";
import type { PrefilledDiskStep } from "./engine.js";
import { type ExecuteThreadIo, executeThread } from "./engine.js";
import { ensureUncagedWorkflowSymlink } from "./ensure-uncaged-workflow-symlink.js";
import { createLogger } from "./logger.js";
import { normalizeRefsField } from "./refs-field.js";
import { err, ok, type Result } from "./result.js";
import { getGlobalCasDir } from "./storage-root.js";
import { createThreadPauseGate, type ThreadPauseGate } from "./thread-pause-gate.js";
import type { RoleOutput, WorkflowFn } from "./types.js";
@@ -16,7 +20,7 @@ type RunCommand = {
threadId: string;
workflowName: string;
prompt: string;
options: { maxRounds: number };
options: { maxRounds: number; depth: number };
steps: RoleOutput[];
/** Timestamps aligned with `steps` for `.data.jsonl` replay; length must match `steps` when non-null. */
stepTimestamps: number[] | null;
@@ -47,15 +51,20 @@ type ThreadHandle = {
function parseRoleOutputRecord(obj: Record<string, unknown>): RoleOutput | null {
const role = obj.role;
const content = obj.content;
const contentHash = obj.contentHash;
const meta = obj.meta;
if (typeof role !== "string" || typeof content !== "string") {
if (typeof role !== "string" || typeof contentHash !== "string") {
return null;
}
if (meta === null || typeof meta !== "object") {
return null;
}
return { role, content, meta: meta as Record<string, unknown> };
return {
role,
contentHash,
meta: meta as Record<string, unknown>,
refs: normalizeRefsField(obj.refs),
};
}
function parseRunStepsPayload(rec: Record<string, unknown>): {
@@ -118,6 +127,9 @@ function parseRunControlPayload(rec: Record<string, unknown>): RunCommand | null
if (typeof maxRounds !== "number") {
return null;
}
const depthRaw = optRec.depth;
const depth =
typeof depthRaw === "number" && Number.isFinite(depthRaw) ? Math.trunc(depthRaw) : 0;
const parsedSteps = parseRunStepsPayload(rec);
if (parsedSteps === null) {
return null;
@@ -135,7 +147,7 @@ function parseRunControlPayload(rec: Record<string, unknown>): RunCommand | null
threadId,
workflowName,
prompt,
options: { maxRounds },
options: { maxRounds, depth },
steps: parsedSteps.steps,
stepTimestamps: parsedSteps.stepTimestamps,
forkSourceThreadId,
@@ -291,8 +303,9 @@ async function main(): Promise<void> {
return;
}
await ensureUncagedWorkflowSymlink(storageRoot);
// Dynamic import required: user bundle path resolved at runtime
const modUnknown: unknown = await import(pathToFileURL(bundlePath).href);
const modUnknown: unknown = await importWorkflowBundleModule(bundlePath);
const modRec = modUnknown as Record<string, unknown>;
const runExport = modRec.run;
if (!isWorkflowFnLike(runExport)) {
@@ -306,6 +319,8 @@ async function main(): Promise<void> {
let activeThreads = 0;
let shutdownTimer: ReturnType<typeof setTimeout> | null = null;
const cas = createCasStore(getGlobalCasDir(storageRoot));
const workerCtlPath = join(storageRoot, "workers", `${hash}.json`);
function cancelShutdownTimer(): void {
@@ -354,6 +369,7 @@ async function main(): Promise<void> {
hash,
dataJsonlPath,
infoJsonlPath,
cas,
};
const existing = threads.get(threadId);
@@ -380,8 +396,9 @@ async function main(): Promise<void> {
const ts = cmd.stepTimestamps?.[i];
return {
role: step.role,
content: step.content,
contentHash: step.contentHash,
meta: step.meta,
refs: normalizeRefsField(step.refs),
timestamp: typeof ts === "number" && ts > 0 ? ts : baseTs + i,
};
});
+101
View File
@@ -0,0 +1,101 @@
import { join } from "node:path";
import { createCasStore } from "./cas.js";
import { type ExecuteThreadIo, executeThread } from "./engine.js";
import { extractBundleExports } from "./extract-bundle-exports.js";
import { createLogger } from "./logger.js";
import { getRegisteredWorkflow, readWorkflowRegistry } from "./registry.js";
import { getDefaultWorkflowStorageRoot, getGlobalCasDir } from "./storage-root.js";
import type { AgentContext, AgentFn, ThreadInput } from "./types.js";
import { generateUlid } from "./ulid.js";
/** Maximum `WorkflowFnOptions.depth` allowed for a child spawned via `workflowAsAgent`. */
const WORKFLOW_AS_AGENT_MAX_DEPTH = 3;
export type WorkflowAsAgentOptions = {
/** When `null`, uses `getDefaultWorkflowStorageRoot()`. */
storageRoot: string | null;
};
function resolveWorkflowAsAgentStorageRoot(options: WorkflowAsAgentOptions | null): string {
if (options !== null && options.storageRoot !== null) {
return options.storageRoot;
}
return getDefaultWorkflowStorageRoot();
}
/**
* Returns an {@link AgentFn} that runs another registered workflow in a new thread,
* using the parent thread's initial prompt (`ctx.start.content`) as the child {@link ThreadInput.prompt}.
*/
export function workflowAsAgent(
workflowName: string,
options: WorkflowAsAgentOptions | null = null,
): AgentFn {
return async (ctx: AgentContext): Promise<string> => {
const nextDepth = ctx.depth + 1;
if (nextDepth > WORKFLOW_AS_AGENT_MAX_DEPTH) {
return `ERROR: workflow-as-agent depth limit exceeded (max ${WORKFLOW_AS_AGENT_MAX_DEPTH})`;
}
const storageRoot = resolveWorkflowAsAgentStorageRoot(options);
const registryResult = await readWorkflowRegistry(storageRoot);
if (!registryResult.ok) {
return `ERROR: failed to read workflow registry: ${registryResult.error.message}`;
}
const entry = getRegisteredWorkflow(registryResult.value, workflowName);
if (entry === null) {
return `ERROR: workflow "${workflowName}" not found in registry`;
}
const bundlePath = join(storageRoot, "bundles", `${entry.hash}.esm.js`);
const bundleExportsResult = await extractBundleExports(bundlePath, { storageRoot });
if (!bundleExportsResult.ok) {
return `ERROR: ${bundleExportsResult.error}`;
}
const input: ThreadInput = {
prompt: ctx.start.content,
steps: [],
};
const childThreadId = generateUlid(Date.now());
const dataJsonlPath = join(storageRoot, "logs", entry.hash, `${childThreadId}.data.jsonl`);
const infoJsonlPath = join(storageRoot, "logs", entry.hash, `${childThreadId}.info.jsonl`);
const io: ExecuteThreadIo = {
threadId: childThreadId,
hash: entry.hash,
dataJsonlPath,
infoJsonlPath,
cas: createCasStore(getGlobalCasDir(storageRoot)),
};
const logger = createLogger({ sink: { kind: "file", path: infoJsonlPath } });
const signalNever = new AbortController();
try {
const result = await executeThread(
bundleExportsResult.value.run,
workflowName,
input,
{
maxRounds: ctx.start.meta.maxRounds,
depth: nextDepth,
signal: signalNever.signal,
awaitAfterEachYield: async () => {},
forkSourceThreadId: ctx.threadId,
prefilledDiskSteps: null,
},
io,
logger,
);
return result.rootHash;
} catch (e) {
const message = e instanceof Error ? e.message : String(e);
return `ERROR: ${message}`;
}
};
}