Compare commits

..

11 Commits

Author SHA1 Message Date
xiaoju 81c582ae0e feat: Phase 2 — engine write path (CAS nodes + threads.json)
- Engine writes StartNode, StateNode, ContentMerkleNode as CAS blobs
- threads.json tracks active threads, completed → history/{date}.jsonl
- No more .data.jsonl writes
- ancestors skip-list: [parent, ...parentAncestors] capped at 11
- Tests: 4 pass (engine write path)

Refs #155, closes #157

小橘 <xiaoju@shazhou.work>
2026-05-09 07:53:44 +00:00
xiaoju 6f000512d2 feat: Phase 1 — CAS thread storage types + helpers
- Add StartNode, StateNode, ContentMerkleNode types to workflow-protocol
- Add collectRefs() to workflow-cas — extracts CAS hashes from StateNode payload
- Add findReachableHashes() to workflow-cas — recursive mark traversal via refs[]
- Tests: 7 pass (collect-refs + reachable)

Refs #155, closes #156

小橘 <xiaoju@shazhou.work>
2026-05-09 07:30:47 +00:00
xiaoju 8f78a00063 docs: RFC v3 — named payload fields, refs as GC index, merge parent+ancestors
- payload is source of truth with named fields (start, content, ancestors, compact)
- refs[] auto-derived by collectRefs(), pure GC index
- parent merged into ancestors[0]

小橘 <xiaoju@shazhou.work>
2026-05-09 07:12:29 +00:00
xiaoju 6c2a137aef docs: update CAS thread storage RFC
- StartNode prompt via refs[0] instead of inline
- threads.json active-only, completed → history/{date}.jsonl
- Content Merkle node carries role artifact refs
- Extract phase expanded to produce refs[]

小橘 <xiaoju@shazhou.work>
2026-05-09 07:08:10 +00:00
xiaoju 6cd856ca99 docs: add RFC for CAS-based thread storage
小橘 <xiaoju@shazhou.work>
2026-05-09 06:34:24 +00:00
xiaoju 064696c558 docs: update architecture docs and package READMEs for post-split structure
- Rewrite docs/architecture.md with 15-package map, dependency graph, updated engine paths
- Update CLAUDE.md monorepo structure section
- Add READMEs for: workflow-protocol, workflow-runtime, workflow-util, workflow-cas, workflow-register, workflow-execute, workflow-reactor
- Fix agent READMEs: update deps from @uncaged/workflow to actual packages
- Mark workflow-as-agent plan as outdated

Fixes #153

小橘 <xiaoju@shazhou.work>
2026-05-09 04:39:57 +00:00
xiaoju 0f28e9b61a refactor: remove non-index re-exports
Remove re-exports from non-index files to enforce folder module discipline:
- workflow-agent-hermes: remove buildAgentPrompt re-export
- workflow-agent-cursor: remove buildAgentPrompt re-export
- cli-workflow/cli-dispatch: remove CommandEntry/CommandGroup/DispatchFn/getCommandRegistry re-exports

小橘 <xiaoju@shazhou.work>
2026-05-09 04:19:49 +00:00
xiaoju 1ea56009a2 Merge pull request 'chore: rename dashboard folder' (#152) from chore/rename-dashboard-folder into main 2026-05-09 03:59:32 +00:00
xiaoju 6cc2481a16 chore: remove accidental pnpm-lock.yaml 2026-05-09 03:58:30 +00:00
xiaoju 44018bd17d chore: rename packages/dashboard → packages/workflow-dashboard
Align folder name with package name @uncaged/workflow-dashboard,
consistent with all other packages in the monorepo.
2026-05-09 03:57:49 +00:00
xingyue 28c35bb3e0 Merge pull request 'refactor: 七包拆分 — protocol / runtime / util / cas / reactor / register / execute' (#151) from refactor/143-split-packages into main 2026-05-09 03:53:54 +00:00
56 changed files with 1815 additions and 321 deletions
+19 -6
View File
@@ -2,7 +2,7 @@
## Project Overview
**@uncaged/workflow** is a workflow engine that executes single-file ESM bundles. Each workflow is a self-contained `.esm.js` file with an XXH64 hash as its version identifier.
This monorepo implements a workflow engine that executes single-file ESM bundles. Each workflow is a self-contained `.esm.js` file with an XXH64 hash as its version identifier. Shared types live in `@uncaged/workflow-protocol`; bundle authors typically depend on `@uncaged/workflow-runtime`.
### Key Terms
@@ -19,14 +19,27 @@
```
workflow/
packages/
workflow/ # @uncaged/workflow — core lib (types, hash, ULID, JSONL, registry)
cli-workflow/ # @uncaged/cli-workflow — CLI (uncaged-workflow command)
workflow-protocol/ # @uncaged/workflow-protocol — shared types + Result
workflow-runtime/ # @uncaged/workflow-runtime — createWorkflow, type re-exports
workflow-util/ # @uncaged/workflow-util — Base32, ULID, logger, storage paths, refs helpers
workflow-reactor/ # @uncaged/workflow-reactor — LLM fn + thread reactor (tool calls)
workflow-cas/ # @uncaged/workflow-cas — CAS store, hash, Merkle
workflow-register/ # @uncaged/workflow-register — bundle validation, registry YAML, model resolution
workflow-execute/ # @uncaged/workflow-execute — engine, extract, fork, GC, workflowAsAgent
cli-workflow/ # @uncaged/cli-workflow — uncaged-workflow CLI
workflow-agent-cursor/ # @uncaged/workflow-agent-cursor
workflow-agent-hermes/ # @uncaged/workflow-agent-hermes
workflow-agent-llm/ # @uncaged/workflow-agent-llm
workflow-util-agent/ # @uncaged/workflow-util-agent — buildAgentPrompt, spawnCli
workflow-template-develop/ # @uncaged/workflow-template-develop
workflow-template-solve-issue/ # @uncaged/workflow-template-solve-issue
workflow-dashboard/ # @uncaged/workflow-dashboard — React dashboard (private app)
docs/ # RFCs, conventions
biome.json # root Biome config
tsconfig.json # root TypeScript config
```
- `workflow` is the core; `cli-workflow` depends on it
- Execution stack layers: `workflow-protocol` → (`workflow-runtime`, `workflow-util`, `workflow-reactor`) → (`workflow-cas`, `workflow-register`) → `workflow-execute``cli-workflow`
- Packages use `workspace:*` protocol
## Language & Paradigm
@@ -167,10 +180,10 @@ type Result<T, E = Error> = { ok: true; value: T } | { ok: false; error: E };
Never use `console.log/warn/error` directly — Biome's `noConsole` rule enforces this.
All logging goes through the structured logger from `@uncaged/workflow`:
All logging goes through the structured logger from `@uncaged/workflow-util`:
```typescript
import { createLogger } from "@uncaged/workflow";
import { createLogger } from "@uncaged/workflow-util";
const log = createLogger();
+143 -130
View File
@@ -1,6 +1,6 @@
# @uncaged/workflow — Architecture
# Uncaged workflow — Architecture
**Last updated:** 2026-05-06 by 小橘 🍊(NEKO Team)
**Last updated:** 2026-05-09
---
@@ -8,72 +8,106 @@
A workflow engine that executes single-file ESM bundles. Each workflow is a self-contained `.esm.js` file identified by its XXH64 hash (Crockford Base32). No daemon — processes start on demand and exit when done.
## Package Structure
The implementation lives in **15** Bun workspace packages under `packages/`, using the `workspace:*` protocol.
| Package | npm Name | Purpose |
|---------|----------|---------|
| `workflow` | `@uncaged/workflow` | Core: types, engine, ExtractFn, hash/ULID/registry |
| `cli-workflow` | `@uncaged/cli-workflow` | CLI: `uncaged-workflow` command |
| `workflow-agent-cursor` | `@uncaged/workflow-agent-cursor` | Cursor CLI agent (extracts workspace from ctx) |
| `workflow-agent-hermes` | `@uncaged/workflow-agent-hermes` | Hermes CLI agent |
| `workflow-agent-llm` | `@uncaged/workflow-agent-llm` | OpenAI-compatible LLM agent |
| `workflow-template-develop` | `@uncaged/workflow-template-develop` | Develop workflow template (roles in `src/roles/`) |
| `workflow-template-solve-issue` | `@uncaged/workflow-template-solve-issue` | Solve-issue workflow template (roles in `src/roles/`) |
| `workflow-util-agent` | `@uncaged/workflow-util-agent` | `buildAgentPrompt` + `spawnCli` utilities |
## Package map
Monorepo with **bun workspace**, `workspace:*` protocol.
Grouped by responsibility (npm name → folder).
## Core Types
| Layer | Package | One-line role |
|-------|---------|----------------|
| Contract | `@uncaged/workflow-protocol``workflow-protocol` | Shared TypeScript types and `Result` helpers; peer `zod` only — no other workspace deps. |
| Author API | `@uncaged/workflow-runtime``workflow-runtime` | `createWorkflow` and re-exports of protocol workflow types for bundle authors. |
| Shared infra | `@uncaged/workflow-util``workflow-util` | Base32/ULID, logger, storage root paths, global CAS dir, ref-field helpers. |
| LLM plumbing | `@uncaged/workflow-reactor``workflow-reactor` | `createLlmFn`, `createThreadReactor`, and related tool-call types for threaded LLM invocation. |
| CAS | `@uncaged/workflow-cas``workflow-cas` | `CasStore` implementation, XXH64 hashing, Merkle helpers over CAS payloads. |
| Registry / bundles | `@uncaged/workflow-register``workflow-register` | Bundle validation & dynamic export extraction, `workflow.yaml` registry I/O, provider/model resolution. |
| Engine | `@uncaged/workflow-execute``workflow-execute` | Thread execution, worker entry path, fork/GC, extract pipeline, `workflowAsAgent`. |
| CLI | `@uncaged/cli-workflow``cli-workflow` | `uncaged-workflow` binary (depends on engine, registry, CAS, protocol, util, runtime). |
| Agent adapters | `@uncaged/workflow-agent-cursor``workflow-agent-cursor` | `AgentFn` via `cursor-agent` CLI + workspace extraction. |
| | `@uncaged/workflow-agent-hermes``workflow-agent-hermes` | `AgentFn` via `hermes chat` CLI. |
| | `@uncaged/workflow-agent-llm``workflow-agent-llm` | `AgentFn` via OpenAI-compatible HTTP (`LlmProvider` from runtime). |
| Agent shared | `@uncaged/workflow-util-agent``workflow-util-agent` | `buildAgentPrompt`, `spawnCli` for CLI-backed agents. |
| Templates | `@uncaged/workflow-template-develop``workflow-template-develop` | Develop workflow definition, roles, descriptor builder. |
| | `@uncaged/workflow-template-solve-issue``workflow-template-solve-issue` | Solve-issue workflow definition, roles, descriptor builder. |
| Dashboard | `@uncaged/workflow-dashboard``workflow-dashboard` | Private Vite + React app (`src/main.tsx`); only `react` / `react-dom` dependencies — no workspace packages. |
```typescript
// --- Sentinel values ---
const START = "__start__";
const END = "__end__";
## Dependency graph (workspace packages)
// --- RoleMeta: maps role names → their meta types ---
type RoleMeta = Record<string, Record<string, unknown>>;
Bottom-up layering for the execution stack:
// --- Role Definition: pure data, no execution logic ---
type RoleDefinition<Meta> = {
description: string; // human-readable
systemPrompt: string; // given to agent
extractPrompt: string; // given to extractor
schema: z.ZodType<Meta>; // meta shape (Zod v4)
};
// --- Workflow Definition: pure data, no agent binding ---
type WorkflowDefinition<M extends RoleMeta> = {
description: string;
roles: { [K in keyof M & string]: RoleDefinition<M[K]> };
moderator: Moderator<M>;
};
// --- Agent: raw string output, reads role info from context ---
type AgentFn = (ctx: AgentContext) => Promise<string>;
// --- Agent Binding: runtime assignment ---
type AgentBinding = {
agent: AgentFn;
overrides?: Partial<Record<string, AgentFn>>;
};
// --- Extract: structured data from context ---
type ExtractFn = <T>(schema: z.ZodType<T>, prompt: string, ctx: ExtractContext) => Promise<T>;
// --- Moderator: pure routing function ---
type Moderator<M extends RoleMeta> = (ctx: ModeratorContext<M>) => (keyof M & string) | typeof END;
// --- Composition ---
// createWorkflow(def, binding, extract) => WorkflowFn
```mermaid
flowchart BT
subgraph L0["Layer 0 — contract"]
protocol["@uncaged/workflow-protocol"]
end
subgraph L1["Layer 1 — on protocol"]
runtime["@uncaged/workflow-runtime"]
util["@uncaged/workflow-util"]
reactor["@uncaged/workflow-reactor"]
end
subgraph L2["Layer 2 — protocol + util"]
cas["@uncaged/workflow-cas"]
register["@uncaged/workflow-register"]
end
subgraph L3["Layer 3 — engine"]
execute["@uncaged/workflow-execute"]
end
subgraph L4["Layer 4 — CLI"]
cli["@uncaged/cli-workflow"]
end
runtime --> protocol
util --> protocol
reactor --> protocol
cas --> protocol
cas --> util
register --> protocol
register --> util
execute --> protocol
execute --> runtime
execute --> util
execute --> cas
execute --> reactor
execute --> register
cli --> protocol
cli --> util
cli --> cas
cli --> execute
cli --> register
cli --> runtime
```
## Three-Phase Engine Loop
**Adjacent consumers** (not in the main CLI stack):
Each role execution has three distinct phases with progressive context:
- `@uncaged/workflow-util-agent``@uncaged/workflow-runtime`
- `@uncaged/workflow-agent-llm``@uncaged/workflow-runtime`
- `@uncaged/workflow-agent-cursor``@uncaged/workflow-runtime`, `@uncaged/workflow-util-agent`, `zod`
- `@uncaged/workflow-agent-hermes``@uncaged/workflow-runtime`, `@uncaged/workflow-util-agent`
- `@uncaged/workflow-template-develop``@uncaged/workflow-register`, `@uncaged/workflow-runtime`, `zod`
- `@uncaged/workflow-template-solve-issue``@uncaged/workflow-register`, `@uncaged/workflow-runtime`, `zod` (dev-only workspace deps: `@uncaged/workflow-cas`, `@uncaged/workflow-execute` for tests/tooling per `package.json`)
## Package roles (detail)
- **`workflow-protocol`** — Pure types (`WorkflowFn`, contexts, `CasStore` interface, descriptor shapes), `START` / `END`, `ok` / `err`. Depends only on peer `zod` for schema-related types in signatures.
- **`workflow-runtime`** — Workflow author surface: `createWorkflow` from `src/create-workflow.js`, re-exports protocol types/constants used when authoring bundles.
- **`workflow-util`** — Cross-cutting utilities: Crockford Base32, ULID, `createLogger`, `getDefaultWorkflowStorageRoot`, `getGlobalCasDir`, ref normalization; re-exports `ok`/`err` from protocol.
- **`workflow-cas`** — Filesystem CAS (`createCasStore`), `hashString` / `hashWorkflowBundleBytes`, Merkle node serialization and helpers (`merkle.js`).
- **`workflow-register`** — Bundle pipeline (`validateWorkflowBundle`, `extractBundleExports`, descriptor builders), registry YAML read/write, `resolveModel` / `splitProviderModelRef`.
- **`workflow-execute`** — `executeThread`, supervisor/worker wiring (`engine/`), fork/GC/pause gate, `createExtract` + LLM extract helpers (`extract/`), `workflowAsAgent`. Imports `@uncaged/workflow-reactor` for LLM-backed extract/supervisor paths (`extract-fn.ts`, `supervisor.ts`).
- **`workflow-reactor`** — `createLlmFn`, `createThreadReactor`, and thread tool-invocation types — consumed by `workflow-execute`.
- **`cli-workflow`** — CLI commands and HTTP/dashboard-related wiring (`hono`, `yaml`); composes register + execute + CAS + util.
- **`workflow-agent-*`** — Replaceable `AgentFn` implementations (Cursor / Hermes CLIs, or HTTP LLM).
- **`workflow-util-agent`** — Shared prompt assembly and subprocess spawning for CLI agents.
- **`workflow-template-*`** — Concrete `WorkflowDefinition` graphs + Zod role schemas + descriptor builders for publishing bundles.
- **`workflow-dashboard`** — Standalone React UI; no published library entry matching `src/index.ts`.
## Three-phase engine loop
Each role round is implemented in `packages/workflow-runtime/src/create-workflow.ts` (`advanceOneRound`): moderator → agent → extractor, with progressive context types from `@uncaged/workflow-protocol`.
```
┌─→ Phase 1: MODERATOR
│ Context: ModeratorContext { threadId, start, steps }
│ Context: ModeratorContext { threadId, depth, start, steps }
│ Action: moderator(ctx) → role name | END
│ Phase 2: AGENT
@@ -82,90 +116,80 @@ Each role execution has three distinct phases with progressive context:
│ Phase 3: EXTRACTOR
│ Context: ExtractContext = AgentCtx + { agentContent }
│ Action: extract(schema, extractPrompt, ctx) → typed meta
│ Action: runtime.extract(schema, extractPrompt, ctx) → typed meta
│ Merge: RoleStep { role, content, meta, timestamp }
│ Merge: RoleStep { role, contentHash, meta, refs, timestamp }
│ Append to steps
└─────────────────────────────────────────────────────┘
```
### Context Types (progressive)
### Context types (progressive)
Defined in `packages/workflow-protocol/src/types.ts`:
```typescript
// Phase 1: Moderator sees accumulated state only
type ModeratorContext<M> = {
threadId: string;
start: StartStep;
steps: RoleStep<M>[];
};
// Phase 2: Agent knows its identity
type ModeratorContext<M> = ThreadContext<M>;
type AgentContext<M> = ModeratorContext<M> & {
currentRole: { name: string; systemPrompt: string };
};
// Phase 3: Extractor has agent output
type ExtractContext<M> = AgentContext<M> & {
agentContent: string;
};
// ThreadContext is an alias for AgentContext (backward compat)
type ThreadContext<M> = AgentContext<M>;
type ExtractContext<M> = AgentContext<M> & { agentContent: string };
```
### Key Properties
### Key properties
- **Moderator is synchronous and pure** — no I/O, no state mutation
- **Agent gets context, not instructions** — reads `ctx.currentRole.systemPrompt`
- **Extractor is a general tool** — not limited to post-agent extraction; agents can use it too (e.g. Cursor agent extracts workspace path before execution)
- **extractPrompt is a call parameter**, not context state — different callers use different prompts
- **Moderator is synchronous and pure** — no I/O, no state mutation inside `createWorkflow`’s moderator call path.
- **Agent receives `AgentContext`** — reads `ctx.currentRole.systemPrompt`; raw output becomes `agentContent` for extract.
- **Extractor is `WorkflowRuntime.extract`** — supplied by the engine from registry-resolved LLM config (`workflow-execute`); stores agent body in CAS and yields `contentHash` + `refs` on each step (`create-workflow.ts`).
- **`extractPrompt` is a call parameter** on `RoleDefinition`, not implicit context state.
## Agent Information Sources
## Agent information sources
An agent has exactly three information sources:
1. **Prior knowledge** — LLM training, agent memory, agent skills
2. **Thread context**`AgentContext` (start, steps, currentRole)
2. **Thread context**`AgentContext` (`start`, `steps`, `currentRole`)
3. **Derived information** — from 1 & 2 (e.g. tool calls, shell commands)
No hidden environment parameters. If an agent needs something (like a workspace path), it extracts it from context using `ExtractFn`.
No hidden environment parameters. If an agent needs something (like a workspace path), it obtains it via `ExtractFn` (e.g. Cursor agent).
## Bundle Contract
## Bundle contract
A workflow bundle is a single `.esm.js` file with two named exports:
A workflow bundle is a single `.esm.js` file with two named exports (see `WorkflowFn` / `WorkflowDescriptor` in `packages/workflow-protocol/src/types.ts`):
```typescript
// Named exports (no default export)
export const descriptor: WorkflowDescriptor;
export const run: WorkflowFn;
type WorkflowFn = (
input: { prompt: string; steps: RoleOutput[] },
options: { threadId: string; maxRounds: number },
) => AsyncGenerator<RoleOutput, WorkflowResult>;
thread: ThreadContext,
runtime: WorkflowRuntime,
) => AsyncGenerator<RoleOutput, WorkflowCompletion>;
```
`RoleOutput` carries `contentHash`, `meta`, and `refs` (agent text lives in CAS, addressed by hash).
### Constraints
- Single `.esm.js` file
- No dynamic `import()`
- All static imports must be Node built-in modules only
- XXH64 hash (Crockford Base32) = globally unique version ID
- No dynamic `import()` in bundles (loader exempt in engine)
- Portable bundle static imports are constrained by validation in `@uncaged/workflow-register` (`validateWorkflowBundle`)
- XXH64 hash (Crockford Base32) = version ID
### Why AsyncGenerator?
- Each `yield` → engine writes to `.data.jsonl`, checks abort/pause
- `return` → engine marks thread complete
- Fork = pass historical steps as `input.steps` to a new generator
- Zero injection — bundle doesn't import from the engine
- Each `yield` lets `workflow-execute` persist state, CAS rows, and enforce pause/abort
- `return` supplies `WorkflowCompletion`
- Fork replays historical steps into a new thread context
- Bundle does not import the engine — only protocol/runtime types at build time
## Storage Layout
## Storage layout
```
~/.uncaged/workflow/
├── cas/ # Global content-addressed blobs (see getGlobalCasDir)
├── bundles/
│ ├── C9NMV6V2TQT81.esm.js # Crockford Base32 of XXH64
│ └── C9NMV6V2TQT81.yaml # Role descriptor
│ ├── C9NMV6V2TQT81.esm.js # Crockford Base32 of XXH64
│ └── C9NMV6V2TQT81.yaml # Role descriptor sidecar (when present)
├── logs/ # One folder per bundle hash
│ └── C9NMV6V2TQT81/
│ ├── 01KQXKW…YG.data.jsonl # Thread state
@@ -173,7 +197,7 @@ type WorkflowFn = (
└── workflow.yaml # Registry
```
### ID Encoding: Crockford Base32
### ID encoding: Crockford Base32
- Case-insensitive, filesystem-safe, no ambiguous chars (0/O, 1/I/L)
- Bundle hash: XXH64 → 13-char
@@ -181,45 +205,36 @@ type WorkflowFn = (
### Registry (`workflow.yaml`)
```yaml
workflows:
solve-issue:
hash: "C9NMV6V2TQT81"
timestamp: 1714963200000
history:
- hash: "A7BKR3M1NPQ40"
timestamp: 1714876800000
```
Managed by `@uncaged/workflow-register` (`readWorkflowRegistry`, `writeWorkflowRegistry`, …). Shape includes workflow entries and a top-level `config` section used for extract/supervisor model resolution.
### Thread JSONL
**`.data.jsonl`** — Line 1: start record, Line 2+: role outputs
**`.data.jsonl`** — Line 1: start record; following lines: role steps with CAS-backed content.
```jsonc
// Start record
{ "name": "solve-issue", "hash": "C9NMV6V2TQT81", "threadId": "01KQXKW…",
"parameters": { "prompt": "Fix bug #3", "options": { "maxRounds": 5 } },
"timestamp": 1714963200000 }
// Role output
{ "role": "planner", "content": "...", "meta": { "phases": [...] }, "timestamp": ... }
// Role output (engine persists contentHash + refs; body in ~/.uncaged/workflow/cas/)
{ "role": "planner", "contentHash": "", "meta": { "phases": [...] }, "refs": ["…"], "timestamp": ... }
```
**`.info.jsonl`** — Structured debug log
**`.info.jsonl`** — Structured debug log via `@uncaged/workflow-util` `createLogger`:
```jsonc
{ "tag": "4KNMR2PX", "content": "Loading bundle...", "timestamp": ... }
```
Tags are 8-char Crockford Base32 (40-bit random), one per call site. `grep "4KNMR2PX"` instant code location.
Tags are 8-char Crockford Base32 (40-bit random), one per call site. `grep "4KNMR2PX"` → code location.
## Execution Model
## Execution model
- **No daemon.** `uncaged-workflow run <name>` starts a worker process
- Same bundle's threads share one process (memory efficiency)
- Process exits when all threads complete
- Thread termination via IPC within the process
- **No daemon.** `uncaged-workflow run <name>` starts a worker process (`workflow-execute` worker entry via `getWorkerHostScriptPath`)
- Threads share bundle-scoped workers as implemented in CLI/engine
- Pause/resume/abort via engine IPC and pause gate (`createThreadPauseGate`)
## CLI Commands
## CLI commands
| Priority | Command | Description |
|----------|---------|-------------|
@@ -239,18 +254,16 @@ Tags are 8-char Crockford Base32 (40-bit random), one per call site. `grep "4KNM
| P2 | `resume <thread-id>` | Resume a paused thread |
| P3 | `fork <thread-id> [--from-role <role>]` | Fork from historical state |
All commands implemented and tested. ✅
## Design Decisions
## Design decisions
| Decision | Rationale |
|----------|-----------|
| **Role = pure data** | Decouples definition from execution; same role with different agents |
| **Agent bound at runtime** | WorkflowDefinition is reusable; agent choice is deployment concern |
| **Three-phase context** | Each phase sees only what it needs; clean separation |
| **ExtractFn as general tool** | Agents use it for pre-execution extraction; engine uses it for meta |
| **Single-file ESM** | Hash = version, no dependency hell, self-contained |
| **No daemon** | OS handles process lifecycle; unnecessary complexity |
| **Agent bound at runtime** | `WorkflowDefinition` is reusable; agent choice is deployment concern |
| **Three-phase context** | Each phase sees only what it needs; types live in `workflow-protocol` |
| **`WorkflowRuntime.extract` + CAS `contentHash`** | Large agent bodies deduplicated globally; Merkle roots summarize threads |
| **`workflow-reactor` split** | LLM tool-calling loop isolated from filesystem/registry concerns |
| **Single-file ESM** | Hash = version, self-contained bundle |
| **No daemon** | OS handles process lifecycle |
| **Crockford Base32** | Filesystem-safe, readable, compact |
| **No concurrency in registry** | Different workflows have different constraints; belongs at workflow/role level |
| **No dryRun** | Tests use mock agents + mock fetch; simpler architecture |
| **15-package split** | Clear boundaries: protocol ↔ runtime author API ↔ util/CAS/register ↔ execute ↔ CLI ↔ agents/templates/UI |
@@ -1,5 +1,7 @@
# Workflow-as-Agent Implementation Plan
> ⚠️ This plan references the pre-split package structure. File paths have changed.
> **For Hermes:** Use subagent-driven-development skill to implement this plan task-by-task.
**Goal:** Enable workflows to invoke other workflows as agents, backed by global CAS and refs tracking.
+262
View File
@@ -0,0 +1,262 @@
# RFC: CAS-Based Thread Storage
> Status: Draft
> Author: 小橘 🍊(NEKO Team)
> Date: 2026-05-09
## Summary
Replace `.data.jsonl` with a fully CAS-based thread state chain. Threads become linked lists of immutable CAS nodes, indexed by a per-bundle `threads.json`.
## Motivation
`.data.jsonl` is a flat append-only file with three different row formats (start, role step, end). This makes forking expensive (copy file), deduplication impossible (forked threads repeat shared history), and GC complex (must parse every row to find CAS refs).
Threads are inherently immutable append-only sequences — a natural fit for CAS hash chains, similar to git's commit DAG.
## Design
### Node Types
Two CAS node types, using the existing `{ type, payload, refs }` CAS blob structure:
#### StartNode
Contains workflow-level parameters. **No threadId** (because the same StartNode can be shared across forks). Prompt is stored as a CAS blob and referenced via `refs[0]`.
```
CAS blob:
{
type: "start",
payload: {
name: "solve-issue",
hash: "BUNDLE_HASH",
maxRounds: 10,
depth: 0
},
refs: [
<prompt_hash> // refs[0]: initial task prompt (CAS blob)
]
}
```
- No `role`, `content`, `meta` — this is not a step, it's workflow metadata
- Prompt is **not** inline — it lives in CAS and is referenced by hash
#### StateNode
One per role step (including `__end__`).
```
CAS blob:
{
type: "state",
payload: {
role: "coder",
meta: { ... },
start: "<start_hash>",
content: "<content_merkle_hash>",
ancestors: ["<parent_hash>", "<grandparent_hash>", ...],
compact: null,
timestamp: 1234567890
},
refs: [<start_hash>, <content_hash>, <parent_hash>, ...]
}
```
**Payload is the source of truth.** Application code reads named fields from payload. `refs[]` is a **GC index** — automatically derived from payload by collecting all CAS hashes. GC only scans `refs[]` without understanding payload structure.
**Payload fields:**
| Field | Type | Meaning |
|-------|------|---------|
| `role` | `string` | Role name, or `"__end__"` for completion |
| `meta` | `object` | Structured metadata extracted from agent output |
| `start` | `string` | StartNode hash |
| `content` | `string` | Content Merkle node hash (carries role artifact refs) |
| `ancestors` | `string[]` | `[parent, grandparent, ...]` — up to 11 entries (1 parent + 10 skip-list). Empty for first step after start. `ancestors[0]` is the direct parent. |
| `compact` | `string \| null` | CAS hash of a compacted summary of all nodes before this one. When present, LLM context assembly can use this instead of walking the full chain. |
| `timestamp` | `number` | Unix timestamp in ms |
### Content Merkle Node
The content at `refs[2]` of each StateNode is itself a CAS Merkle node. This is where **role artifact references** live:
```
CAS blob:
{
type: "content",
payload: "<role output text>",
refs: [
<artifact_hash_1>, // e.g. a commit, a file, a sub-result
<artifact_hash_2>,
...
]
}
```
The Extractor is responsible for producing both `meta` and `refs` from raw agent output:
```
Agent raw output
Extractor → { meta, contentPayload, refs[] }
CAS put content Merkle: { type: "content", payload: contentPayload, refs }
↓ contentHash
StateNode: { ..., refs: [start, parent, contentHash, ...ancestors] }
```
This keeps StateNode refs fixed and simple. All role-specific artifact references are encapsulated in the content Merkle node. GC follows: `thread head → StateNode.refs → content Merkle.refs → artifacts`, full chain recursive.
### End Node
An end is just a StateNode with `role: "__end__"`:
```
{
type: "state",
payload: {
role: "__end__",
meta: { returnCode: 0, summary: "completed successfully" },
start: "<start_hash>",
content: "<content_hash>",
ancestors: ["<parent_hash>", ...],
compact: null,
timestamp: 1234567891
},
refs: [<start_hash>, <content_hash>, <parent_hash>, ...]
}
```
### Thread Index: `threads.json`
Per-bundle directory, one `threads.json` file. **Only active (in-progress) threads** live here:
```
~/.uncaged/workflow/bundles/<hash>/threads.json
```
```json
{
"01JTHREAD1AAAAAAAAAAAAAAA": {
"head": "<latest_state_node_hash>",
"start": "<start_node_hash>",
"updatedAt": 1234567891
}
}
```
When a thread completes (`__end__`), it is **removed from `threads.json`** and appended to a date-partitioned history file:
```
~/.uncaged/workflow/bundles/<hash>/history/{YYYY-MM-DD}.jsonl
```
Each line:
```json
{"threadId":"01JTHREAD1AAAAAAAAAAAAAAA","head":"<end_node_hash>","start":"<start_node_hash>","completedAt":1234567891}
```
Benefits:
- `threads.json` stays small — only in-flight threads
- Dashboard watches `threads.json` for real-time updates; completed threads don't trigger watches
- History is queryable by date but not actively monitored
- GC roots = all heads from `threads.json` + all heads from `history/*.jsonl`
### Ancestor Skip-List
Each StateNode carries up to 11 entries in `payload.ancestors` (1 parent + 10 skip-list, newest first):
```
Node 15: ancestors = [node14, node13, node12, node11, node10, node9, node8, node7, node6, node5, node4]
^parent ^--- skip-list (10 most recent) ---^
```
This enables:
- **Paginated fetch**: jump to any recent ancestor without walking the full chain
- **Partial replay**: fetch last N steps without loading the entire history
- The list is capped at 10 to keep node size bounded
### Fork
Forking a thread at step N:
1. Create new threadId
2. Create a new StateNode whose `parent` (refs[1]) points to the fork point's StateNode
3. Register the new threadId in `threads.json` with its own head
4. **Zero data duplication** — the forked thread shares all ancestor nodes via CAS
### Compact
When a StateNode has `payload.compact` set:
```json
{
"type": "state",
"payload": {
"role": "coder",
"meta": { ... },
"compact": "<cas_hash_of_summary>",
"timestamp": 1234
},
"refs": [...]
}
```
This means: "everything before this node has been summarized into the blob at `compact`". When building LLM context:
1. Walk back from head
2. If a node has `compact`, stop walking — use the compact summary + all nodes after it
3. If no compact found, use full chain
This enables long-running threads without unbounded context growth.
### GC
Simple mark-and-sweep:
1. **Roots**: all `head` and `start` hashes from `threads.json` + all `history/*.jsonl` files
2. **Mark**: from each root, recursively mark all reachable hashes via `refs[]` (including content Merkle → artifact refs)
3. **Sweep**: delete unmarked CAS blobs
No per-row format parsing needed. GC only needs to understand `refs[]`.
### refs[] Derivation
`refs[]` is auto-derived from payload at write time via a `collectRefs(payload)` function that extracts all CAS hash strings from named fields (`start`, `content`, `ancestors`, `compact`). Application code never reads `refs[]` — it reads named payload fields. This makes `refs[]` a pure GC optimization with zero semantic coupling.
### Extract Phase
The Extractor is expanded from the current design. Currently it only extracts `meta` from agent output. In the new design it extracts:
| Output | Purpose |
|--------|---------|
| `meta` | Structured metadata (same as before) |
| `contentPayload` | The text payload for the content Merkle node |
| `refs[]` | CAS hashes of artifacts produced by this role step |
The `refs[]` become the content Merkle node's refs, enabling GC to trace all role-produced artifacts.
## What Stays Unchanged
- `.info.jsonl` — debug logging stays as-is (high-frequency append, not suitable for CAS)
- CAS blob storage format (`~/.uncaged/workflow/cas/`)
- Bundle registry (`workflow.yaml`)
## Migration
Breaking change. Old `.data.jsonl` files become incompatible. No backward compat fallback (per project convention).
## Changes by Package
| Package | Changes |
|---------|---------|
| `workflow-protocol` | Replace `StartStep`, `RoleStep` types with `StartNode`, `StateNode`. Add `ContentMerkleNode` type. Expand `ExtractResult` to include `refs[]`. |
| `workflow-cas` | Add `findReachableHashes(roots)` for GC mark phase |
| `workflow-execute` | Rewrite engine to write CAS nodes + update `threads.json` instead of appending JSONL. Move completed threads to `history/`. Simplify `gc.ts`. Simplify `fork-thread.ts`. Expand extract phase to produce refs. |
| `workflow-runtime` | `ThreadContext` built by walking chain from head. `start.prompt` resolved from CAS via StartNode.refs[0]. |
| `cli-workflow` | `thread list/show/rm` read from `threads.json` + `history/`. SSE watches `threads.json`. |
| `workflow-dashboard` | Watch `threads.json` instead of `.data.jsonl` |
| Templates & Agents | Update extract definitions to produce `refs[]`. Update `ctx.start.content` → CAS resolved. |
@@ -9,9 +9,6 @@ import { createThreadDispatcher, dispatchLive, dispatchRun } from "./commands/th
import { createWorkflowDispatcher } from "./commands/workflow/index.js";
import { formatSkillIndex, formatSkillTopic, getSkillTopics } from "./skill.js";
export type { CommandEntry, CommandGroup, DispatchFn } from "./cli-command-types.js";
export { getCommandRegistry } from "./cli-registry.js";
function dispatchGroup(
tableName: string,
table: Record<string, CommandEntry>,
+3 -4
View File
@@ -7,10 +7,10 @@ The agent builds a full prompt (system + task + step history via `@uncaged/workf
## Install
```bash
bun add @uncaged/workflow-agent-cursor @uncaged/workflow @uncaged/workflow-util-agent zod
bun add @uncaged/workflow-agent-cursor @uncaged/workflow-runtime @uncaged/workflow-util-agent zod
```
In this monorepo: `"@uncaged/workflow-agent-cursor": "workspace:*"` plus `workspace:*` for `@uncaged/workflow` and `@uncaged/workflow-util-agent`.
In this monorepo: `"@uncaged/workflow-agent-cursor": "workspace:*"` plus `workspace:*` for `@uncaged/workflow-runtime` and `@uncaged/workflow-util-agent`, and `zod` ^4.
## Usage
@@ -28,9 +28,8 @@ const agent = createCursorAgent({
| Export | Description |
|--------|-------------|
| `createCursorAgent(config)` | Returns `AgentFn` that runs `cursor-agent` with `buildAgentPrompt(ctx)` |
| `createCursorAgent(config)` | Returns `AgentFn` that runs `cursor-agent` with `buildAgentPrompt(ctx)` from `@uncaged/workflow-util-agent` |
| `CursorAgentConfig` | `model`, `timeout`, `extract` (must supply workspace path) |
| `validateCursorAgentConfig` | Config validation result |
| `buildAgentPrompt` | Re-exported from `@uncaged/workflow-util-agent` |
Requires `cursor-agent` on `PATH` at runtime.
@@ -5,7 +5,6 @@ import * as z from "zod/v4";
import type { CursorAgentConfig } from "./types.js";
import { validateCursorAgentConfig } from "./validate-config.js";
export { buildAgentPrompt } from "@uncaged/workflow-util-agent";
export type { CursorAgentConfig } from "./types.js";
export { validateCursorAgentConfig } from "./validate-config.js";
+2 -3
View File
@@ -7,10 +7,10 @@ The agent composes the same thread-aware prompt as other CLI-backed agents via `
## Install
```bash
bun add @uncaged/workflow-agent-hermes @uncaged/workflow @uncaged/workflow-util-agent
bun add @uncaged/workflow-agent-hermes @uncaged/workflow-runtime @uncaged/workflow-util-agent
```
In this monorepo: use `workspace:*` for all three `@uncaged/*` packages.
In this monorepo: use `workspace:*` for `@uncaged/workflow-agent-hermes`, `@uncaged/workflow-runtime`, and `@uncaged/workflow-util-agent`.
## Usage
@@ -30,6 +30,5 @@ const agent = createHermesAgent({
| `createHermesAgent(config)` | Returns `AgentFn` wrapping `hermes chat -q ...` |
| `HermesAgentConfig` | `model`, `timeout` |
| `validateHermesAgentConfig` | Config validation result |
| `buildAgentPrompt` | Re-exported from `@uncaged/workflow-util-agent` |
Requires `hermes` on `PATH` at runtime.
@@ -6,7 +6,6 @@ import { validateHermesAgentConfig } from "./validate-config.js";
const HERMES_DEFAULT_MAX_TURNS = 90;
export { buildAgentPrompt } from "@uncaged/workflow-util-agent";
export type { HermesAgentConfig } from "./types.js";
export { validateHermesAgentConfig } from "./validate-config.js";
+3 -3
View File
@@ -1,16 +1,16 @@
# @uncaged/workflow-agent-llm
`AgentFn` adapter that calls an OpenAI-compatible `POST /chat/completions` endpoint using `@uncaged/workflow`’s `LlmProvider` (base URL, API key, model).
`AgentFn` adapter that calls an OpenAI-compatible `POST /chat/completions` endpoint using `LlmProvider` from `@uncaged/workflow-runtime`.
Single-turn: system text is the current role’s `systemPrompt`, user text is the thread’s initial prompt (`ctx.start.content`). Errors from HTTP, JSON, or empty choices are thrown as `Error` with a JSON payload string.
## Install
```bash
bun add @uncaged/workflow-agent-llm @uncaged/workflow
bun add @uncaged/workflow-agent-llm @uncaged/workflow-runtime zod
```
In this monorepo: `"@uncaged/workflow-agent-llm": "workspace:*"`, `"@uncaged/workflow": "workspace:*"`.
In this monorepo: `"@uncaged/workflow-agent-llm": "workspace:*"`, `"@uncaged/workflow-runtime": "workspace:*"` (and satisfy `zod` ^4 as required by `@uncaged/workflow-runtime`).
## Usage
+31
View File
@@ -0,0 +1,31 @@
# @uncaged/workflow-cas
Content-addressable storage implementation, bundle hashing, and Merkle helpers.
## What This Package Does
It implements `CasStore` from `@uncaged/workflow-protocol`, hashes workflow bundle bytes and strings with XXH64, and builds serializable Merkle nodes for thread/step/content payloads used when persisting execution artifacts.
## Key Exports
From `src/index.ts`:
- **CAS:** `createCasStore`
- **Hash:** `hashString`, `hashWorkflowBundleBytes`
- **Merkle:** `createContentMerkleNode`, `getContentMerklePayload`, `parseMerkleNode`, `putContentMerkleNode`, `putStepMerkleNode`, `putThreadMerkleNode`, `serializeMerkleNode`
- **Types:** `CasStore`, `MerkleNode`, `MerkleNodeType`, `StepMerklePayload`, `ThreadMerklePayload`
## Dependencies
- **Workspace:** `@uncaged/workflow-protocol` (`CasStore` contract), `@uncaged/workflow-util`
- **npm:** `xxhashjs`, `yaml`
## Usage
```typescript
import { createCasStore, hashWorkflowBundleBytes } from "@uncaged/workflow-cas";
import { getGlobalCasDir } from "@uncaged/workflow-util";
const store = createCasStore(getGlobalCasDir());
const hash = await hashWorkflowBundleBytes(esmJsBytes);
```
@@ -0,0 +1,65 @@
import { describe, expect, test } from "bun:test";
import type { StateNode } from "@uncaged/workflow-protocol";
import { collectRefs } from "../src/collect-refs.js";
function payload(
partial: Partial<StateNode["payload"]> & Pick<StateNode["payload"], "role">,
): StateNode["payload"] {
return {
role: partial.role,
meta: partial.meta ?? {},
start: partial.start ?? "STARTHASH000000000000001",
content: partial.content ?? "CONTENTHASH00000000000001",
ancestors: partial.ancestors ?? [],
compact: partial.compact ?? null,
timestamp: partial.timestamp ?? 0,
};
}
describe("collectRefs", () => {
test("collects start, content, ancestors, and compact hashes in order", () => {
const refs = collectRefs(
payload({
role: "coder",
start: "01START00000000000000001",
content: "01CONTENT0000000000000001",
ancestors: ["01PARENT0000000000000001", "01GRAND000000000000000001"],
compact: "01COMPACT0000000000000001",
}),
);
expect(refs).toEqual([
"01START00000000000000001",
"01CONTENT0000000000000001",
"01PARENT0000000000000001",
"01GRAND000000000000000001",
"01COMPACT0000000000000001",
]);
});
test("does not collect compact when compact is null", () => {
const refs = collectRefs(
payload({
role: "coder",
start: "S1",
content: "C1",
ancestors: ["A1"],
compact: null,
}),
);
expect(refs).toEqual(["S1", "C1", "A1"]);
});
test("returns only start and content when ancestors is empty", () => {
const refs = collectRefs(
payload({
role: "coder",
start: "S2",
content: "C2",
ancestors: [],
compact: null,
}),
);
expect(refs).toEqual(["S2", "C2"]);
});
});
@@ -0,0 +1,69 @@
import { describe, expect, test } from "bun:test";
import type { CasStore } from "@uncaged/workflow-protocol";
import { stringify } from "yaml";
import { findReachableHashes } from "../src/reachable.js";
function yamlBlob(refs: readonly string[]): string {
return stringify({ type: "node", payload: {}, refs: [...refs] }, { indent: 2 });
}
function memoryCas(entries: Record<string, string>): CasStore {
const map = { ...entries };
return {
async put(): Promise<string> {
throw new Error("memoryCas.put not used in tests");
},
async get(hash: string): Promise<string | null> {
return map[hash] ?? null;
},
async delete(): Promise<void> {},
async list(): Promise<string[]> {
return Object.keys(map);
},
};
}
describe("findReachableHashes", () => {
test("walks refs recursively from a single root", async () => {
const cas = memoryCas({
R1: yamlBlob(["R2"]),
R2: yamlBlob(["R3"]),
R3: yamlBlob([]),
});
const reachable = await findReachableHashes(["R1"], cas);
expect([...reachable].sort()).toEqual(["R1", "R2", "R3"]);
});
test("union of reachability from multiple roots", async () => {
const cas = memoryCas({
A: yamlBlob(["X"]),
B: yamlBlob(["Y"]),
X: yamlBlob([]),
Y: yamlBlob(["Z"]),
Z: yamlBlob([]),
});
const reachable = await findReachableHashes(["A", "B"], cas);
expect([...reachable].sort()).toEqual(["A", "B", "X", "Y", "Z"]);
});
test("handles cycles via visited set", async () => {
const cas = memoryCas({
C1: yamlBlob(["C2"]),
C2: yamlBlob(["C1"]),
});
const reachable = await findReachableHashes(["C1"], cas);
expect(reachable.size).toBe(2);
expect(reachable.has("C1")).toBe(true);
expect(reachable.has("C2")).toBe(true);
});
test("does not throw when a ref points to a missing blob", async () => {
const cas = memoryCas({
H1: yamlBlob(["MISSINGHASH0000000000001"]),
});
const reachable = await findReachableHashes(["H1"], cas);
expect(reachable.has("H1")).toBe(true);
expect(reachable.has("MISSINGHASH0000000000001")).toBe(false);
});
});
+3
View File
@@ -2,6 +2,9 @@
"name": "@uncaged/workflow-cas",
"version": "0.1.0",
"type": "module",
"scripts": {
"test": "bun test"
},
"exports": {
".": {
"types": "./dist/index.d.ts",
+4
View File
@@ -3,10 +3,14 @@ import { join } from "node:path";
import { hashString } from "./hash.js";
import { createContentMerkleNode, parseMerkleNode, serializeMerkleNode } from "./merkle.js";
import { isCasNodeYaml } from "./nodes.js";
import type { CasStore } from "./types.js";
/** Raw strings become content merkle YAML; already-valid merkle documents pass through. */
function normalizeCasPutContent(content: string): string {
if (isCasNodeYaml(content)) {
return content;
}
try {
parseMerkleNode(content);
return content;
+13
View File
@@ -0,0 +1,13 @@
import type { StateNode } from "@uncaged/workflow-protocol";
/** Collects CAS hashes from {@link StateNode} payload fields for GC `refs[]` derivation. */
export function collectRefs(payload: StateNode["payload"]): string[] {
const out: string[] = [payload.start, payload.content];
for (const h of payload.ancestors) {
out.push(h);
}
if (payload.compact !== null) {
out.push(payload.compact);
}
return out;
}
+9
View File
@@ -1,4 +1,5 @@
export { createCasStore } from "./cas.js";
export { collectRefs } from "./collect-refs.js";
export { hashString, hashWorkflowBundleBytes } from "./hash.js";
export {
createContentMerkleNode,
@@ -9,6 +10,14 @@ export {
putThreadMerkleNode,
serializeMerkleNode,
} from "./merkle.js";
export {
isCasNodeYaml,
putContentNodeWithRefs,
putStartNode,
putStateNode,
serializeCasNode,
} from "./nodes.js";
export { findReachableHashes } from "./reachable.js";
export type {
CasStore,
MerkleNode,
+13 -4
View File
@@ -82,7 +82,12 @@ export async function putContentMerkleNode(store: CasStore, content: string): Pr
return store.put(content);
}
/** Loads a CAS blob and returns the payload string for a `content` Merkle node. */
/**
* Loads a CAS blob and returns the payload string for a `content` node.
*
* Accepts both the legacy `{type:content, payload, children}` Merkle layout
* and the RFC v3 `{type:content, payload, refs}` content node layout.
*/
export async function getContentMerklePayload(
store: CasStore,
hash: string,
@@ -91,9 +96,13 @@ export async function getContentMerklePayload(
if (yamlText === null) {
return null;
}
const node = parseMerkleNode(yamlText);
if (node.type !== "content" || typeof node.payload !== "string") {
const raw = parse(yamlText) as unknown;
if (raw === null || typeof raw !== "object") {
return null;
}
return node.payload;
const rec = raw as Record<string, unknown>;
if (rec.type !== "content" || typeof rec.payload !== "string") {
return null;
}
return rec.payload;
}
+79
View File
@@ -0,0 +1,79 @@
import type { ContentMerkleNode, StartNode, StateNode } from "@uncaged/workflow-protocol";
import { parse, stringify } from "yaml";
import { collectRefs } from "./collect-refs.js";
import type { CasStore } from "./types.js";
/** YAML-serialize a CAS node carrying `{type, payload, refs}` (RFC v3 thread storage format). */
export function serializeCasNode(node: StartNode | StateNode | ContentMerkleNode): string {
return stringify({ type: node.type, payload: node.payload, refs: node.refs }, { indent: 2 });
}
/**
* Recognizes a YAML CAS blob with the `{type, payload, refs[]}` shape used by
* `start` / `state` / `content` thread nodes. Used by {@link createCasStore}
* to skip the legacy auto-wrap step when the caller already supplied a
* pre-serialized RFC v3 node.
*/
export function isCasNodeYaml(content: string): boolean {
let raw: unknown;
try {
raw = parse(content) as unknown;
} catch {
return false;
}
if (raw === null || typeof raw !== "object" || Array.isArray(raw)) {
return false;
}
const rec = raw as Record<string, unknown>;
if (typeof rec.type !== "string") {
return false;
}
if (!Array.isArray(rec.refs)) {
return false;
}
for (const r of rec.refs) {
if (typeof r !== "string") {
return false;
}
}
return true;
}
export async function putStartNode(
store: CasStore,
payload: StartNode["payload"],
promptHash: string,
): Promise<string> {
const node: StartNode = {
type: "start",
payload,
refs: [promptHash],
};
return store.put(serializeCasNode(node));
}
export async function putStateNode(
store: CasStore,
payload: StateNode["payload"],
): Promise<string> {
const node: StateNode = {
type: "state",
payload,
refs: collectRefs(payload),
};
return store.put(serializeCasNode(node));
}
export async function putContentNodeWithRefs(
store: CasStore,
payload: string,
refs: readonly string[],
): Promise<string> {
const node: ContentMerkleNode = {
type: "content",
payload,
refs: [...refs],
};
return store.put(serializeCasNode(node));
}
+55
View File
@@ -0,0 +1,55 @@
import { parse } from "yaml";
import type { CasStore } from "./types.js";
function refsFromBlob(content: string): string[] {
try {
const raw = parse(content) as unknown;
if (raw === null || typeof raw !== "object") {
return [];
}
const rec = raw as Record<string, unknown>;
const refs = rec.refs;
if (!Array.isArray(refs)) {
return [];
}
const out: string[] = [];
for (const r of refs) {
if (typeof r === "string") {
out.push(r);
}
}
return out;
} catch {
return [];
}
}
/** Recursively collects all CAS hashes reachable from `roots` via each blob's `refs[]`. */
export async function findReachableHashes(
roots: readonly string[],
cas: CasStore,
): Promise<ReadonlySet<string>> {
const visited = new Set<string>();
const stack = [...roots];
while (stack.length > 0) {
const hash = stack.pop();
if (hash === undefined) {
break;
}
if (visited.has(hash)) {
continue;
}
const blob = await cas.get(hash);
if (blob === null) {
continue;
}
visited.add(hash);
for (const ref of refsFromBlob(blob)) {
if (!visited.has(ref)) {
stack.push(ref);
}
}
}
return visited;
}
+1 -4
View File
@@ -5,8 +5,5 @@
"outDir": "dist"
},
"include": ["src"],
"references": [
{ "path": "../workflow-protocol" },
{ "path": "../workflow-util" }
]
"references": [{ "path": "../workflow-protocol" }, { "path": "../workflow-util" }]
}
+33
View File
@@ -0,0 +1,33 @@
# @uncaged/workflow-execute
Thread engine: execution, fork/GC, extract pipeline, supervisor/worker wiring, and workflow-as-agent.
## What This Package Does
It runs `WorkflowFn` generators against disk-backed threads, integrates CAS and registry-backed extract (`createExtract`), coordinates LLM tool usage via `@uncaged/workflow-reactor`, handles fork plans and garbage collection, and exposes `workflowAsAgent` for nesting workflows.
## Key Exports
From `src/index.ts`:
- **Engine:** `createWorkflow` (engine-local re-export), `executeThread`, `getWorkerHostScriptPath`
- **Fork / parse:** `buildForkPlan`, `parseThreadDataJsonl`, `selectForkHistoricalSteps`, `tryParseRoleStepRecord`, `tryParseWorkflowResultRecord`
- **GC / pause:** `garbageCollectCas`, `createThreadPauseGate`
- **Engine types:** `ExecuteThreadIo`, `ExecuteThreadOptions`, `ForkHistoricalStep`, `ForkPlan`, `GcResult`, `ParsedThreadStartRecord`, `PrefilledDiskStep`, `SupervisorDecision`, `ThreadPauseGate`
- **Extract:** `buildExtractUserContent`, `createExtract`, `extractFunctionToolFromZodSchema`, `llmErrorToCause`, `llmExtract`, types `ExtractFn`, `ExtractThreadContext`, `LlmError`, `LlmExtractArgs`
- **Agent composition:** `workflowAsAgent`, `WorkflowAsAgentOptions`
## Dependencies
- **Workspace:** `@uncaged/workflow-protocol`, `@uncaged/workflow-runtime`, `@uncaged/workflow-util`, `@uncaged/workflow-cas`, `@uncaged/workflow-reactor`, `@uncaged/workflow-register`
- **npm:** `yaml`
- **Peer:** `zod` ^4
`@uncaged/workflow-reactor` is used for LLM-backed extract and supervisor flows (`extract-fn.ts`, `supervisor.ts`).
## Usage
```typescript
import { executeThread } from "@uncaged/workflow-execute";
// Typical callers are CLI/tests that supply ExecuteThreadIo (paths, CAS, abort, logger, …).
```
@@ -0,0 +1,317 @@
import { afterEach, beforeEach, describe, expect, test } from "bun:test";
import { mkdir, mkdtemp, readFile, rm, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { createCasStore } from "@uncaged/workflow-cas";
import type {
RoleOutput,
ThreadContext,
WorkflowCompletion,
WorkflowFn,
WorkflowRuntime,
} from "@uncaged/workflow-runtime";
import { parse as parseYaml } from "yaml";
import { executeThread } from "../src/engine/engine.js";
import type { ExecuteThreadIo, ExecuteThreadOptions } from "../src/engine/types.js";
const TEST_REGISTRY_YAML = `config:
maxDepth: 3
supervisorInterval: 0
providers:
stub:
baseUrl: http://127.0.0.1:9
apiKey: test
models:
default: stub/m
workflows: {}
`;
function noLogger(): (tag: string, content: string) => void {
return () => {};
}
function makeOptions(overrides: Partial<ExecuteThreadOptions>): ExecuteThreadOptions {
return {
maxRounds: 5,
depth: 0,
signal: new AbortController().signal,
awaitAfterEachYield: async () => {},
forkSourceThreadId: null,
prefilledDiskSteps: null,
storageRoot: "/tmp/never",
...overrides,
};
}
async function setupStorage(): Promise<{
storageRoot: string;
casDir: string;
bundleHash: string;
bundleDir: string;
}> {
const storageRoot = await mkdtemp(join(tmpdir(), "uncaged-wf-engine-"));
await writeFile(join(storageRoot, "workflow.yaml"), TEST_REGISTRY_YAML, "utf8");
const casDir = join(storageRoot, "cas");
await mkdir(casDir, { recursive: true });
const bundleHash = "TESTHASH00001";
const bundleDir = join(storageRoot, "bundles", bundleHash);
return { storageRoot, casDir, bundleHash, bundleDir };
}
function readCasNode(casDir: string, hash: string): Record<string, unknown> {
const text = require("node:fs").readFileSync(join(casDir, `${hash}.txt`), "utf8") as string;
return parseYaml(text) as Record<string, unknown>;
}
describe("executeThread (Phase 2 — CAS thread storage)", () => {
let storageRoot: string;
let casDir: string;
let bundleHash: string;
let bundleDir: string;
beforeEach(async () => {
const setup = await setupStorage();
storageRoot = setup.storageRoot;
casDir = setup.casDir;
bundleHash = setup.bundleHash;
bundleDir = setup.bundleDir;
});
afterEach(async () => {
await rm(storageRoot, { recursive: true, force: true });
});
test("writes a StartNode whose refs[0] is the prompt CAS hash", async () => {
const cas = createCasStore(casDir);
// biome-ignore lint/correctness/useYield: deliberately empty generator — exercises the start/end path with no role steps
const wf: WorkflowFn = async function* (
_thread: ThreadContext,
_runtime: WorkflowRuntime,
): AsyncGenerator<RoleOutput, WorkflowCompletion> {
return { returnCode: 0, summary: "no-op" };
};
const io: ExecuteThreadIo = {
threadId: "T01",
hash: bundleHash,
infoJsonlPath: join(storageRoot, "logs", bundleHash, "T01.info.jsonl"),
cas,
};
const result = await executeThread(
wf,
"demo",
{ prompt: "hello", steps: [] },
makeOptions({ storageRoot, maxRounds: 5 }),
io,
noLogger(),
);
expect(result.returnCode).toBe(0);
const historyText = await readFile(
(await import("node:fs/promises")).readdir ? await firstHistoryFile(bundleDir) : "",
"utf8",
);
const histLine = historyText.trim().split("\n")[0] ?? "";
const histEntry = JSON.parse(histLine) as Record<string, unknown>;
expect(histEntry.threadId).toBe("T01");
const startHash = histEntry.start as string;
const startNode = readCasNode(casDir, startHash);
expect(startNode.type).toBe("start");
expect((startNode.payload as Record<string, unknown>).name).toBe("demo");
expect((startNode.payload as Record<string, unknown>).hash).toBe(bundleHash);
expect((startNode.payload as Record<string, unknown>).maxRounds).toBe(5);
const refs = startNode.refs as string[];
expect(refs.length).toBe(1);
const promptBlob = await cas.get(refs[0] ?? "");
expect(promptBlob).not.toBeNull();
const promptParsed = parseYaml(promptBlob ?? "") as Record<string, unknown>;
expect(promptParsed.payload).toBe("hello");
});
test("each role yield produces a chained StateNode and updates threads.json head", async () => {
const cas = createCasStore(casDir);
const wf: WorkflowFn = async function* (
_thread: ThreadContext,
runtime: WorkflowRuntime,
): AsyncGenerator<RoleOutput, WorkflowCompletion> {
const h1 = await runtime.cas.put("plan-text");
yield { role: "planner", contentHash: h1, meta: { plan: 1 }, refs: [h1] };
const h2 = await runtime.cas.put("code-text");
yield { role: "coder", contentHash: h2, meta: { diff: "y" }, refs: [h2] };
return { returnCode: 0, summary: "done" };
};
const io: ExecuteThreadIo = {
threadId: "T02",
hash: bundleHash,
infoJsonlPath: join(storageRoot, "logs", bundleHash, "T02.info.jsonl"),
cas,
};
let observedHead: string | null = null;
let observedHeadAtSecondYield: string | null = null;
const opts = makeOptions({
storageRoot,
maxRounds: 5,
awaitAfterEachYield: async () => {
const text = await readFile(join(bundleDir, "threads.json"), "utf8");
const parsed = JSON.parse(text) as Record<string, { head: string }>;
const head = parsed.T02?.head ?? null;
if (observedHead === null) {
observedHead = head;
} else if (observedHeadAtSecondYield === null) {
observedHeadAtSecondYield = head;
}
},
});
const result = await executeThread(
wf,
"demo",
{ prompt: "p", steps: [] },
opts,
io,
noLogger(),
);
expect(result.returnCode).toBe(0);
expect(observedHead).not.toBeNull();
expect(observedHeadAtSecondYield).not.toBeNull();
expect(observedHead).not.toBe(observedHeadAtSecondYield);
const firstState = readCasNode(casDir, observedHead ?? "");
expect(firstState.type).toBe("state");
expect((firstState.payload as Record<string, unknown>).role).toBe("planner");
expect((firstState.payload as Record<string, unknown>).ancestors).toEqual([]);
const secondState = readCasNode(casDir, observedHeadAtSecondYield ?? "");
expect(secondState.type).toBe("state");
expect((secondState.payload as Record<string, unknown>).role).toBe("coder");
expect((secondState.payload as Record<string, unknown>).ancestors).toEqual([observedHead]);
expect((secondState.payload as Record<string, unknown>).start).toBe(
(firstState.payload as Record<string, unknown>).start,
);
});
test("on completion: removes threads.json entry, appends history with __end__ head", async () => {
const cas = createCasStore(casDir);
const wf: WorkflowFn = async function* (
_thread: ThreadContext,
runtime: WorkflowRuntime,
): AsyncGenerator<RoleOutput, WorkflowCompletion> {
const h = await runtime.cas.put("only-step");
yield { role: "only", contentHash: h, meta: {}, refs: [h] };
return { returnCode: 0, summary: "completed" };
};
const io: ExecuteThreadIo = {
threadId: "T03",
hash: bundleHash,
infoJsonlPath: join(storageRoot, "logs", bundleHash, "T03.info.jsonl"),
cas,
};
const result = await executeThread(
wf,
"demo",
{ prompt: "p", steps: [] },
makeOptions({ storageRoot, maxRounds: 5 }),
io,
noLogger(),
);
expect(result.returnCode).toBe(0);
const indexText = await readFile(join(bundleDir, "threads.json"), "utf8");
const indexParsed = JSON.parse(indexText) as Record<string, unknown>;
expect(indexParsed).toEqual({});
const historyPath = await firstHistoryFile(bundleDir);
const historyText = await readFile(historyPath, "utf8");
const lines = historyText.trim().split("\n");
expect(lines.length).toBe(1);
const entry = JSON.parse(lines[0] ?? "") as Record<string, unknown>;
expect(entry.threadId).toBe("T03");
expect(entry.head).toBe(result.rootHash);
const endNode = readCasNode(casDir, String(entry.head));
expect(endNode.type).toBe("state");
expect((endNode.payload as Record<string, unknown>).role).toBe("__end__");
expect((endNode.payload as Record<string, unknown>).meta).toEqual({
returnCode: 0,
summary: "completed",
});
});
test("does not write any .data.jsonl file under storageRoot", async () => {
const cas = createCasStore(casDir);
const wf: WorkflowFn = async function* (
_thread: ThreadContext,
runtime: WorkflowRuntime,
): AsyncGenerator<RoleOutput, WorkflowCompletion> {
const h = await runtime.cas.put("step");
yield { role: "only", contentHash: h, meta: {}, refs: [h] };
return { returnCode: 0, summary: "done" };
};
const io: ExecuteThreadIo = {
threadId: "T04",
hash: bundleHash,
infoJsonlPath: join(storageRoot, "logs", bundleHash, "T04.info.jsonl"),
cas,
};
await executeThread(
wf,
"demo",
{ prompt: "p", steps: [] },
makeOptions({ storageRoot, maxRounds: 5 }),
io,
noLogger(),
);
const fsp = await import("node:fs/promises");
const found: string[] = [];
async function walk(dir: string): Promise<void> {
let entries: { name: string; isDirectory: () => boolean; isFile: () => boolean }[];
try {
entries = await fsp.readdir(dir, { withFileTypes: true });
} catch {
return;
}
for (const ent of entries) {
const p = join(dir, ent.name);
if (ent.isDirectory()) {
await walk(p);
} else if (ent.isFile() && ent.name.endsWith(".data.jsonl")) {
found.push(p);
}
}
}
await walk(storageRoot);
expect(found).toEqual([]);
});
});
async function firstHistoryFile(bundleDir: string): Promise<string> {
const fsp = await import("node:fs/promises");
const dir = join(bundleDir, "history");
const entries = await fsp.readdir(dir);
const file = entries.find((n) => n.endsWith(".jsonl"));
if (file === undefined) {
throw new Error(`no history file under ${dir}`);
}
return join(dir, file);
}
@@ -0,0 +1,91 @@
import { afterEach, beforeEach, describe, expect, test } from "bun:test";
import { mkdtemp, readdir, readFile, rm } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import {
appendThreadHistoryEntry,
removeThreadEntry,
upsertThreadEntry,
} from "../src/engine/threads-index.js";
describe("threads-index", () => {
let bundleDir: string;
beforeEach(async () => {
bundleDir = await mkdtemp(join(tmpdir(), "uncaged-wf-threads-"));
});
afterEach(async () => {
await rm(bundleDir, { recursive: true, force: true });
});
test("upsertThreadEntry creates threads.json and persists entries", async () => {
await upsertThreadEntry(bundleDir, "T1", { head: "H1", start: "S1", updatedAt: 100 });
const text = await readFile(join(bundleDir, "threads.json"), "utf8");
const parsed = JSON.parse(text) as Record<string, unknown>;
expect(parsed).toEqual({
T1: { head: "H1", start: "S1", updatedAt: 100 },
});
});
test("upsertThreadEntry overwrites the head while preserving siblings", async () => {
await upsertThreadEntry(bundleDir, "T1", { head: "H1", start: "S1", updatedAt: 100 });
await upsertThreadEntry(bundleDir, "T2", { head: "H2", start: "S2", updatedAt: 200 });
await upsertThreadEntry(bundleDir, "T1", { head: "H1B", start: "S1", updatedAt: 300 });
const text = await readFile(join(bundleDir, "threads.json"), "utf8");
const parsed = JSON.parse(text) as Record<string, unknown>;
expect(parsed).toEqual({
T1: { head: "H1B", start: "S1", updatedAt: 300 },
T2: { head: "H2", start: "S2", updatedAt: 200 },
});
});
test("removeThreadEntry deletes the entry but keeps the file", async () => {
await upsertThreadEntry(bundleDir, "T1", { head: "H1", start: "S1", updatedAt: 100 });
await upsertThreadEntry(bundleDir, "T2", { head: "H2", start: "S2", updatedAt: 200 });
await removeThreadEntry(bundleDir, "T1");
const text = await readFile(join(bundleDir, "threads.json"), "utf8");
const parsed = JSON.parse(text) as Record<string, unknown>;
expect(parsed).toEqual({
T2: { head: "H2", start: "S2", updatedAt: 200 },
});
});
test("removeThreadEntry on a missing thread is a no-op", async () => {
await removeThreadEntry(bundleDir, "MISSING");
const dirEntries = await readdir(bundleDir);
expect(dirEntries.includes("threads.json")).toBe(false);
});
test("appendThreadHistoryEntry writes one JSONL line per call into a date-keyed file", async () => {
const ts = Date.UTC(2026, 4, 9, 12, 0, 0);
await appendThreadHistoryEntry(bundleDir, {
threadId: "T1",
head: "H1",
start: "S1",
completedAt: ts,
});
await appendThreadHistoryEntry(bundleDir, {
threadId: "T2",
head: "H2",
start: "S2",
completedAt: ts,
});
const text = await readFile(join(bundleDir, "history", "2026-05-09.jsonl"), "utf8");
const lines = text.trim().split("\n");
expect(lines.length).toBe(2);
expect(JSON.parse(lines[0] ?? "{}")).toEqual({
threadId: "T1",
head: "H1",
start: "S1",
completedAt: ts,
});
expect(JSON.parse(lines[1] ?? "{}")).toEqual({
threadId: "T2",
head: "H2",
start: "S2",
completedAt: ts,
});
});
});
+225 -118
View File
@@ -1,5 +1,18 @@
import { appendFile, mkdir } from "node:fs/promises";
import { mkdir } from "node:fs/promises";
import { dirname } from "node:path";
import {
type CasStore,
getContentMerklePayload,
putContentNodeWithRefs,
putStartNode,
putStateNode,
} from "@uncaged/workflow-cas";
import type { StateNode } from "@uncaged/workflow-protocol";
import {
readWorkflowRegistry,
resolveModel,
type WorkflowConfig,
} from "@uncaged/workflow-register";
import type {
LlmProvider,
RoleOutput,
@@ -9,21 +22,38 @@ import type {
WorkflowResult,
WorkflowRuntime,
} from "@uncaged/workflow-runtime";
import { START } from "@uncaged/workflow-runtime";
import {
type CasStore,
getContentMerklePayload,
putStepMerkleNode,
putThreadMerkleNode,
} from "@uncaged/workflow-cas";
import { resolveModel } from "@uncaged/workflow-register";
import { createExtract } from "../extract/index.js";
import { readWorkflowRegistry, type WorkflowConfig } from "@uncaged/workflow-register";
import { err, type LogFn, normalizeRefsField, ok, type Result } from "@uncaged/workflow-util";
import { END, START } from "@uncaged/workflow-runtime";
import { err, type LogFn, ok, type Result } from "@uncaged/workflow-util";
import { createExtract } from "../extract/index.js";
import { runSupervisor } from "./supervisor.js";
import {
appendThreadHistoryEntry,
getBundleDir,
removeThreadEntry,
upsertThreadEntry,
} from "./threads-index.js";
import type { ExecuteThreadIo, ExecuteThreadOptions } from "./types.js";
/** Cap for {@link StateNode}.payload.ancestors: 1 parent + 10 skip-list. */
const ANCESTORS_CAP = 11;
type ChainState = {
/** State hash of the most recently written {@link StateNode}, or `null` before the first step. */
parentStateHash: string | null;
/** Ancestors recorded on the most recently written {@link StateNode}. */
parentAncestors: readonly string[];
};
const EMPTY_CHAIN: ChainState = { parentStateHash: null, parentAncestors: [] };
function computeAncestors(chain: ChainState): string[] {
if (chain.parentStateHash === null) {
return [];
}
return [chain.parentStateHash, ...chain.parentAncestors].slice(0, ANCESTORS_CAP);
}
async function resolveEngineRegistryRuntime(
storageRoot: string,
cas: CasStore,
@@ -57,51 +87,108 @@ async function resolveEngineRegistryRuntime(
return ok({ extract: createExtract(llmProvider, { cas }), workflowConfig: cfg });
}
async function appendDataLine(path: string, record: unknown): Promise<void> {
const line = `${JSON.stringify(record)}\n`;
await appendFile(path, line, "utf8");
async function appendStateForStep(params: {
cas: CasStore;
startHash: string;
chain: ChainState;
role: string;
contentHash: string;
meta: Record<string, unknown>;
refs: readonly string[];
timestamp: number;
}): Promise<{ stateHash: string; chain: ChainState }> {
const text = await getContentMerklePayload(params.cas, params.contentHash);
if (text === null) {
throw new Error(
`role step ${params.role}: CAS blob missing for contentHash ${params.contentHash}`,
);
}
const artifactRefs = params.refs.filter((r) => r !== params.contentHash);
const contentHash = await putContentNodeWithRefs(params.cas, text, artifactRefs);
const ancestors = computeAncestors(params.chain);
const payload: StateNode["payload"] = {
role: params.role,
meta: params.meta,
start: params.startHash,
content: contentHash,
ancestors,
compact: null,
timestamp: params.timestamp,
};
const stateHash = await putStateNode(params.cas, payload);
return {
stateHash,
chain: { parentStateHash: stateHash, parentAncestors: ancestors },
};
}
async function finalizeThreadResult(params: {
async function appendEndState(params: {
cas: CasStore;
workflowName: string;
startHash: string;
chain: ChainState;
completion: WorkflowCompletion;
timestamp: number;
}): Promise<string> {
const contentHash = await putContentNodeWithRefs(params.cas, params.completion.summary, []);
const ancestors = computeAncestors(params.chain);
const payload: StateNode["payload"] = {
role: END,
meta: { returnCode: params.completion.returnCode, summary: params.completion.summary },
start: params.startHash,
content: contentHash,
ancestors,
compact: null,
timestamp: params.timestamp,
};
return putStateNode(params.cas, payload);
}
async function finalizeThread(params: {
cas: CasStore;
bundleDir: string;
threadId: string;
stepMerkleHashes: readonly string[];
startHash: string;
chain: ChainState;
completion: WorkflowCompletion;
}): Promise<WorkflowResult> {
const rootHash = await putThreadMerkleNode(
params.cas,
{
workflow: params.workflowName,
threadId: params.threadId,
result: {
returnCode: params.completion.returnCode,
summary: params.completion.summary,
},
},
params.stepMerkleHashes,
);
const ts = Date.now();
const endHash = await appendEndState({
cas: params.cas,
startHash: params.startHash,
chain: params.chain,
completion: params.completion,
timestamp: ts,
});
await removeThreadEntry(params.bundleDir, params.threadId);
await appendThreadHistoryEntry(params.bundleDir, {
threadId: params.threadId,
head: endHash,
start: params.startHash,
completedAt: ts,
});
return {
returnCode: params.completion.returnCode,
summary: params.completion.summary,
rootHash,
rootHash: endHash,
};
}
async function finalizeAbortedThread(params: {
cas: CasStore;
workflowName: string;
bundleDir: string;
threadId: string;
stepMerkleHashes: string[];
startHash: string;
chain: ChainState;
logger: LogFn;
abortLogTag: string;
}): Promise<WorkflowResult> {
params.logger(params.abortLogTag, `thread ${params.threadId} aborted`);
return finalizeThreadResult({
return finalizeThread({
cas: params.cas,
workflowName: params.workflowName,
bundleDir: params.bundleDir,
threadId: params.threadId,
stepMerkleHashes: params.stepMerkleHashes,
startHash: params.startHash,
chain: params.chain,
completion: { returnCode: 130, summary: "thread aborted" },
});
}
@@ -114,8 +201,9 @@ async function maybeSupervisorHaltsThread(params: {
logger: LogFn;
threadId: string;
cas: CasStore;
workflowName: string;
stepMerkleHashes: string[];
bundleDir: string;
startHash: string;
chain: ChainState;
}): Promise<WorkflowResult | null> {
const interval = params.workflowConfig.supervisorInterval;
if (interval <= 0 || params.written % interval !== 0) {
@@ -135,41 +223,55 @@ async function maybeSupervisorHaltsThread(params: {
return null;
}
params.logger("M4QX8VHN", `thread ${params.threadId} stopped by supervisor`);
return finalizeThreadResult({
return finalizeThread({
cas: params.cas,
workflowName: params.workflowName,
bundleDir: params.bundleDir,
threadId: params.threadId,
stepMerkleHashes: params.stepMerkleHashes,
startHash: params.startHash,
chain: params.chain,
completion: { returnCode: 0, summary: "completed: supervisor stopped thread" },
});
}
async function publishHead(params: {
bundleDir: string;
threadId: string;
startHash: string;
headHash: string;
}): Promise<void> {
await upsertThreadEntry(params.bundleDir, params.threadId, {
head: params.headHash,
start: params.startHash,
updatedAt: Date.now(),
});
}
async function driveWorkflowGenerator(params: {
fn: WorkflowFn;
workflowName: string;
workflowConfig: WorkflowConfig;
thread: ThreadContext;
runtime: WorkflowRuntime;
executeOptions: ExecuteThreadOptions;
dataJsonlPath: string;
threadId: string;
logger: LogFn;
cas: CasStore;
stepMerkleHashes: string[];
bundleDir: string;
startHash: string;
chain: ChainState;
}): Promise<WorkflowResult> {
const {
fn,
workflowName,
workflowConfig,
thread,
runtime,
executeOptions,
dataJsonlPath,
threadId,
logger,
cas,
stepMerkleHashes,
bundleDir,
startHash,
} = params;
let chain: ChainState = params.chain;
const gen = fn(thread, runtime);
let written = 0;
const recentSupervisorSteps: { role: string; summary: string }[] = thread.steps.map((s) => ({
@@ -181,9 +283,10 @@ async function driveWorkflowGenerator(params: {
if (executeOptions.signal.aborted) {
return await finalizeAbortedThread({
cas,
workflowName,
bundleDir,
threadId,
stepMerkleHashes,
startHash,
chain,
logger,
abortLogTag: "V8JX4NP2",
});
@@ -191,11 +294,12 @@ async function driveWorkflowGenerator(params: {
if (written >= executeOptions.maxRounds) {
logger("R3CW7YBQ", `thread ${threadId} stopped at maxRounds=${executeOptions.maxRounds}`);
return await finalizeThreadResult({
return await finalizeThread({
cas,
workflowName,
bundleDir,
threadId,
stepMerkleHashes,
startHash,
chain,
completion: {
returnCode: 0,
summary: `completed: reached maxRounds (${executeOptions.maxRounds})`,
@@ -207,39 +311,31 @@ async function driveWorkflowGenerator(params: {
if (iterResult.done) {
logger("F3HN8QKP", `thread ${threadId} generator finished`);
const completion = iterResult.value;
return await finalizeThreadResult({
return await finalizeThread({
cas,
workflowName,
bundleDir,
threadId,
stepMerkleHashes,
completion,
startHash,
chain,
completion: iterResult.value,
});
}
written++;
const step = iterResult.value;
const resolved = await getContentMerklePayload(cas, step.contentHash);
if (resolved === null) {
throw new Error(
`role step ${step.role}: CAS blob missing for contentHash ${step.contentHash}`,
);
}
const ts = Date.now();
await appendDataLine(dataJsonlPath, {
const written_ = await appendStateForStep({
cas,
startHash,
chain,
role: step.role,
contentHash: step.contentHash,
meta: step.meta,
refs: normalizeRefsField(step.refs),
refs: step.refs,
timestamp: ts,
});
const stepNodeHash = await putStepMerkleNode(
cas,
{ role: step.role, meta: step.meta },
step.contentHash,
);
stepMerkleHashes.push(stepNodeHash);
chain = written_.chain;
await publishHead({ bundleDir, threadId, startHash, headHash: written_.stateHash });
logger("N7BW4YHQ", `thread ${threadId} wrote role ${step.role}`);
@@ -262,9 +358,10 @@ async function driveWorkflowGenerator(params: {
if (executeOptions.signal.aborted) {
return await finalizeAbortedThread({
cas,
workflowName,
bundleDir,
threadId,
stepMerkleHashes,
startHash,
chain,
logger,
abortLogTag: "V8JX4NP4",
});
@@ -278,8 +375,9 @@ async function driveWorkflowGenerator(params: {
logger,
threadId,
cas,
workflowName,
stepMerkleHashes,
bundleDir,
startHash,
chain,
});
if (supervised !== null) {
return supervised;
@@ -288,8 +386,16 @@ async function driveWorkflowGenerator(params: {
}
/**
* Execute a workflow thread: drive the bundle's AsyncGenerator, RFC-001 `.data.jsonl` records,
* debug lines via `logger` to `.info.jsonl`.
* Execute a workflow thread by driving the bundle's `AsyncGenerator`.
*
* Persistence layout (RFC v3 — CAS-based thread storage):
* - Thread chain is written as immutable CAS blobs: a single {@link StartNode}
* plus one {@link StateNode} per role step (including a final `__end__`
* state on completion / abort / `maxRounds`).
* - The active thread head is published in `<bundleDir>/threads.json`; on
* completion it is removed and a record is appended to
* `<bundleDir>/history/{YYYY-MM-DD}.jsonl`.
* - Debug logging continues to flow through `logger` to `.info.jsonl`.
*/
export async function executeThread(
fn: WorkflowFn,
@@ -299,7 +405,6 @@ export async function executeThread(
io: ExecuteThreadIo,
logger: LogFn,
): Promise<WorkflowResult> {
await mkdir(dirname(io.dataJsonlPath), { recursive: true });
await mkdir(dirname(io.infoJsonlPath), { recursive: true });
const prefilled = options.prefilledDiskSteps;
@@ -309,61 +414,63 @@ export async function executeThread(
);
}
const nowMs = Date.now();
const startRecord: Record<string, unknown> = {
name: workflowName,
hash: io.hash,
threadId: io.threadId,
parameters: {
prompt: input.prompt,
options: {
maxRounds: options.maxRounds,
depth: options.depth,
},
},
timestamp: nowMs,
};
if (options.forkSourceThreadId !== null) {
startRecord.forkFrom = { threadId: options.forkSourceThreadId };
}
const bundleDir = getBundleDir(options.storageRoot, io.hash);
await appendDataLine(io.dataJsonlPath, startRecord);
const promptHash = await io.cas.put(input.prompt);
const startHash = await putStartNode(
io.cas,
{
name: workflowName,
hash: io.hash,
maxRounds: options.maxRounds,
depth: options.depth,
},
promptHash,
);
await publishHead({
bundleDir,
threadId: io.threadId,
startHash,
headHash: startHash,
});
logger("T9HQ2KHM", `thread ${io.threadId} started for workflow ${workflowName}`);
const stepMerkleHashes: string[] = [];
let chain: ChainState = EMPTY_CHAIN;
if (prefilled !== null) {
for (const row of prefilled) {
const prefilledPayload = await getContentMerklePayload(io.cas, row.contentHash);
if (prefilledPayload === null) {
throw new Error(
`prefilled step ${row.role}: CAS blob missing for contentHash ${row.contentHash}`,
);
}
await appendDataLine(io.dataJsonlPath, {
const written = await appendStateForStep({
cas: io.cas,
startHash,
chain,
role: row.role,
contentHash: row.contentHash,
meta: row.meta,
refs: normalizeRefsField(row.refs),
refs: row.refs,
timestamp: row.timestamp,
});
const stepNodeHash = await putStepMerkleNode(
io.cas,
{ role: row.role, meta: row.meta },
row.contentHash,
);
stepMerkleHashes.push(stepNodeHash);
chain = written.chain;
await publishHead({
bundleDir,
threadId: io.threadId,
startHash,
headHash: written.stateHash,
});
}
}
const nowMs = Date.now();
if (options.maxRounds <= 0) {
logger("R3CW7YBQ", `thread ${io.threadId} stopped at maxRounds=${options.maxRounds}`);
return await finalizeThreadResult({
return await finalizeThread({
cas: io.cas,
workflowName,
bundleDir,
threadId: io.threadId,
stepMerkleHashes,
startHash,
chain,
completion: {
returnCode: 0,
summary: `completed: reached maxRounds (${options.maxRounds})`,
@@ -401,15 +508,15 @@ export async function executeThread(
return await driveWorkflowGenerator({
fn,
workflowName,
workflowConfig: registryRuntime.value.workflowConfig,
thread,
runtime,
executeOptions: options,
dataJsonlPath: io.dataJsonlPath,
threadId: io.threadId,
logger,
cas: io.cas,
stepMerkleHashes,
bundleDir,
startHash,
chain,
});
}
@@ -9,6 +9,13 @@ export {
} from "./fork-thread.js";
export { garbageCollectCas } from "./gc.js";
export { createThreadPauseGate } from "./thread-pause-gate.js";
export type { ThreadHistoryEntry, ThreadIndex, ThreadIndexEntry } from "./threads-index.js";
export {
appendThreadHistoryEntry,
getBundleDir,
removeThreadEntry,
upsertThreadEntry,
} from "./threads-index.js";
export type {
ExecuteThreadIo,
ExecuteThreadOptions,
@@ -75,7 +75,7 @@ export async function runSupervisor(
});
if (!result.ok) {
args.logger("R9CW4PLM", `supervisor failed: ${result.error}`);
args.logger("R9CW4PHM", `supervisor failed: ${result.error}`);
return err(`supervisor: ${result.error}`);
}
@@ -0,0 +1,136 @@
import { appendFile, mkdir, readFile, rename, writeFile } from "node:fs/promises";
import { dirname, join } from "node:path";
/**
* Active-thread index entry stored in `<bundleDir>/threads.json`.
*
* Once the thread reaches `__end__`, the entry is removed from `threads.json`
* and a corresponding line is appended to `history/{YYYY-MM-DD}.jsonl`.
*/
export type ThreadIndexEntry = {
head: string;
start: string;
updatedAt: number;
};
export type ThreadHistoryEntry = {
threadId: string;
head: string;
start: string;
completedAt: number;
};
export type ThreadIndex = Record<string, ThreadIndexEntry>;
export function getBundleDir(storageRoot: string, bundleHash: string): string {
return join(storageRoot, "bundles", bundleHash);
}
function threadsJsonPath(bundleDir: string): string {
return join(bundleDir, "threads.json");
}
function isPlainObject(v: unknown): v is Record<string, unknown> {
return v !== null && typeof v === "object" && !Array.isArray(v);
}
function parseThreadIndexEntry(raw: unknown): ThreadIndexEntry | null {
if (!isPlainObject(raw)) {
return null;
}
const head = raw.head;
const start = raw.start;
const updatedAt = raw.updatedAt;
if (typeof head !== "string" || typeof start !== "string" || typeof updatedAt !== "number") {
return null;
}
return { head, start, updatedAt };
}
function parseThreadIndex(text: string): ThreadIndex {
const trimmed = text.trim();
if (trimmed === "") {
return {};
}
let raw: unknown;
try {
raw = JSON.parse(trimmed) as unknown;
} catch {
return {};
}
if (!isPlainObject(raw)) {
return {};
}
const out: ThreadIndex = {};
for (const [k, v] of Object.entries(raw)) {
const entry = parseThreadIndexEntry(v);
if (entry !== null) {
out[k] = entry;
}
}
return out;
}
async function readThreadIndex(bundleDir: string): Promise<ThreadIndex> {
const path = threadsJsonPath(bundleDir);
let text: string;
try {
text = await readFile(path, "utf8");
} catch (e) {
const errObj = e as NodeJS.ErrnoException;
if (errObj.code === "ENOENT") {
return {};
}
throw e;
}
return parseThreadIndex(text);
}
async function writeThreadIndex(bundleDir: string, index: ThreadIndex): Promise<void> {
const path = threadsJsonPath(bundleDir);
await mkdir(dirname(path), { recursive: true });
const tmp = `${path}.tmp.${process.pid}.${Date.now()}`;
const json = `${JSON.stringify(index, null, 2)}\n`;
await writeFile(tmp, json, "utf8");
await rename(tmp, path);
}
/** Insert/update a thread entry in `threads.json`. */
export async function upsertThreadEntry(
bundleDir: string,
threadId: string,
entry: ThreadIndexEntry,
): Promise<void> {
const index = await readThreadIndex(bundleDir);
index[threadId] = entry;
await writeThreadIndex(bundleDir, index);
}
/** Remove a thread entry from `threads.json` (no-op when absent). */
export async function removeThreadEntry(bundleDir: string, threadId: string): Promise<void> {
const index = await readThreadIndex(bundleDir);
if (!(threadId in index)) {
return;
}
delete index[threadId];
await writeThreadIndex(bundleDir, index);
}
function dateKey(epochMs: number): string {
const d = new Date(epochMs);
const y = d.getUTCFullYear().toString().padStart(4, "0");
const m = (d.getUTCMonth() + 1).toString().padStart(2, "0");
const day = d.getUTCDate().toString().padStart(2, "0");
return `${y}-${m}-${day}`;
}
/** Append a completion record to `history/{YYYY-MM-DD}.jsonl` keyed off `completedAt`. */
export async function appendThreadHistoryEntry(
bundleDir: string,
entry: ThreadHistoryEntry,
): Promise<void> {
const path = join(bundleDir, "history", `${dateKey(entry.completedAt)}.jsonl`);
await mkdir(dirname(path), { recursive: true });
const line = `${JSON.stringify(entry)}\n`;
await appendFile(path, line, "utf8");
}
@@ -1,5 +1,5 @@
import type { RoleOutput } from "@uncaged/workflow-runtime";
import type { CasStore } from "@uncaged/workflow-cas";
import type { RoleOutput } from "@uncaged/workflow-runtime";
import type { Result } from "@uncaged/workflow-util";
export type SupervisorDecision = "continue" | "stop";
@@ -7,7 +7,6 @@ export type SupervisorDecision = "continue" | "stop";
export type ExecuteThreadIo = {
threadId: string;
hash: string;
dataJsonlPath: string;
infoJsonlPath: string;
cas: CasStore;
};
@@ -1,7 +1,7 @@
import { appendFile, mkdir, unlink, writeFile } from "node:fs/promises";
import { mkdir, unlink, writeFile } from "node:fs/promises";
import { createServer, type Socket } from "node:net";
import { dirname, join } from "node:path";
import type { RoleOutput, WorkflowFn, WorkflowResult } from "@uncaged/workflow-runtime";
import type { RoleOutput, WorkflowFn } from "@uncaged/workflow-runtime";
import { ensureUncagedWorkflowSymlink, importWorkflowBundleModule } from "@uncaged/workflow-register";
import { createCasStore } from "@uncaged/workflow-cas";
import {
@@ -364,13 +364,11 @@ async function main(): Promise<void> {
const threadId = cmd.threadId;
const runningPath = join(storageRoot, "logs", hash, `${threadId}.running`);
const dataJsonlPath = join(storageRoot, "logs", hash, `${threadId}.data.jsonl`);
const infoJsonlPath = join(storageRoot, "logs", hash, `${threadId}.info.jsonl`);
const io: ExecuteThreadIo = {
threadId,
hash,
dataJsonlPath,
infoJsonlPath,
cas,
};
@@ -387,7 +385,6 @@ async function main(): Promise<void> {
try {
await mkdir(dirname(runningPath), { recursive: true });
await mkdir(dirname(dataJsonlPath), { recursive: true });
await writeFile(runningPath, "", "utf8");
const logger = createLogger({ sink: { kind: "file", path: infoJsonlPath } });
@@ -407,7 +404,7 @@ async function main(): Promise<void> {
});
}
const runResult = await executeThread(
await executeThread(
workflowFn,
cmd.workflowName,
{ prompt: cmd.prompt, steps: cmd.steps },
@@ -422,12 +419,9 @@ async function main(): Promise<void> {
io,
logger,
);
await appendFile(dataJsonlPath, `${JSON.stringify(runResult)}\n`, "utf8");
} catch (e) {
const message = e instanceof Error ? e.message : String(e);
bootLog("Q3MN8YKW", `thread ${threadId} failed: ${message}`);
const failure: WorkflowResult = { returnCode: 1, summary: message, rootHash: "" };
await appendFile(dataJsonlPath, `${JSON.stringify(failure)}\n`, "utf8").catch(() => {});
} finally {
threads.delete(threadId);
await unlink(runningPath).catch(() => {});
@@ -74,13 +74,11 @@ export function workflowAsAgent(
};
const childThreadId = generateUlid(Date.now());
const dataJsonlPath = join(storageRoot, "logs", entry.hash, `${childThreadId}.data.jsonl`);
const infoJsonlPath = join(storageRoot, "logs", entry.hash, `${childThreadId}.info.jsonl`);
const io: ExecuteThreadIo = {
threadId: childThreadId,
hash: entry.hash,
dataJsonlPath,
infoJsonlPath,
cas: createCasStore(getGlobalCasDir(storageRoot)),
};
+29
View File
@@ -0,0 +1,29 @@
# @uncaged/workflow-protocol
Shared workflow types, sentinel constants, and `Result` helpers.
## What This Package Does
It defines the cross-package contract for bundles and the engine: thread/step shapes, `WorkflowFn`, agent/extract contexts, descriptor types, and `CasStore` as an interface. Implementations (CAS store, CLI, extract) depend on these types so bundles stay decoupled from Node APIs.
## Key Exports
From `src/index.ts`:
- **Types:** `Result`, `CasStore`, `WorkflowRoleSchema`, `WorkflowRoleDescriptor`, `WorkflowDescriptor`, `RoleMeta`, `RoleOutput`, `StartStep`, `RoleStep`, `ThreadContext`, `ModeratorContext`, `AgentContext`, `ExtractContext`, `WorkflowCompletion`, `WorkflowResult`, `LlmProvider`, `ProviderConfig`, `ResolvedModel`, `WorkflowConfig`, `ExtractFn`, `AgentFn`, `AgentBinding`, `WorkflowRuntime`, `WorkflowFn`, `RoleDefinition`, `Moderator`, `WorkflowDefinition`, `AdvanceOutcome`
- **Constants:** `START`, `END`
- **Functions:** `ok`, `err`
## Dependencies
- **Peer:** `zod` ^4 — used in type positions for schemas (`ExtractFn`, `RoleDefinition`, etc.)
No workspace packages; this is the bottom layer.
## Usage
```typescript
import { END, START, type WorkflowFn, type ThreadContext } from "@uncaged/workflow-protocol";
```
Concrete `WorkflowFn` implementations are built with `@uncaged/workflow-runtime` (`createWorkflow`).
@@ -0,0 +1,36 @@
// ── CAS thread chain nodes (RFC: CAS-based thread storage) ──────────
export type StartNodePayload = {
name: string;
hash: string;
maxRounds: number;
depth: number;
};
export type StartNode = {
type: "start";
payload: StartNodePayload;
refs: string[];
};
export type StateNodePayload = {
role: string;
meta: Record<string, unknown>;
start: string;
content: string;
ancestors: string[];
compact: string | null;
timestamp: number;
};
export type StateNode = {
type: "state";
payload: StateNodePayload;
refs: string[];
};
export type ContentMerkleNode = {
type: "content";
payload: string;
refs: string[];
};
+36 -30
View File
@@ -1,40 +1,46 @@
// ── Types ──────────────────────────────────────────────────────────
export type {
Result,
CasStore,
WorkflowRoleSchema,
WorkflowRoleDescriptor,
WorkflowDescriptor,
RoleMeta,
RoleOutput,
StartStep,
RoleStep,
ThreadContext,
ModeratorContext,
AgentContext,
ExtractContext,
WorkflowCompletion,
WorkflowResult,
LlmProvider,
ProviderConfig,
ResolvedModel,
WorkflowConfig,
ExtractFn,
AgentFn,
AgentBinding,
WorkflowRuntime,
WorkflowFn,
RoleDefinition,
Moderator,
WorkflowDefinition,
AdvanceOutcome,
ContentMerkleNode,
StartNode,
StateNode,
} from "./cas-types.js";
export type {
AdvanceOutcome,
AgentBinding,
AgentContext,
AgentFn,
CasStore,
ExtractContext,
ExtractFn,
LlmProvider,
Moderator,
ModeratorContext,
ProviderConfig,
ResolvedModel,
Result,
RoleDefinition,
RoleMeta,
RoleOutput,
RoleStep,
StartStep,
ThreadContext,
WorkflowCompletion,
WorkflowConfig,
WorkflowDefinition,
WorkflowDescriptor,
WorkflowFn,
WorkflowResult,
WorkflowRoleDescriptor,
WorkflowRoleSchema,
WorkflowRuntime,
} from "./types.js";
// ── Constants ──────────────────────────────────────────────────────
export { START, END } from "./types.js";
export { END, START } from "./types.js";
// ── Constructor functions ──────────────────────────────────────────
export { ok, err } from "./result.js";
export { err, ok } from "./result.js";
+26
View File
@@ -0,0 +1,26 @@
# @uncaged/workflow-reactor
LLM calling abstraction and thread “reactor” for structured tool invocation.
## What This Package Does
It exposes `createLlmFn` (chat completion wrapper) and `createThreadReactor` (multi-turn tool loop configuration) plus supporting message/tool types. `@uncaged/workflow-execute` consumes this for extractor and supervisor paths that talk to OpenAI-style APIs with tools.
## Key Exports
From `src/index.ts`:
- **Functions:** `createLlmFn`, `createThreadReactor`
- **Types:** `ChatMessage`, `LlmFn`, `StructuredToolSpec`, `ThreadReactorConfig`, `ThreadReactorFn`, `ThreadReactorInvokeArgs`, `ToolCall`, `ToolDefinition`
## Dependencies
- **Workspace:** `@uncaged/workflow-protocol`
- **Peer:** `zod` ^4
## Usage
```typescript
import { createLlmFn, createThreadReactor } from "@uncaged/workflow-reactor";
// Usually composed inside @uncaged/workflow-execute rather than directly by applications.
```
+38
View File
@@ -0,0 +1,38 @@
# @uncaged/workflow-register
Bundle validation, dynamic export extraction, registry YAML, and model/provider resolution.
## What This Package Does
It validates workflow `.esm.js` bundles, extracts `descriptor` / `run` exports at runtime, reads and writes `workflow.yaml`, and resolves which LLM endpoint/model to use from registry config (`resolveModel`, `splitProviderModelRef`).
## Key Exports
From `src/index.ts`:
- **Bundle:** `buildDescriptor`, `importWorkflowBundleModule`, `validateWorkflowBundle`, `ensureUncagedWorkflowSymlink`, `extractBundleExports`, `stringifyWorkflowDescriptor`, `validateWorkflowDescriptor`
- **Bundle types:** `ExtractBundleExportsOptions`, `ExtractedBundleExports`, `WorkflowBundleValidationInput`, `WorkflowDescriptor`, `WorkflowRoleDescriptor`, `WorkflowRoleSchema`
- **Registry:** `getRegisteredWorkflow`, `listRegisteredWorkflowNames`, `parseWorkflowRegistryYaml`, `readWorkflowRegistry`, `registerWorkflowVersion`, `rollbackWorkflowToHistoryHash`, `stringifyWorkflowRegistryYaml`, `unregisterWorkflow`, `workflowRegistryPath`, `writeWorkflowRegistry`
- **Registry types:** `WorkflowConfig`, `WorkflowHistoryEntry`, `WorkflowRegistryEntry`, `WorkflowRegistryFile`
- **Config:** `resolveModel`, `splitProviderModelRef`, types `ProviderConfig`, `ResolvedModel`
## Dependencies
- **Workspace:** `@uncaged/workflow-protocol`, `@uncaged/workflow-util`
- **Peer:** `acorn`, `yaml`, `zod` ^4 — parsing/validation at runtime for consumers
## Usage
```typescript
import { readFile } from "node:fs/promises";
import { readWorkflowRegistry, validateWorkflowBundle } from "@uncaged/workflow-register";
import { getDefaultWorkflowStorageRoot } from "@uncaged/workflow-util";
const reg = await readWorkflowRegistry(getDefaultWorkflowStorageRoot());
if (!reg.ok) throw new Error(reg.error.message);
const path = "./my.esm.js";
const source = await readFile(path, "utf8");
const v = validateWorkflowBundle({ filePath: path, source });
if (!v.ok) throw new Error(v.error);
```
+28
View File
@@ -0,0 +1,28 @@
# @uncaged/workflow-runtime
Workflow author API: `createWorkflow` plus re-exports of protocol workflow types.
## What This Package Does
Bundle code imports `createWorkflow` to turn a `WorkflowDefinition` plus `AgentBinding` into a `WorkflowFn` generator. It re-exports the protocol types and constants most authors need so workflows rarely import `@uncaged/workflow-protocol` directly.
## Key Exports
From `src/index.ts`:
- **Functions:** `createWorkflow`, `ok`, `err`
- **Types:** `AgentBinding`, `AgentContext`, `AgentFn`, `CasStore`, `ExtractContext`, `ExtractFn`, `LlmProvider`, `Moderator`, `ModeratorContext`, `Result`, `RoleDefinition`, `RoleMeta`, `RoleOutput`, `RoleStep`, `StartStep`, `ThreadContext`, `WorkflowCompletion`, `WorkflowDefinition`, `WorkflowDescriptor`, `WorkflowFn`, `WorkflowResult`, `WorkflowRoleDescriptor`, `WorkflowRoleSchema`, `WorkflowRuntime`
- **Constants:** `END`, `START`
## Dependencies
- **Workspace:** `@uncaged/workflow-protocol` — contract types and helpers
- **Peer:** `zod` ^4 — matches schema usage on role definitions
## Usage
```typescript
import { createWorkflow, type WorkflowDefinition, type AgentBinding } from "@uncaged/workflow-runtime";
export const run = createWorkflow(myDefinition, myBinding);
```
+32
View File
@@ -0,0 +1,32 @@
# @uncaged/workflow-util
Shared utilities: encoding, IDs, logging, storage paths, and ref-field normalization.
## What This Package Does
It provides filesystem-safe Base32 and ULID generation, the structured logger used across packages, helpers for the default workflow data directory and global CAS path, and utilities to merge/normalize `refs` on steps. It re-exports `ok`/`err` from protocol for convenience.
## Key Exports
From `src/index.ts`:
- **Base32:** `CROCKFORD_BASE32_ALPHABET`, `decodeCrockfordBase32Bits`, `decodeCrockfordToUint64`, `encodeCrockfordBase32Bits`, `encodeUint64AsCrockford`
- **Logger:** `createLogger`
- **Refs:** `mergeRefsWithContentHash`, `normalizeRefsField`
- **Result:** `ok`, `err` (from `@uncaged/workflow-protocol`)
- **Paths:** `getDefaultWorkflowStorageRoot`, `getGlobalCasDir`
- **ULID:** `generateUlid`
- **Types:** `CreateLoggerOptions`, `LogFn`, `LoggerSink`, `Result`
## Dependencies
- **Workspace:** `@uncaged/workflow-protocol``Result` and shared types used by helpers
## Usage
```typescript
import { createLogger, getDefaultWorkflowStorageRoot, generateUlid } from "@uncaged/workflow-util";
const log = createLogger();
log("4KNMR2PX", "example");
```